kedro.io.core.DataSetError: Save path `/home/th...
# beginners-need-help
d
kedro.io.core.DataSetError: Save path `/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl/2022-02-17T20.20.56.877Z/Master_table.pkl` for PickleDataSet(backend=<module 'pickle' from '/home/thakkar/anaconda3/envs/kedro_project/lib/python3.8/pickle.py'>, filepath=/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl, load_args={}, protocol=file, save_args={}, version=Version(load=None, save='2022-02-17T20.20.56.877Z')) must not exist if versioning is enabled.
I am currently using the example code for Prefect from the Kedro tutorials and there's a weird bug that I came across. Whenever I register a flow while the folders inside the data directory are empty, the first run works completely fine, but when the same flow is run again it gives the error above. PS: I have enabled versioned=True for this dataset. Ideally every run should have its own timestamped folder containing the pkl/csv file, but that's not the case when working with Prefect. I don't know what is going on under the hood over there, so I would really appreciate some help.
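For context, this is how versioned datasets are expected to behave. A minimal sketch, assuming the Kedro 0.17.x API; the resulting path layout matches the error message above:

```python
from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io.core import Version, generate_timestamp

# Each save under versioning is keyed by a timestamp, so every run should land
# in its own folder underneath the dataset's filepath.
save_version = generate_timestamp()  # e.g. '2022-02-17T20.20.56.877Z'
dataset = PickleDataSet(
    filepath="data/03_primary/Master_table.pkl",
    version=Version(load=None, save=save_version),
)
dataset.save({"example": "object"})
# Resulting path: data/03_primary/Master_table.pkl/<save_version>/Master_table.pkl
```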
a
Hi @User, I looked into the issue. I think it's down to how Prefect runs are being handled. I see from register_prefect_flow.py that the kedro session is initiated first and is then reused for every kedro run. I don't have a solution yet, but I'm looking at kedro-airflow to see how it handles sessions between different runs.
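A sketch of the suspected behaviour, assuming the Kedro 0.17.x API: the session (and its session_id timestamp) is created once at registration time, and every run through that session reuses it as the save_version, which is what makes a later run collide with the existing versioned folder:

```python
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
metadata = bootstrap_project(project_path)

# The flow registration script creates one session up front...
session = KedroSession.create(metadata.package_name, project_path=project_path)

# ...and every Prefect task reuses it. session.run() takes the save_version
# from session.store["session_id"], a single timestamp fixed at creation time.
session.run(pipeline_name="__default__")
session.run(pipeline_name="__default__")  # second run -> "Save path ... must not exist"
```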
d
this looks like a correct diagnosis - let me check with the team what a solution could look like
a
@Dhaval, I took a crack at adding a Prefect flow. I tested it locally and it works fine. There are a few changes you'll need to make to your project: 1. You'll have to set SESSION_STORE_CLASS to ShelveStore. 2. All the catalog entries need to be written to disk. You can find a working setup here (https://github.com/avan-sh/spaceflights-prefect) using the prefect_flow.py file. I don't think this is the best solution, but it works at least.
d
You absolute star! Thank you for suggesting a solution
d
Thanks a lot for the solution, I'm going to have a look at it tomorrow 😊
@User Tried it on 2 different projects but the error remained the same:
Traceback (most recent call last):
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "register_prefect_flow.py", line 66, in run
    session.run(self.pipeline_name, node_names=[self.node_name])
  File "/home/thakkar/anaconda3/envs/ciena/lib/python3.8/site-packages/kedro/framework/session/session.py", line 338, in run
    save_version = run_id = self.store["session_id"]
KeyError: 'session_id'
I didn't understand the first step
1. You'll have to set SESSION_STORE_CLASS as ShelveStore
How do I set this up?
a
My bad, you need to change the settings file if you're using 0.17.6. Take a look at the changes I made in this commit: https://github.com/avan-sh/spaceflights-prefect/commit/181984fe06669012c02963f999e311e90eaeaf5c
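For reference, the relevant change boils down to something like this in the project's settings file. A sketch assuming Kedro 0.17.6; the store path is an arbitrary choice here, and the exact edit is in the linked commit:

```python
# src/<package_name>/settings.py
from kedro.framework.session.store import ShelveStore

# Persist session data on disk instead of keeping it in memory, so that
# separately executed Prefect tasks can share it.
SESSION_STORE_CLASS = ShelveStore
SESSION_STORE_ARGS = {"path": "./sessions"}  # any writable folder
```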
d
@User I am unable to create flows for specific pipelines. It currently takes in all the nodes even after passing the pipeline name through the -p / --pipeline option on the terminal. What should I do?
a
@User, that is strange. Can you test with the latest change I made? I changed the script so that each pipeline has its own flow name.
d
Still doesn't work. It is picking up all the tasks from all pipelines
d
I'm not super familiar with Prefect
can you run with breakpoints?
a
Yes. You can also cross-check whether the pipelines are returning different things:
# Print the node dependencies of each registered pipeline, to confirm that
# the pipelines actually resolve to different sets of nodes.
from pathlib import Path

from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
metadata = bootstrap_project(project_path)

pipeline_names = ["__default__", "dp", "ds"]
for pipeline_name in pipeline_names:
    print(f"PIPELINE - {pipeline_name}")
    pipeline = pipelines.get(pipeline_name)
    # node_dependencies maps each node to the set of nodes it depends on
    for node, parent_nodes in pipeline.node_dependencies.items():
        print(f"|-{node.name}")
        for parent in parent_nodes:
            print(f"|-------{parent.name}")
    print("")
d
@User sorry, my bad. The tasks in the default pipeline were being added to the new pipeline too. The issue I have now is that data held in a MemoryDataSet while executing a pipeline is not available for processing:
ValueError: Pipeline input(s) {'data_joins.join_1.left_preprocessed', 'data_joins.join_1.right_preprocessed'} not found in the DataCatalog
This runs perfectly fine when using kedro run --pipeline "Data Join" and also with the default Prefect Python file shared in the docs; the modification you suggested has this issue as of now.
I'm guessing this is happening because MemoryDataSet is not being used.
a
Yes, you'll need to add all your catalog items to conf; each node will then read from disk on every run. That's the reason I mentioned it's not the best solution.
d
@datajoely any help on this front?
d
The easiest solution is to provide catalog entries for every item
and then carefully remove the ones you don't need
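One way to see which entries you'd have to write is to list the pipeline's datasets that are not in the catalog (these are the ones currently falling back to MemoryDataSet). A rough sketch, assuming the Kedro 0.17.x API and the pipeline name used earlier in the thread:

```python
from pathlib import Path

from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
metadata = bootstrap_project(project_path)

with KedroSession.create(metadata.package_name, project_path=project_path) as session:
    catalog = session.load_context().catalog
    pipeline = pipelines["Data Join"]  # pipeline name from this thread
    # Datasets referenced by the pipeline but missing from the catalog
    missing = pipeline.data_sets() - set(catalog.list())
    print("Datasets without catalog entries (resolved to MemoryDataSet):")
    for name in sorted(missing):
        print(f"  {name}")
```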
d
Since these are not declared and are used by modular pipelines, it'd be really difficult to write those entries.
The code given here (https://kedro.readthedocs.io/en/stable/10_deployment/05_prefect.html) works with datasets held in MemoryDataSet. The issue with that code is that it is not able to save versioned datasets, because the session is only initiated once when the run is created, and when the flow is rerun it throws an error. @User Should I go ahead and create an issue on GitHub for this?
d
I'm not sure I understand the issue entirely
is it only when versioning is turned on?
it works fine without?
d
Yes, it causes issues saving the datasets when versioning is turned on and the pipeline is run
It works completely fine without it
I'll write the issue up properly here for you to go through. The code available here (https://kedro.readthedocs.io/en/stable/10_deployment/05_prefect.html) works fine when versioning is not enabled for any datasets in the catalog.yml file. When the flow is registered for the first time, it takes the timestamp from when the session was created for the Prefect flow. When this Prefect flow is scheduled to run, it works completely fine. But when the flow is scheduled to run for the 2nd time, it gives an error:
kedro.io.core.DataSetError: Save path `/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl/2022-02-17T20.20.56.877Z/Master_table.pkl` for PickleDataSet(backend=<module 'pickle' from '/home/thakkar/anaconda3/envs/kedro_project/lib/python3.8/pickle.py'>, filepath=/home/thakkar/Work/kedro_project/data/03_primary/Master_table.pkl, load_args={}, protocol=file, save_args={}, version=Version(load=None, save='2022-02-17T20.20.56.877Z')) must not exist if versioning is enabled.
What @User and I suspect is happening behind the scenes is that the session is initialized only once for the Prefect flow and remains static, whereas it should be re-created so that versioned datasets can be saved. For example, say the flow was created at 10am on 2022-02-20. When the flow runs at 11am, it creates the folder data/02_intermediate/sample.pkl/2022-02-20 10am/sample.pkl. Note that the flow ran at 11am but still created a folder for 10am. When the 2nd run is scheduled at 12pm, the execution fails because the flow still runs with the same 10am timestamp, and the 10am folder already exists, as explained in the error above. Hope this clears up the issue.
d
Yes it does somewhat
thank you
d
Any help on this front?
a
Sorry, I thought you got it working. Another option is for you to break your pipeline into sections that make sense to write out to disk. As discussed here (https://github.com/Galileo-Galilei/kedro-mlflow/issues/44#issuecomment-772076223), running individual nodes as tasks is not the best pattern to follow.
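Along those lines, a rough sketch of the coarser-grained approach: a Prefect 1.x flow that runs a whole Kedro pipeline (or a section of it) as a single task and creates a fresh KedroSession inside the task, so every scheduled run gets its own save_version. Names here are illustrative, not the docs' exact code:

```python
from pathlib import Path

from prefect import Flow, task

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

PROJECT_PATH = Path.cwd()


@task
def run_kedro_pipeline(pipeline_name: str):
    metadata = bootstrap_project(PROJECT_PATH)
    # A new session per execution means a new session_id, hence a new
    # versioned save path on every run.
    with KedroSession.create(metadata.package_name, project_path=PROJECT_PATH) as session:
        session.run(pipeline_name=pipeline_name)


with Flow("kedro-data-join") as flow:
    run_kedro_pipeline("Data Join")


if __name__ == "__main__":
    # Project name in Prefect is illustrative; adjust to your setup.
    flow.register(project_name="kedro_project")
```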