# beginners-need-help
m
Yes. I already wipe the directory and run with and without the versioned dataset. (1) With the versioned Spark dataset, the timestamp folder is created and the data is written into it, but the run ends in the exception. (2) Without a versioned Spark dataset, everything works as expected. Versioned dataset: Alsformattedmodeloutput
u
Hi @User could you paste your exception here please?
d
This was the original error
"kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet" (The code works if I change the type to pandas with a small part of the data)
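For context, a versioned Kedro dataset names each save with a UTC timestamp, which is what the lookup in this error is searching for. A minimal sketch of that naming scheme (the format is inferred from the timestamp visible in the traceback below, not taken from Kedro's source):

```python
from datetime import datetime, timezone

def generate_version(now=None):
    """Build a timestamp version string shaped like the one in the error,
    e.g. '2021-09-07T14.07.50.215Z' (format inferred from the log)."""
    now = now or datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%dT%H.%M.%S.") + f"{now.microsecond // 1000:03d}Z"

versions = [
    "2021-09-07T14.07.50.215Z",
    "2021-09-06T09.00.00.000Z",
]
# Fields go from coarse (year) to fine (milliseconds), so plain string
# comparison orders versions chronologically and max() picks the latest:
latest = max(versions)
```

Because these strings sort lexicographically in time order, "find the latest version" reduces to listing the timestamp folders and taking the maximum.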
m
I'll run the pipeline and send the full exception
d
Thank you
m
2021-09-07 11:10:58,913 - kedro.runner.sequential_runner - WARNING - There are 1 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
  --from-nodes "format_als_recomendations"
2021-09-07 11:10:59,449 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
Traceback (most recent call last):
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 265, in main
    cli_collection()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 210, in main
    super().main(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/malaguth/Workspace/Git/recommendation-engine/src/martins_advisor/cli.py", line 160, in run
    session.run(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/session/session.py", line 408, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 218, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 313, in _run_node_sequential
    catalog.save(name, data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 449, in save
    func(data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 636, in save
    load_version = self.resolve_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 575, in resolve_load_version
    return self._fetch_latest_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/cachetools/decorators.py", line 73, in wrapper
    v = method(self, *args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 558, in _fetch_latest_load_version
    raise VersionNotFoundError(f"Did not find any versions for {self}")
kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet(file_format=csv, filepath=/mnt/advisor-dev/data/07_model_output/als/recommendation, load_args={}, save_args={}, version=Version(load=None, save='2021-09-07T14.07.50.215Z'))
Despite the error, the pipeline creates the files in blob storage
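This is not Kedro's actual implementation, but the failing `_fetch_latest_load_version` step in the traceback above amounts to scanning the dataset folder for timestamp-named subfolders and taking the newest. A minimal local sketch of that step:

```python
from pathlib import Path
import tempfile

def latest_version(dataset_dir: Path) -> str:
    """Return the newest timestamp-named subfolder, mimicking (not
    reproducing) the 'find latest version' step that fails above."""
    versions = sorted(p.name for p in dataset_dir.iterdir() if p.is_dir())
    if not versions:
        raise RuntimeError(f"Did not find any versions in {dataset_dir}")
    return versions[-1]

# Demo with a local layout shaped like <filepath>/<version>/<files>:
root = Path(tempfile.mkdtemp()) / "recommendation"
for v in ["2021-09-06T09.00.00.000Z", "2021-09-07T14.07.50.215Z"]:
    (root / v).mkdir(parents=True)
latest = latest_version(root)
```

If the equivalent scan runs against a filesystem that cannot see `/mnt/advisor-dev/...` (for example, the local driver process rather than the cluster), it would find no versions even though the Spark job wrote the files, which could explain "the pipeline creates the files but raises VersionNotFoundError".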
Let me know if I can help with anything
d
Thanks @User we're still looking into this
m
Ok, thanks. Let me know if I can help
a
Hi @User, please could you post the relevant part of your catalog.yml file and the
kedro run
command you're using? 🙂
m
Hi @User, sorry for the delay. I reproduced the error in a single-node pipeline. Catalog:
_spark: &spark_parquet
  type: spark.SparkDataSet
  file_format: "parquet"

_spark_overwrite: &spark_overwrite
  <<: *spark_parquet
  save_args:
    mode: "overwrite"

SaleRefinedData:
  <<: *spark_overwrite
  filepath: ${spark_prefix}/data/03_refined/sale/
  layer: refined

SaleFeatureData:
  <<: *spark_parquet
  filepath: ${spark_prefix}/data/04_feature/sale/
  versioned: true
  layer: feature
Node:
# Imports inferred from usage; SaleDataModel and SaleFeatures are
# project-specific column-name constants not shown here.
from pyspark import sql as spark
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def aggregate_sale_ratings(sdf: spark.DataFrame) -> spark.DataFrame:
    window = (
        Window
        .partitionBy(SaleDataModel.SKU, SaleDataModel.CLIENT_ID)
        .orderBy(SaleDataModel.DATE)
    )

    sdf = (
        sdf
        .withColumn(SaleFeatures.COUNT_RATING, F.dense_rank().over(window))
        .withColumn(SaleFeatures.BINARY_RATING, F.lit(1))
    )

    return sdf
Command:
kedro run --pipeline sale_refined_to_feature --env dev
Note: I use Databricks Connect to run the pipeline remotely on an Azure Databricks cluster.
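One thing that may be worth checking (an assumption, not a confirmed diagnosis): under Databricks Connect the Kedro session runs in the local process, so the version lookup in the traceback may be probing the local filesystem rather than the cluster's mounted storage. A hypothetical helper to test visibility of the path from wherever Kedro runs:

```python
import os

def visible_locally(path: str) -> bool:
    """True if the *current* process (where Kedro's version lookup runs
    under Databricks Connect) can see the given directory."""
    return os.path.isdir(path)

# Path copied from the traceback above; on a laptop without the blob
# storage mounted this would be False, even though the cluster-side
# Spark job writes the files successfully.
visible_locally("/mnt/advisor-dev/data/07_model_output/als/recommendation")
```

If this returns False in the environment where `kedro run` is launched, the versioned lookup cannot succeed regardless of what the cluster wrote.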
Pipeline: