Malaguth
09/06/2021, 1:48 PM

user
09/07/2021, 12:25 PM

datajoely
09/07/2021, 1:51 PM
"kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet". (The code works if I change the type to pandas with a small part of the data.)

Malaguth
09/07/2021, 1:54 PM

datajoely
09/07/2021, 1:54 PM

Malaguth
09/07/2021, 2:11 PM

```sh
2021-09-07 11:10:58,913 - kedro.runner.sequential_runner - WARNING - There are 1 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
--from-nodes "format_als_recomendations"
2021-09-07 11:10:59,449 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
Traceback (most recent call last):
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 265, in main
    cli_collection()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 210, in main
    super().main(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/malaguth/Workspace/Git/recommendation-engine/src/martins_advisor/cli.py", line 160, in run
    session.run(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/session/session.py", line 408, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 218, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 313, in _run_node_sequential
    catalog.save(name, data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 449, in save
    func(data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 636, in save
    load_version = self.resolve_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 575, in resolve_load_version
    return self._fetch_latest_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/cachetools/decorators.py", line 73, in wrapper
    v = method(self, *args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 558, in _fetch_latest_load_version
    raise VersionNotFoundError(f"Did not find any versions for {self}")
kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet(file_format=csv, filepath=/mnt/advisor-dev/data/07_model_output/als/recommendation, load_args={}, save_args={}, version=Version(load=None, save='2021-09-07T14.07.50.215Z'))
```
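For context on the error (editorial note, not from the thread): a Kedro dataset with `versioned: true` saves into a timestamped subdirectory under `filepath`, and the traceback shows that immediately after `save()` Kedro tries to resolve the latest existing version under that path; `VersionNotFoundError` means that lookup found no version directories at all. A minimal sketch of the layout, assuming the documented `<filepath>/<version>/<basename>` convention and the timestamp format visible in the traceback:

```python
from datetime import datetime, timezone

def kedro_style_version(ts: datetime) -> str:
    # ISO-8601 timestamp with '.' instead of ':' and millisecond precision,
    # matching the save version shown in the traceback.
    return ts.strftime("%Y-%m-%dT%H.%M.%S") + f".{ts.microsecond // 1000:03d}Z"

def versioned_path(filepath: str, version: str) -> str:
    # A versioned dataset saves to <filepath>/<version>/<basename>.
    base = filepath.rstrip("/").rsplit("/", 1)[-1]
    return f"{filepath.rstrip('/')}/{version}/{base}"

v = kedro_style_version(datetime(2021, 9, 7, 14, 7, 50, 215000, tzinfo=timezone.utc))
print(versioned_path("/mnt/advisor-dev/data/07_model_output/als/recommendation", v))
# /mnt/advisor-dev/data/07_model_output/als/recommendation/2021-09-07T14.07.50.215Z/recommendation
```

So the save appears to go through, but the follow-up existence check under `filepath` comes back empty.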
datajoely
09/08/2021, 8:16 AM

Malaguth
09/08/2021, 1:54 PM

antony.milne
09/09/2021, 8:21 AM
`kedro run` command you're using? 🙂

Malaguth
09/11/2021, 9:12 PM

```yaml
_spark: &spark_parquet
  type: spark.SparkDataSet
  file_format: "parquet"

_spark_overwrite: &spark_overwrite
  <<: *spark_parquet
  save_args:
    mode: "overwrite"

SaleRefinedData:
  <<: *spark_overwrite
  filepath: ${spark_prefix}/data/03_refined/sale/
  layer: refined

SaleFeatureData:
  <<: *spark_parquet
  filepath: ${spark_prefix}/data/04_feature/sale/
  versioned: true
  layer: feature
```
Node:

```python
def aggregate_sale_ratings(sdf: spark.DataFrame) -> spark.DataFrame:
    window = (
        Window
        .partitionBy(SaleDataModel.SKU, SaleDataModel.CLIENT_ID)
        .orderBy(SaleDataModel.DATE)
    )
    sdf = (
        sdf
        .withColumn(SaleFeatures.COUNT_RATING, F.dense_rank().over(window))
        .withColumn(SaleFeatures.BINARY_RATING, F.lit(1))
    )
    return sdf
```
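As a side illustration (plain Python, not PySpark, and not part of the thread): `F.dense_rank()` over that window assigns consecutive ranks within each (SKU, client) partition ordered by date, with ties sharing a rank and no gaps. A toy equivalent for a single partition:

```python
def dense_rank(ordered_keys):
    # Equal keys share a rank; ranks are consecutive (1, 1, 2), never skipped.
    rank_of = {k: i + 1 for i, k in enumerate(sorted(set(ordered_keys)))}
    return [rank_of[k] for k in ordered_keys]

print(dense_rank(["2021-01-04", "2021-01-04", "2021-02-10", "2021-03-01"]))
# [1, 1, 2, 3]
```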
Command:

```bash
kedro run --pipeline sale_refined_to_feature --env dev
```
Note:
I use Databricks Connect to run the pipeline remotely on an Azure Databricks cluster.
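One avenue worth checking (editorial speculation, not a confirmed diagnosis from the thread): with Databricks Connect, the Spark write executes on the remote cluster, where `/mnt/advisor-dev` is a DBFS mount, while Kedro's version lookup may inspect the path from the local machine, where the mount does not exist. A sketch of what that mismatch would look like from the local side:

```python
import glob
import os

# Path taken from the traceback; on the cluster this is a DBFS mount.
dataset_dir = "/mnt/advisor-dev/data/07_model_output/als/recommendation"

# On a machine without the mount, a glob for version subdirectories is empty,
# which is exactly the condition that raises VersionNotFoundError.
versions = glob.glob(os.path.join(dataset_dir, "*/"))
print(versions)  # [] when /mnt/advisor-dev is not mounted locally
```

If that is what is happening here, the write itself succeeds on the cluster but the driver-side existence check never sees it.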