beginners-need-help
  • WolVez

    08/20/2021, 2:06 PM
    Dynamic Pipelines.
  • user

    08/22/2021, 11:06 PM
    Hi, I am very new to this. I might be missing something, but it seems we can only input raw data file by file. In the catalog, each entry seems to be only one file. However, my raw data is an entire directory from which I need to load data individually. I was wondering if there is a way to pass a directory into a pipeline as input, instead of a specific catalog entry for each file? Can we have a directory in the data catalog instead of a file? Sorry if this seems completely obvious...
  • datajoely

    08/23/2021, 8:21 AM
    Catalog loading by directory
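    [For reference: Kedro covers this with PartitionedDataSet, which exposes every file in a directory as a separate partition. A minimal catalog sketch, assuming CSV files; the entry name and path are illustrative:]
    yaml
    my_raw_directory:
      type: PartitionedDataSet
      path: data/01_raw/my_directory/   # every file in this folder becomes a partition
      dataset: pandas.CSVDataSet        # dataset type used to load each individual file
    [The node then receives a dictionary mapping each file name to a callable that loads that file.]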
  • jcasanuevam

    08/23/2021, 11:11 AM
    Hi! I'm running a Kedro project inside a Docker container (I previously created the Dockerfile for the project and built the image), just for experimenting. My project uses the mlflow plugin, and the thing is, when I run 'kedro run' inside the Docker container I get the following error. All previous nodes run fine, but the execution stops once it reaches the node where the mlflow artifacts are saved.
  • jcasanuevam

    08/23/2021, 11:12 AM
    It seems like it is trying to access the C:/ disk, but I don't know why, or where it is configured to act like this.
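    [A quick, illustrative way to see where that Windows path comes from is to check the tracking URI mlflow resolves inside the container:]
    python
    # illustrative check: print the tracking URI mlflow resolves at runtime;
    # a Windows-style path here would point at config baked into the image
    import mlflow
    print(mlflow.get_tracking_uri())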
  • jcasanuevam

    08/23/2021, 11:23 AM
    BTW, this is my catalog:
  • jcasanuevam

    08/23/2021, 12:03 PM
    SOLVED! I had mounted the image with data from previous experiments in the /data directory, and that seems to have caused the issue. If the /data directory contains no files (except the files in the 01_raw directory), it runs with no problems.
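    [In other words, mounting only the raw layer keeps stale artifacts out of the container. A hypothetical sketch; the image name and container path are illustrative:]
    bash
    # mount only data/01_raw so artifacts from previous experiments
    # on the host don't leak into the container's /data directory
    docker run -v $(pwd)/data/01_raw:/home/kedro/data/01_raw my-kedro-image kedro run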
  • datajoely

    08/23/2021, 12:04 PM
    Amazing 🎉
  • datajoely

    08/23/2021, 12:04 PM
    well done!
  • waylonwalker

    08/28/2021, 5:22 PM
    I am playing with `kedro pipeline package/pull`. TLDR, how do I update pulled pipelines?
    bash
    kedro new # proj1
    kedro new # proj2
    
    cd proj1
    kedro pipeline create newpipe
    # make some nodes
    kedro pipeline package newpipe
    cd src/dist
    python -m http.server
    # hosting dist directory at 8000
    
    # new terminal
    cd proj2
    kedro pipeline pull http://localhost:8000/newpipe-0.1-py3-none-any.whl
    At this point everything is working. My question is: how do I update this pipeline?
    bash
    cd proj1
    # update newpipe code
    # update version in __init__.py
    kedro pipeline package newpipe
    
    cd proj2
    kedro pipeline pull http://localhost:8000/newpipe-0.2-py3-none-any.whl
    ❯ kedro pipeline pull http://localhost:8000/newpipe-0.2-py3-none-any.whl
    
    2021-08-28 12:11:02,473 - root - INFO - Registered CLI hooks from 1 installed plugin(s): kedro-telemetry-0.1.2
    2021-08-28 12:11:02,476 - kedro_telemetry.plugin - INFO - You have opted into product usage analytics.
    HTTPFileSystem requires "requests" and "aiohttp" to be installed
    Trying to use 'pip download'...
    /proj2/.venv/bin/python -m pip download --no-deps --dest /tmp/tmpucm6o5r2 http://localhost:8000/newpipe-0.2-py3-none-any.whl
    Collecting newpipe==0.2
      Downloading http://localhost:8000/newpipe-0.2-py3-none-any.whl (6.0 kB)
    Saved /tmp/tmpucm6o5r2/newpipe-0.2-py3-none-any.whl
    Successfully downloaded newpipe
    Creating `/proj2/conf/base/parameters`:
      Creating `/proj2/conf/base/parameters/newpipe.yml`: SKIPPED (already exists)
    Creating `/proj2/src/tests/pipelines/newpipe/__init__.py`: SKIPPED (already exists)
    Creating `/proj2/src/tests/pipelines/newpipe/test_pipeline.py`: SKIPPED (already exists)
    Creating `/proj2/src/proj2/pipelines/newpipe/nodes.py`: SKIPPED (already exists)
    Creating `/proj2/src/proj2/pipelines/newpipe/README.md`: SKIPPED (already exists)
    Creating `/proj2/src/proj2/pipelines/newpipe/__init__.py`: SKIPPED (already exists)
    Creating `/proj2/src/proj2/pipelines/newpipe/pipeline.py`: SKIPPED (already exists)
  • Arnaldo

    08/30/2021, 1:44 PM
    you need to delete the files you want to update before running `kedro pipeline pull`, @User
  • Arnaldo

    08/30/2021, 1:44 PM
    as they are skipped by default
  • Arnaldo

    08/30/2021, 2:04 PM
    I think the rationale behind this is to not delete additional code you may have written
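    [So the update flow looks roughly like this (a hypothetical sketch; the paths follow the SKIPPED lines in the proj2 logs above):]
    bash
    # in proj2: remove the previously pulled files so the new version
    # gets written instead of skipped (back up any local edits first!)
    rm -rf src/proj2/pipelines/newpipe src/tests/pipelines/newpipe
    rm -f conf/base/parameters/newpipe.yml
    kedro pipeline pull http://localhost:8000/newpipe-0.2-py3-none-any.whl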
  • Deutöic

    09/03/2021, 1:15 PM
    Hello guys! I introduced Kedro to a coworker, and when he was testing it the command "kedro new --starter=pandas-iris" was not working. I also tried and got the same errors, as follows:
  • Deutöic

    09/03/2021, 1:15 PM
    message has been deleted
  • datajoely

    09/03/2021, 1:15 PM
    Hey @User, is Git installed?
  • datajoely

    09/03/2021, 1:21 PM
    And googling suggests that the certificates on your (corporate?) machines may be funky https://stackoverflow.com/questions/46887802/exit-status-128-while-cloning-the-git-repository-while-getting-go-package-from-g
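    [A couple of illustrative checks along those lines; the starter repository URL is just an example:]
    bash
    git --version   # confirm Git is installed and on PATH
    # test whether HTTPS cloning works at all, independently of Kedro:
    git ls-remote https://github.com/quantumblacklabs/kedro-starters.git
    # if SSL verification fails behind a corporate proxy, point Git at the CA bundle:
    git config --global http.sslCAInfo /path/to/corporate-ca-bundle.pem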
  • ende

    09/04/2021, 9:41 PM
    Is it generally recommended to group multiple different pipelines in the same repo?
  • datajoely

    09/05/2021, 11:19 AM
    Yes, most projects use a monorepo setup. However, our modular pipeline pattern allows you to package/push/pull pipelines between projects. https://kedro.readthedocs.io/en/stable/06_nodes_and_pipelines/03_modular_pipelines.html
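    [In the monorepo pattern the pipelines live side by side and are registered under selectable names. A minimal sketch, assuming Kedro 0.17-style registration; the module and pipeline names are illustrative:]
    python
    from typing import Dict

    from kedro.pipeline import Pipeline

    # illustrative project modules, each exposing a create_pipeline() factory
    from my_project.pipelines import data_engineering as de
    from my_project.pipelines import data_science as ds


    def register_pipelines() -> Dict[str, Pipeline]:
        """Map pipeline names to Pipeline objects, so each can be run by name."""
        de_pipeline = de.create_pipeline()
        ds_pipeline = ds.create_pipeline()
        return {
            "de": de_pipeline,                         # kedro run --pipeline de
            "ds": ds_pipeline,                         # kedro run --pipeline ds
            "__default__": de_pipeline + ds_pipeline,  # plain `kedro run`
        }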
  • Malaguth

    09/05/2021, 11:59 PM
    Hello everyone. I'm having a problem saving a versioned spark.SparkDataSet. I'm saving a new dataset and receive the error message "kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet". (The code works if I change the type to pandas with a small part of the data.) Has anyone already had the same issue, or does anyone know possible solutions?
  • datajoely

    09/06/2021, 8:10 AM
    @User So this error is returned if Kedro can't find an older version of the dataset
  • datajoely

    09/06/2021, 8:10 AM
    Some questions: (1) Are you able to write but not read? (2) Does it work with versioning disabled?
  • Malaguth

    09/06/2021, 1:13 PM
    (1) I'm writing the dataset, not reading it (2) it works with versioning disabled
  • Malaguth

    09/06/2021, 1:38 PM
    I didn't expect this error in a write operation. I already use other versioned datasets in the project, but this is the first SparkDataSet.
  • datajoely

    09/06/2021, 1:38 PM
    Yeah it's a little weird
  • Malaguth

    09/06/2021, 1:38 PM
    The others are pickle
  • datajoely

    09/06/2021, 1:38 PM
    Is there any issue if we clear the directory created by the versioned SparkDataSet?
  • Malaguth

    09/06/2021, 1:39 PM
    No problem
  • datajoely

    09/06/2021, 1:40 PM
    then can we wipe that directory and see if that works?
  • Malaguth

    09/06/2021, 1:48 PM
    Yes. I already wiped the directory and ran with and without the versioned dataset. (1) With the versioned SparkDataSet, it creates the timestamp folder and writes data into it, but the run ends in the exception. (2) Without a versioned SparkDataSet, everything works as expected. Versioned dataset: Alsformattedmodeloutput
user

09/07/2021, 12:25 PM
Hi @User, could you paste your exception here, please?
datajoely

09/07/2021, 1:51 PM
This was the original error
"kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet". (The code works if I change the type for pandas with a small part of the data)
Malaguth

09/07/2021, 1:54 PM
I'll run the pipeline and send the full exception
datajoely

09/07/2021, 1:54 PM
Thank you
Malaguth

09/07/2021, 2:11 PM
sh
2021-09-07 11:10:58,913 - kedro.runner.sequential_runner - WARNING - There are 1 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
  --from-nodes "format_als_recomendations"
2021-09-07 11:10:59,449 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
Traceback (most recent call last):
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 265, in main
    cli_collection()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 210, in main
    super().main(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/malaguth/Workspace/Git/recommendation-engine/src/martins_advisor/cli.py", line 160, in run
    session.run(
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/framework/session/session.py", line 408, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 218, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/runner/runner.py", line 313, in _run_node_sequential
    catalog.save(name, data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 449, in save
    func(data)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 636, in save
    load_version = self.resolve_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 575, in resolve_load_version
    return self._fetch_latest_load_version()
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/cachetools/decorators.py", line 73, in wrapper
    v = method(self, *args, **kwargs)
  File "/home/malaguth/Workspace/Envs/venv-recsys-dev/lib/python3.8/site-packages/kedro/io/core.py", line 558, in _fetch_latest_load_version
    raise VersionNotFoundError(f"Did not find any versions for {self}")
kedro.io.core.VersionNotFoundError: Did not find any versions for SparkDataSet(file_format=csv, filepath=/mnt/advisor-dev/data/07_model_output/als/recommendation, load_args={}, save_args={}, version=Version(load=None, save='2021-09-07T14.07.50.215Z'))
Despite the error, the pipeline creates the files on blob storage
Let me know if I can help with anything
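[That matches the traceback: during save, the versioned dataset also resolves the latest load version, and VersionNotFoundError is raised when the expected layout isn't found. Roughly, versioning expects a layout like this (an illustrative sketch, not an exact listing):]
sh
# expected versioned layout: <filepath>/<version timestamp>/<basename of filepath>
#
# /mnt/advisor-dev/data/07_model_output/als/recommendation/
#     2021-09-07T14.07.50.215Z/
#         recommendation/    <- Spark writes its part-*.csv files in here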
datajoely

09/08/2021, 8:16 AM
Thanks @User, we're still looking into this
Malaguth

09/08/2021, 1:54 PM
Ok, thanks. Let me know if I can help
antony.milne

09/09/2021, 8:21 AM
Hi @User, please could you post the relevant part of your catalog.yml file and the `kedro run` command you're using? 🙂
Malaguth

09/11/2021, 9:12 PM
Hi @User, sorry for the delay. I simulated the error in a single-node pipeline. Catalog:
yaml
_spark: &spark_parquet
  type: spark.SparkDataSet
  file_format: "parquet"

_spark_overwrite: &spark_overwrite
  <<: *spark_parquet
  save_args:
    mode: "overwrite"

SaleRefinedData:
  <<: *spark_overwrite
  filepath: ${spark_prefix}/data/03_refined/sale/
  layer: refined

SaleFeatureData:
  <<: *spark_parquet
  filepath: ${spark_prefix}/data/04_feature/sale/
  versioned: true
  layer: feature
Node:
python
import pyspark.sql as spark  # so the spark.DataFrame type hints resolve
from pyspark.sql import Window
from pyspark.sql import functions as F

# SaleDataModel and SaleFeatures are project-specific column-name constants


def aggregate_sale_ratings(sdf: spark.DataFrame) -> spark.DataFrame:
    window = (
        Window
        .partitionBy(SaleDataModel.SKU, SaleDataModel.CLIENT_ID)
        .orderBy(SaleDataModel.DATE)
    )

    sdf = (
        sdf
        .withColumn(SaleFeatures.COUNT_RATING, F.dense_rank().over(window))
        .withColumn(SaleFeatures.BINARY_RATING, F.lit(1))
    )

    return sdf
Command:
bash
kedro run --pipeline sale_refined_to_feature --env dev
Note: I use Databricks Connect to run the pipeline remotely on an Azure Databricks cluster.
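[The ${spark_prefix} placeholder implies templated configuration. A hypothetical sketch of the matching globals entry, assuming a TemplatedConfigLoader setup; the value is made up to match the paths in the traceback:]
yaml
# conf/dev/globals.yml (illustrative)
spark_prefix: /mnt/advisor-dev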
Pipeline: