Dhaval
02/17/2022, 9:01 PM
kedro.io.core.DataSetError: Save path `/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl/2022-02-17T20.20.56.877Z/Master_table.pkl` for PickleDataSet(backend=<module 'pickle' from '/home/thakkar/anaconda3/envs/kedro_project/lib/python3.8/pickle.py'>, filepath=/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl, load_args={}, protocol=file, save_args={}, version=Version(load=None, save='2022-02-17T20.20.56.877Z')) must not exist if versioning is enabled.
I am currently using the example code for Prefect from the kedro tutorials and there's this weird bug that I came across. Whenever I register a flow, and the folders in the data folder are empty, the first run works completely fine, but when the same flow is run again, it gives the error above.
PS: I have enabled versioned=True for this dataset. Ideally every run should have its own timestamped folder containing the pkl/csv file, but that's not the case when working with Prefect. I don't know what is going on under the hood over there, so I would really appreciate some help.
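For reference, a minimal sketch of the behaviour versioned=True is expected to give, using Kedro's Python API (the filepath comes from the error above; assuming Kedro 0.17.x, where PickleDataSet lives in kedro.extras.datasets):

from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import Version

# With a versioned dataset, each save should land in its own
# timestamped subfolder: <filepath>/<save_version>/<basename>
dataset = PickleDataSet(
    filepath="data/03_primary/Master_table.pkl",
    # save=None means "generate a fresh timestamp for this save";
    # the error above shows a fixed save='2022-02-17T20.20.56.877Z' instead
    version=Version(load=None, save=None),
)

Saving twice with fresh timestamps produces two different version folders; the DataSetError above fires when a save is attempted with a timestamp whose folder already exists.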
avan-sh
02/18/2022, 8:59 AM
Looking at register_prefect_flow.py, it seems that the kedro session is initiated first and will be reused for every kedro run.
I don't have a solution yet, but I'm looking at kedro-airflow to see how it handles sessions between different runs.
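To illustrate that reuse, a rough sketch of the shape of the tutorial's register_prefect_flow.py (simplified and reconstructed from the traceback further down, so details will differ):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# The session (and with it the save_version timestamp) is created once,
# when the flow is registered...
project_path = Path.cwd()
metadata = bootstrap_project(project_path)
session = KedroSession.create(project_path=project_path)

# ...and every Prefect task then calls into that same session object,
# e.g. session.run(pipeline_name, node_names=[node_name]), so each
# scheduled flow run reuses the registration-time timestamp.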
datajoely
02/18/2022, 9:50 AM
avan-sh
02/19/2022, 5:01 AM
1. You'll have to set SESSION_STORE_CLASS as ShelveStore.
2. All the catalog entries need to be written back to disk.
You can find a working setup here (https://github.com/avan-sh/spaceflights-prefect) using the prefect_flow.py file.
I don't think this is the best solution, but it works at least.
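For step 1, the setup would look something like this in src/<your_package>/settings.py (a sketch based on the Kedro docs for session stores; the import path assumes Kedro 0.17.x/0.18.x, where ShelveStore was still shipped):

from kedro.framework.session.store import ShelveStore

# Persist session data (including session_id) to disk between runs
SESSION_STORE_CLASS = ShelveStore
# Where the shelve files are written (path is illustrative)
SESSION_STORE_ARGS = {"path": "./sessions"}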
datajoely
02/19/2022, 9:47 AM
Dhaval
02/19/2022, 8:09 PM
Traceback (most recent call last):
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "register_prefect_flow.py", line 66, in run
    session.run(self.pipeline_name, node_names=[self.node_name])
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/kedro/framework/session/session.py", line 338, in run
    save_version = run_id = self.store["session_id"]
KeyError: 'session_id'
I didn't understand the first step: "1. You'll have to set SESSION_STORE_CLASS as ShelveStore". How do I set this up?
avan-sh
02/20/2022, 6:24 PM
Dhaval
02/20/2022, 8:32 PM
-p, --pipeline command on the terminal. What should I do?
avan-sh
02/21/2022, 1:20 AM
Dhaval
02/21/2022, 11:57 AM
datajoely
02/21/2022, 12:00 PM
avan-sh
02/21/2022, 12:22 PM
from pathlib import Path

from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project

# Bootstrap the project so the `pipelines` registry is populated
project_path = Path.cwd()
metadata = bootstrap_project(project_path)

# Print each pipeline's nodes along with their parent (upstream) nodes
pipeline_names = ["__default__", "dp", "ds"]
for pipeline_name in pipeline_names:
    print(f"PIPELINE - {pipeline_name}")
    pipeline = pipelines.get(pipeline_name)
    for node, parent_nodes in pipeline.node_dependencies.items():
        print(f"|-{node.name}")
        for parent in parent_nodes:
            print(f"|-------{parent.name}")
    print("")
Dhaval
02/21/2022, 2:02 PM
ValueError: Pipeline input(s) {'data_joins.join_1.left_preprocessed', 'data_joins.join_1.right_preprocessed'} not found in the DataCatalog
This runs perfectly fine when using kedro run --pipeline "Data Join", and also with the default prefect python file shared in the docs. The modification you suggested has this issue as of now.
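One hedged guess at the cause: when Prefect runs nodes one at a time, every node input has to be loadable from the catalog, and intermediate outputs default to in-memory datasets that don't survive between task runs. A sketch of what persisting the two missing entries could look like via the Python API (dataset names come from the error; filepaths are illustrative):

from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import DataCatalog

catalog = DataCatalog(
    {
        # Namespaced intermediates from the error, persisted to disk so a
        # separate per-node session.run() can reload them
        "data_joins.join_1.left_preprocessed": PickleDataSet(
            filepath="data/02_intermediate/left_preprocessed.pkl"
        ),
        "data_joins.join_1.right_preprocessed": PickleDataSet(
            filepath="data/02_intermediate/right_preprocessed.pkl"
        ),
    }
)

In a project this would normally be the equivalent entries in conf/base/catalog.yml, which is what avan-sh's step 2 ("all the catalog entries need to be written back to disk") points at.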
avan-sh
02/21/2022, 2:11 PM
Dhaval
02/21/2022, 2:21 PM
datajoely
02/21/2022, 2:25 PM
Dhaval
02/21/2022, 2:34 PM
datajoely
02/21/2022, 2:37 PM
Dhaval
02/21/2022, 2:38 PM
kedro.io.core.DataSetError: Save path `/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl/2022-02-17T20.20.56.877Z/Master_table.pkl` for PickleDataSet(backend=<module 'pickle' from '/home/thakkar/anaconda3/envs/kedro_project/lib/python3.8/pickle.py'>, filepath=/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl, load_args={}, protocol=file, save_args={}, version=Version(load=None, save='2022-02-17T20.20.56.877Z')) must not exist if versioning is enabled.
What @User and I suspect is happening behind the scenes is that the session is initialized only once for the prefect flow, and it remains static, whereas it should be dynamic in order to save the versioned datasets.
So for example, say the flow was created when the timestamp was 2022-02-20 10am. When this flow is run at 11am, it creates a folder data/02_intermediate/sample.pkl/2022-02-20 10am/sample.pkl.
Note: the flow was run at 11am, but it still creates a folder for 10am.
Now when the 2nd flow run is scheduled, the timestamp is 12pm, but the execution fails because the flow still runs with the same timestamp, 10am. This fails because the 10am folder is already present, as explained in the error above.
Hope this clears the issue.
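One direction that follows from this (an untested sketch, not a confirmed fix from this thread): create the KedroSession inside the Prefect task's run() instead of at registration time, so the timestamp is generated per execution:

from pathlib import Path

from prefect import Task
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

class KedroNodeTask(Task):  # hypothetical name, mirroring the docs' task shape
    def __init__(self, pipeline_name, node_name, **kwargs):
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        super().__init__(name=node_name, **kwargs)

    def run(self):
        # A fresh session per execution means a fresh session_id/save_version,
        # instead of the one frozen when the flow was registered
        bootstrap_project(Path.cwd())
        with KedroSession.create(project_path=Path.cwd()) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])

The trade-off is that every task then gets its own timestamp, which is presumably why avan-sh's setup reaches for ShelveStore so that one persisted session can be shared across the run instead.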
datajoely
02/21/2022, 2:58 PM
Dhaval
02/22/2022, 4:43 AM
avan-sh
02/22/2022, 9:05 AM