1919import subprocess
2020import time
2121from abc import ABC , abstractmethod
22+ from enum import Enum
2223from typing import Optional , Dict , Any
2324
2425from planet_auth .auth_exception import AuthException
@@ -96,9 +97,15 @@ def _default_storage_provider():
9697class _SOPSAwareFilesystemObjectStorageProvider (ObjectStorageProvider ):
9798 """
9899 Storage provider geared around backing a single object in a single file
99- with paths take from the root of the local file system.
100+ with paths taken from the root of the local file system.
100101 """
101102
103+ _STORAGE_TYPE_KEY = "___SOPSAwareFilesystemObjectStorageProvider__storage_type"
104+
105+ class _StorageType (Enum ):
106+ PLAINTEXT = "plaintext"
107+ SOPS = "sops"
108+
102109 def __init__ (self , root : Optional [pathlib .Path ] = None ):
103110 if root :
104111 self ._storage_root = root
@@ -116,13 +123,34 @@ def _obj_filepath(self, obj_key):
116123 return obj_path
117124
118125 @staticmethod
119- def _is_sops_path (file_path ) :
126+ def _is_sops_path (file_path : pathlib . Path ) -> bool :
120127 # TODO: Could be ".json.sops", or ".sops.json", depending on file
121128 # level or field level encryption, respectively. We currently
122129 # only look for and support field level encryption in json
123130 # files with a ".sops.json" suffix.
124131 return bool (file_path .suffixes == [".sops" , ".json" ])
125132
133+ # @staticmethod
134+ # def _convert_to_sops_path(file_path: pathlib.Path) -> pathlib.Path:
135+ # if _SOPSAwareFilesystemObjectStorageProvider._is_sops_path(file_path):
136+ # return file_path
137+ # else:
138+ # if bool(file_path.suffix == ".json"):
139+ # return file_path.with_name(file_path.stem + ".sops" + file_path.suffix)
140+ # else:
141+ # return file_path.with_suffix(file_path.suffix + ".sops.json")
142+
143+ @staticmethod
144+ def _filter_write_object (data : dict ) -> dict :
145+ # TODO: consider making this part of the base class?
146+ final_data = {
147+ k : v
148+ for k , v in data .items ()
149+ # if k not in [_SOPSAwareFilesystemObjectStorageProvider._STORAGE_TYPE_KEY]
150+ if not (isinstance (k , str ) and k .startswith ("__" ))
151+ }
152+ return final_data
153+
126154 @staticmethod
127155 def _read_json (file_path : pathlib .Path ):
128156 auth_logger .debug (msg = "Loading JSON data from file {}" .format (file_path ))
@@ -137,7 +165,7 @@ def _read_json_sops(file_path: pathlib.Path):
137165
138166 @staticmethod
139167 def _write_json (file_path : pathlib .Path , data : dict ):
140- auth_logger .debug (msg = "Writing JSON data to file {}" .format (file_path ))
168+ auth_logger .debug (msg = "Writing JSON data to cleartext file {}" .format (file_path ))
141169 with open (file_path , mode = "w" , encoding = "UTF-8" ) as file_w :
142170 os .chmod (file_path , stat .S_IREAD | stat .S_IWRITE )
143171 _no_none_data = {key : value for key , value in data .items () if value is not None }
@@ -155,26 +183,53 @@ def _write_json_sops(file_path: pathlib.Path, data: dict):
155183 # ['sops', '-e', '--input-type', 'json', '--output-type',
156184 # 'json', '--output', file_path, '/dev/stdin'],
157185 # stdin=data_f)
158- auth_logger .debug (msg = "Writing JSON data to SOPS encrypted file {}" .format (file_path ))
159186 _SOPSAwareFilesystemObjectStorageProvider ._write_json (file_path , data )
187+ auth_logger .debug (msg = "Writing JSON data to SOPS encrypted file {}" .format (file_path ))
160188 subprocess .check_call (["sops" , "-e" , "--input-type" , "json" , "--output-type" , "json" , "-i" , file_path ])
161189
162190 @staticmethod
163191 def _load_file (file_path : pathlib .Path ) -> dict :
164192 if _SOPSAwareFilesystemObjectStorageProvider ._is_sops_path (file_path ):
165193 new_data = _SOPSAwareFilesystemObjectStorageProvider ._read_json_sops (file_path )
194+ new_data [_SOPSAwareFilesystemObjectStorageProvider ._STORAGE_TYPE_KEY ] = (
195+ _SOPSAwareFilesystemObjectStorageProvider ._StorageType .SOPS .value
196+ )
166197 else :
167198 new_data = _SOPSAwareFilesystemObjectStorageProvider ._read_json (file_path )
199+ new_data [_SOPSAwareFilesystemObjectStorageProvider ._STORAGE_TYPE_KEY ] = (
200+ _SOPSAwareFilesystemObjectStorageProvider ._StorageType .PLAINTEXT .value
201+ )
168202
169203 return new_data
170204
205+ @staticmethod
206+ def _do_sops (file_path : pathlib .Path , data : dict ) -> bool :
207+ if _SOPSAwareFilesystemObjectStorageProvider ._is_sops_path (file_path ):
208+ return True
209+ if (
210+ data
211+ and data .get (_SOPSAwareFilesystemObjectStorageProvider ._STORAGE_TYPE_KEY )
212+ == _SOPSAwareFilesystemObjectStorageProvider ._StorageType .SOPS .value
213+ ):
214+ auth_logger .warning (msg = f"Data sourced from SOPS being written cleartext to the file { file_path } ." )
215+ # Upgrading to SOPS would be great, but also problematic.
216+ # return True
217+
218+ return False
219+
171220 @staticmethod
172221 def _save_file (file_path : pathlib .Path , data : dict ):
173222 file_path .parent .mkdir (parents = True , exist_ok = True )
174- if _SOPSAwareFilesystemObjectStorageProvider ._is_sops_path (file_path ):
175- _SOPSAwareFilesystemObjectStorageProvider ._write_json_sops (file_path , data )
223+ do_sops = _SOPSAwareFilesystemObjectStorageProvider ._do_sops (file_path , data )
224+ write_data = _SOPSAwareFilesystemObjectStorageProvider ._filter_write_object (data )
225+
226+ if do_sops :
227+ # This has to be with the caller, otherwise the caller would not know
228+ # where we actually wrote the data and would likely not be able to find it again.
229+ # sops_file_path = _SOPSAwareFilesystemObjectStorageProvider._convert_to_sops_path(file_path)
230+ _SOPSAwareFilesystemObjectStorageProvider ._write_json_sops (file_path , write_data )
176231 else :
177- _SOPSAwareFilesystemObjectStorageProvider ._write_json (file_path , data )
232+ _SOPSAwareFilesystemObjectStorageProvider ._write_json (file_path , write_data )
178233
179234 def load_obj (self , key : ObjectStorageProvider_KeyType ) -> dict :
180235 obj_filepath = self ._obj_filepath (key )
0 commit comments