WolVez (08/20/2021, 2:06 PM)
user (08/22/2021, 11:06 PM)
datajoely (08/23/2021, 8:21 AM)
jcasanuevam (08/23/2021, 11:11 AM)
jcasanuevam (08/23/2021, 11:12 AM)
jcasanuevam (08/23/2021, 11:23 AM)
jcasanuevam (08/23/2021, 12:03 PM)
datajoely (08/23/2021, 12:04 PM)
datajoely (08/23/2021, 12:04 PM)
waylonwalker (08/28/2021, 5:22 PM)
… `kedro pipeline package/pull`. TLDR, how do I update pulled pipelines?
```bash
kedro new # proj1
kedro new # proj2
cd proj1
kedro pipeline create newpipe
# make some nodes
kedro pipeline package newpipe
cd src/dist
python -m http.server
# hosting dist directory at 8000
# new terminal
cd proj2
kedro pipeline pull http://localhost:8000/newpipe-0.1-py3-none-any.whl
```
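`python -m http.server` serves whatever directory it is started from (here `src/dist`, where the packaged wheel is picked up) on port 8000 by default, which is what makes the `http://localhost:8000/newpipe-0.1-py3-none-any.whl` URL reachable from `proj2`. If you would rather start that server from Python, a minimal sketch could look like this; `serve_dist` is a hypothetical helper name, and unlike the CLI, which listens on all interfaces, it binds only to the loopback address:

```python
import threading
from functools import partial
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer


def serve_dist(directory: str, port: int = 8000) -> ThreadingHTTPServer:
    """Serve `directory` over HTTP from a background thread (hypothetical helper).

    Roughly what `python -m http.server` does when run inside `directory`,
    but bound to 127.0.0.1 only. Pass port=0 to let the OS choose a free port.
    """
    handler = partial(SimpleHTTPRequestHandler, directory=directory)
    server = ThreadingHTTPServer(("127.0.0.1", port), handler)
    # serve_forever() blocks, so run it in a daemon thread and hand back the server
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

`server.server_address[1]` gives the port actually bound; call `server.shutdown()` and then `server.server_close()` when you are done.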
At this point everything is working. My question is: how do I update this pipeline?
```bash
cd proj1
# update newpipe code
# update version in __init__.py
kedro pipeline package newpipe
cd proj2
kedro pipeline pull http://localhost:8000/newpipe-0.2-py3-none-any.whl
```
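The update steps hinge on the version string: the wheels pulled above (`newpipe-0.1-…` and `newpipe-0.2-…`) follow the standard pure-Python wheel filename pattern `{name}-{version}-py3-none-any.whl`. A small sketch of the "bump the version, then pull the new URL" step, assuming the version lives in a `__version__ = "…"` line in the pipeline's `__init__.py` as the `# update version in __init__.py` comment suggests; `bump_minor` and `wheel_url` are hypothetical helpers, not Kedro APIs:

```python
import re

# Matches a line such as: __version__ = "0.1"
VERSION_RE = re.compile(r"""^__version__\s*=\s*["']([^"']+)["']""", re.MULTILINE)


def bump_minor(init_source: str) -> str:
    """Bump the minor part of `__version__` (e.g. "0.1" -> "0.2"), dropping any patch part."""
    match = VERSION_RE.search(init_source)
    if match is None:
        raise ValueError("no __version__ assignment found")
    major, minor = match.group(1).split(".")[:2]
    start, end = match.span(1)
    return f"{init_source[:start]}{major}.{int(minor) + 1}{init_source[end:]}"


def wheel_url(base_url: str, name: str, version: str) -> str:
    """URL of a pure-Python wheel named `{name}-{version}-py3-none-any.whl` under `base_url`."""
    return f"{base_url.rstrip('/')}/{name}-{version}-py3-none-any.whl"
```

For example, `wheel_url("http://localhost:8000", "newpipe", "0.2")` yields the URL used in the second `pull` command.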
```sh
❯ kedro pipeline pull http://localhost:8000/newpipe-0.2-py3-none-any.whl
2021-08-28 12:11:02,473 - root - INFO - Registered CLI hooks from 1 installed plugin(s): kedro-telemetry-0.1.2
2021-08-28 12:11:02,476 - kedro_telemetry.plugin - INFO - You have opted into product usage analytics.
HTTPFileSystem requires "requests" and "aiohttp" to be installed
Trying to use 'pip download'...
/proj2/.venv/bin/python -m pip download --no-deps --dest /tmp/tmpucm6o5r2 http://localhost:8000/newpipe-0.2-py3-none-any.whl
Collecting newpipe==0.2
Downloading http://localhost:8000/newpipe-0.2-py3-none-any.whl (6.0 kB)
Saved /tmp/tmpucm6o5r2/newpipe-0.2-py3-none-any.whl
Successfully downloaded newpipe
Creating `/proj2/conf/base/parameters`:
Creating `/proj2/conf/base/parameters/newpipe.yml`: SKIPPED (already exists)
Creating `/proj2/src/tests/pipelines/newpipe/__init__.py`: SKIPPED (already exists)
Creating `/proj2/src/tests/pipelines/newpipe/test_pipeline.py`: SKIPPED (already exists)
Creating `/proj2/src/proj2/pipelines/newpipe/nodes.py`: SKIPPED (already exists)
Creating `/proj2/src/proj2/pipelines/newpipe/README.md`: SKIPPED (already exists)
Creating `/proj2/src/proj2/pipelines/newpipe/__init__.py`: SKIPPED (already exists)
Creating `/proj2/src/proj2/pipelines/newpipe/pipeline.py`: SKIPPED (already exists)
```
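The output shows why the update does not take effect: the 0.2 wheel downloads fine, but every target file in `proj2` is reported as `SKIPPED (already exists)`, so the 0.1 code stays in place. A minimal sketch that scans captured `pull` output for those lines, assuming the exact `Creating `<path>`: SKIPPED (already exists)` format shown above (`skipped_paths` is a hypothetical helper):

```python
import re
from typing import List

# Matches lines like: Creating `/proj2/src/proj2/pipelines/newpipe/nodes.py`: SKIPPED (already exists)
SKIPPED_RE = re.compile(
    r"^Creating `(?P<path>[^`]+)`: SKIPPED \(already exists\)$", re.MULTILINE
)


def skipped_paths(pull_output: str) -> List[str]:
    """Return every path that `kedro pipeline pull` reported as already existing."""
    return [m.group("path") for m in SKIPPED_RE.finditer(pull_output)]
```

A non-empty result after pulling a newer wheel means the pulled code was not updated. Removing or moving the existing `newpipe` files in `proj2` before pulling again is one possible workaround, assuming there are no local edits in them that you need to keep.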
Arnaldo (08/30/2021, 1:44 PM)
… `kedro pipeline pull`, @User
Arnaldo (08/30/2021, 1:44 PM)
Arnaldo (08/30/2021, 2:04 PM)
Deutöic (09/03/2021, 1:15 PM)
Deutöic (09/03/2021, 1:15 PM)
datajoely (09/03/2021, 1:15 PM)
datajoely (09/03/2021, 1:21 PM)
ende (09/04/2021, 9:41 PM)
datajoely (09/05/2021, 11:19 AM)
Malaguth (09/05/2021, 11:59 PM)
datajoely (09/06/2021, 8:10 AM)
datajoely (09/06/2021, 8:10 AM)
Malaguth (09/06/2021, 1:13 PM)
Malaguth (09/06/2021, 1:38 PM)
datajoely (09/06/2021, 1:38 PM)
Malaguth (09/06/2021, 1:38 PM)
datajoely (09/06/2021, 1:38 PM)
Malaguth (09/06/2021, 1:39 PM)
datajoely (09/06/2021, 1:40 PM)
Malaguth (09/06/2021, 1:48 PM)
Malaguth (09/06/2021, 1:48 PM)
user (09/07/2021, 12:25 PM)
datajoely (09/07/2021, 1:51 PM)
"kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet" (the code works if I switch the dataset type to pandas and use a small part of the data)
Malaguth (09/07/2021, 1:54 PM)
datajoely (09/07/2021, 1:54 PM)
Malaguth (09/07/2021, 2:11 PM)
```sh
2021-09-07 11:10:58,913 - kedro.runner.sequential_runner - WARNING - There are 1 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
--from-nodes "format_als_recomendations"
2021-09-07 11:10:59,449 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
Traceback (most recent call last):
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 265, in main
    cli_collection()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 210, in main
    super().main(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/malaguth/Workspace/Git/recommendation-engine/src/martins_advisor/cli.py", line 160, in run
    session.run(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/session/session.py", line 408, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 218, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 313, in _run_node_sequential
    catalog.save(name, data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 449, in save
    func(data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 636, in save
    load_version = self.resolve_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 575, in resolve_load_version
    return self._fetch_latest_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/cachetools/decorators.py", line 73, in wrapper
    v = method(self, *args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 558, in _fetch_latest_load_version
    raise VersionNotFoundError(f"Did not find any versions for {self}")
kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet(file_format=csv, filepath=/mnt/advisor-dev/data/07_model_output/als/recommendation, load_args={}, save_args={}, version=Version(load=None, save='2021-09-07T14.07.50.215Z'))
```
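The traceback shows that the error is raised inside the versioned dataset's `save()`, which calls `resolve_load_version()`; that in turn looks for the newest existing version under the dataset's `filepath` and raises `VersionNotFoundError` when it finds none, even though a save version (`2021-09-07T14.07.50.215Z`) had already been generated. Below is a simplified, stdlib-only illustration of that kind of lookup, not Kedro's actual code; it assumes versions are stored as `<filepath>/<version>/<basename>` with timestamps formatted like the one in the error. The point it makes: the lookup only sees the filesystem of the process running it, so if the data was written somewhere that process cannot list (one possibility worth checking, given the Databricks Connect setup described later in the thread), it fails in exactly this way.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Timestamp layout of the version in the error message, e.g. 2021-09-07T14.07.50.215Z
VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"


class VersionNotFound(Exception):
    """Illustrative stand-in for kedro.io.core.VersionNotFoundError."""


def make_version(moment: Optional[datetime] = None) -> str:
    """Format a UTC timestamp as a version string with millisecond precision."""
    moment = moment or datetime.now(tz=timezone.utc)
    stamp = moment.strftime(VERSION_FORMAT)  # e.g. ...T14.07.50.215000Z (microseconds)
    return stamp[:-4] + stamp[-1:]  # drop the last three digits -> ...T14.07.50.215Z


def versioned_path(filepath: str, version: str) -> Path:
    """Assumed layout: <filepath>/<version>/<basename of filepath>."""
    base = Path(filepath)
    return base / version / base.name


def latest_version(filepath: str) -> str:
    """Newest version visible on the filesystem this process can see."""
    base = Path(filepath)
    found = [p.parent.name for p in base.glob(f"*/{base.name}")] if base.is_dir() else []
    if not found:
        raise VersionNotFound(f"Did not find any versions for {filepath}")
    # Zero-padded timestamps sort chronologically as plain strings
    return max(found)
```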
datajoely (09/08/2021, 8:16 AM)
Malaguth (09/08/2021, 1:54 PM)
antony.milne (09/09/2021, 8:21 AM)
… `kedro run` command you're using? 🙂
Malaguth (09/11/2021, 9:12 PM)
```yaml
_spark: &spark_parquet
  type: spark.SparkDataSet
  file_format: "parquet"

_spark_overwrite: &spark_overwrite
  <<: *spark_parquet
  save_args:
    mode: "overwrite"

SaleRefinedData:
  <<: *spark_overwrite
  filepath: ${spark_prefix}/data/03_refined/sale/
  layer: refined

SaleFeatureData:
  <<: *spark_parquet
  filepath: ${spark_prefix}/data/04_feature/sale/
  versioned: true
  layer: feature
```
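In this catalog, the `&`/`*` anchors and the `<<` merge key are plain YAML features: each entry starts from the anchored mapping, and its own keys are added on top (winning on conflicts). The merge is shallow, much like Python dict unpacking, so the two datasets expand roughly as below. Only `SaleFeatureData` sets `versioned: true`, so it is the only entry here that would go through a versioned save like the one in the traceback. This is an illustrative expansion; `${spark_prefix}` is left unresolved because its value comes from a template variable the thread does not show.

```python
# Anchored base mappings, as declared with &spark_parquet and &spark_overwrite
spark_parquet = {"type": "spark.SparkDataSet", "file_format": "parquet"}
spark_overwrite = {**spark_parquet, "save_args": {"mode": "overwrite"}}

catalog = {
    # `<<: *spark_overwrite`, then the entry's own keys
    "SaleRefinedData": {
        **spark_overwrite,
        "filepath": "${spark_prefix}/data/03_refined/sale/",
        "layer": "refined",
    },
    # `<<: *spark_parquet`, then the entry's own keys
    "SaleFeatureData": {
        **spark_parquet,
        "filepath": "${spark_prefix}/data/04_feature/sale/",
        "versioned": True,
        "layer": "feature",
    },
}

versioned_entries = [name for name, cfg in catalog.items() if cfg.get("versioned")]
```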
Node:
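```python
import pyspark.sql as spark
import pyspark.sql.functions as F
from pyspark.sql import Window

# SaleDataModel / SaleFeatures: project-specific column-name constants (not shown in the thread)


def aggregate_sale_ratings(sdf: spark.DataFrame) -> spark.DataFrame:
    # One window per (SKU, client) pair, rows ordered by sale date
    window = (
        Window
        .partitionBy(SaleDataModel.SKU, SaleDataModel.CLIENT_ID)
        .orderBy(SaleDataModel.DATE)
    )
    sdf = (
        sdf
        # Dense rank of each row's date within its (SKU, client) pair: 1, 2, 3, ...
        .withColumn(SaleFeatures.COUNT_RATING, F.dense_rank().over(window))
        # Constant rating of 1 on every row
        .withColumn(SaleFeatures.BINARY_RATING, F.lit(1))
    )
    return sdf
```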
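To make explicit what `COUNT_RATING` ends up holding: `F.dense_rank()` over a window partitioned by SKU and client and ordered by date numbers the distinct dates within each (SKU, client) pair as 1, 2, 3, … with no gaps, and rows that share a date get the same rank. A plain-Python sketch of those semantics on hypothetical rows (the tuple layout and the ISO date strings are invented for illustration):

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Row = Tuple[str, str, str]  # (sku, client_id, ISO date); hypothetical layout


def dense_rank_by_date(rows: List[Row]) -> List[int]:
    """Dense rank of each row's date within its (sku, client_id) partition."""
    dates: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for sku, client, date in rows:
        dates[(sku, client)].add(date)
    # Distinct dates in ascending order -> ranks 1, 2, 3, ... (ties share a rank, no gaps)
    ranks = {
        key: {d: i for i, d in enumerate(sorted(ds), start=1)}
        for key, ds in dates.items()
    }
    return [ranks[(sku, client)][date] for sku, client, date in rows]
```

In other words, the highest value within each pair equals the number of distinct sale dates for that pair, not the number of sale rows.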
Command:
```bash
kedro run --pipeline sale_refined_to_feature --env dev
```
Note:
I use Databricks Connect to run the pipeline remotely on an Azure Databricks cluster.