#beginners-need-help
Malaguth

09/06/2021, 1:48 PM
Yes. I already wiped the directory and ran both with and without the versioned dataset. (1) With the versioned Spark Data Set, it creates the timestamp folder and writes the data into it, but the run ends in the exception. (2) Without a versioned Spark Data Set, everything works as expected. Versioned dataset: Alsformattedmodeloutput
user

09/07/2021, 12:25 PM
Hi @User could you paste your exception here please?
datajoely

09/07/2021, 1:51 PM
This was the original error:
"kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet". (The code works if I change the type to pandas with a small part of the data)
Malaguth

09/07/2021, 1:54 PM
I'll run the pipeline and send the full exception
datajoely

09/07/2021, 1:54 PM
Thank you
Malaguth

09/07/2021, 2:11 PM
sh
2021-09-07 11:10:58,913 - kedro.runner.sequential_runner - WARNING - There are 1 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
  --from-nodes "format_als_recomendations"
2021-09-07 11:10:59,449 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
Traceback (most recent call last):
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 265, in main
    cli_collection()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 210, in main
    super().main(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/malaguth/Workspace/Git/recommendation-engine/src/martins_advisor/cli.py", line 160, in run
    session.run(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/session/session.py", line 408, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 218, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 313, in _run_node_sequential
    catalog.save(name, data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 449, in save
    func(data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 636, in save
    load_version = self.resolve_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 575, in resolve_load_version
    return self._fetch_latest_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/cachetools/decorators.py", line 73, in wrapper
    v = method(self, *args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 558, in _fetch_latest_load_version
    raise VersionNotFoundError(f"Did not find any versions for {self}")
kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet(file_format=csv, filepath=/mnt/advisor-dev/data/07_model_output/als/recommendation, load_args={}, save_args={}, version=Version(load=None, save='2021-09-07T14.07.50.215Z'))
2:16 PM
Despite the error, the pipeline creates the files in blob storage
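[This symptom matches what the traceback shows: the data is written, and then `save()` calls `resolve_load_version()`, which lists the dataset path for timestamped subfolders and raises `VersionNotFoundError` if the listing comes back empty — which can happen when a remote object store's directory listing lags behind a just-completed write. The following is a simplified, illustrative sketch of that kind of lookup, not Kedro's actual implementation; the function name and error type are stand-ins:]

```python
from pathlib import Path


def fetch_latest_load_version(dataset_dir: str) -> str:
    """Illustrative stand-in for Kedro-style version resolution:
    list timestamped subfolders under the dataset path and return
    the newest one, or raise if none are visible yet."""
    versions = sorted(p.name for p in Path(dataset_dir).iterdir() if p.is_dir())
    if not versions:
        raise FileNotFoundError(f"Did not find any versions under {dataset_dir}")
    # Timestamps like 2021-09-07T14.07.50.215Z sort lexicographically,
    # so the maximum string is the most recent version.
    return versions[-1]
```

[If the storage listing is eventually consistent, the version folder written a moment earlier may not appear in this listing yet, which would explain "files exist in blob storage, but VersionNotFoundError is raised".]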
3:49 PM
Let me know if I can help with anything
datajoely

09/08/2021, 8:16 AM
Thanks @User, we're still looking into this
Malaguth

09/08/2021, 1:54 PM
Ok, thanks. Let me know if I can help
antony.milne

09/09/2021, 8:21 AM
Hi @User, please could you post the relevant part of your catalog.yml file and the `kedro run` command you're using? 🙂
Malaguth

09/11/2021, 9:12 PM
Hi @User, sorry for the delay. I simulated the error in a single-node pipeline. Catalog:
yaml
_spark: &spark_parquet
  type: spark.SparkDataSet
  file_format: "parquet"

_spark_overwrite: &spark_overwrite
  <<: *spark_parquet
  save_args:
    mode: "overwrite"

SaleRefinedData:
  <<: *spark_overwrite
  filepath: ${spark_prefix}/data/03_refined/sale/
  layer: refined

SaleFeatureData:
  <<: *spark_parquet
  filepath: ${spark_prefix}/data/04_feature/sale/
  versioned: true
  layer: feature
Node:
python
import pyspark.sql as spark
from pyspark.sql import Window
from pyspark.sql import functions as F

# SaleDataModel and SaleFeatures are project-level column-name constants.


def aggregate_sale_ratings(sdf: spark.DataFrame) -> spark.DataFrame:
    # Rank each row within its (SKU, client) partition by sale date.
    window = (
        Window
        .partitionBy(SaleDataModel.SKU, SaleDataModel.CLIENT_ID)
        .orderBy(SaleDataModel.DATE)
    )

    sdf = (
        sdf
        .withColumn(SaleFeatures.COUNT_RATING, F.dense_rank().over(window))
        .withColumn(SaleFeatures.BINARY_RATING, F.lit(1))
    )

    return sdf
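[For readers following along: `F.dense_rank().over(window)` assigns each row a rank that increases by one per distinct order-by value within its partition, with tied values sharing a rank and no gaps between ranks. A pure-Python sketch of that semantics for a single partition, using made-up values in place of the Spark column:]

```python
def dense_rank(values):
    """Pure-Python illustration of SQL dense_rank over one partition:
    rank rows by value, ties share a rank, ranks have no gaps."""
    order = {v: i + 1 for i, v in enumerate(sorted(set(values)))}
    return [order[v] for v in values]


# Two sales on the same date share rank 1; the next date gets rank 2.
dense_rank(["2021-01-01", "2021-01-01", "2021-01-02"])  # → [1, 1, 2]
```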
Command:
bash
kedro run --pipeline sale_refined_to_feature --env dev
Note: I use Databricks Connect to run the pipeline remotely on an Azure Databricks cluster.
9:13 PM
Pipeline: