Apoorva
# 04/21/2022, 1:24 PM
class HistogramDataSet(AbstractVersionedDataSet):
    """Versioned data set that stores its payload as JSON on an fsspec filesystem.

    The protocol and path are derived from ``filepath``; all file access goes
    through the ``fsspec`` filesystem created for that protocol.
    """

    def __init__(self, filepath: str, version: Version = None, credentials: Dict[str, Any] = None):
        """Create the data set.

        Args:
            filepath: Location of the JSON file; passed to
                ``get_protocol_and_path`` to split protocol and path.
            version: Version specification forwarded to the base class.
            credentials: Keyword arguments forwarded to ``fsspec.filesystem``.
                A deep copy is taken so the caller's dict is never mutated.
        """
        _credentials = deepcopy(credentials) or {}
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol, **_credentials)
        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

    def _load(self):
        """Load and return the JSON content, or ``None`` if the file is missing.

        The path is resolved with ``self._get_load_path()`` rather than
        ``self._filepath`` so that the version passed to ``__init__`` is
        honoured; using ``self._filepath`` directly bypasses versioning.
        """
        # NOTE(review): with versioning enabled and no stored version, the
        # path resolution itself may raise before the file is opened -
        # confirm that is the desired behaviour for a first run.
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
        log.info(f'load_path: {load_path}')
        try:
            with self._fs.open(load_path) as f:
                return json.load(f)
        except FileNotFoundError:
            # Deliberate best-effort: a missing file is reported as "no data".
            return None

    def _save(self, data) -> None:
        """Serialise ``data`` as JSON to the (versioned) save path.

        The path is resolved with ``self._get_save_path()`` so that each save
        is written to the location of the current save version instead of
        always overwriting the unversioned ``self._filepath``.
        """
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, mode="w") as f:
            json.dump(data, f, default=dumper)
        # NOTE(review): `_invalidate_cache` is not defined in the class as
        # shown here - confirm it is provided elsewhere.
        self._invalidate_cache()
For versioning, I am using Kedro's built-in functionality.