#beginners-need-help

Dhaval

12/22/2021, 10:03 AM
So, I'm using this file to use the same pipeline for train and test inputs. There's an error that pops up
Error: Failed to map datasets and/or parameters: train
I don't know what to do on this front
datajoely

12/22/2021, 11:33 AM
Hi @User, our modular pipeline docs are currently being overhauled, so this should be better explained in the future (you can look at the two open PRs if you want to). The wrapper method (`from kedro.pipeline.modular_pipeline import pipeline`) is a bit tricky to get the hang of before it clicks. The `inputs` and `outputs` arguments are designed to be overrides, which I think you've got the hang of with the `outputs` argument. However, your inputs `train` and `test` aren't mapped to `data`, so I think you need to add
inputs={"data": "train"}
(and likewise `inputs={"data": "test"}` for the test instance)
I also have a fully representative project that takes advantage of modular pipelines here: https://github.com/datajoely/modular-spaceflights
Dhaval

12/22/2021, 11:33 AM
Hi @User, I just did that
11:33 AM
And it works
11:33 AM
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

from .nodes import (
    clean_column_names,
    clean_time_cols,
    clean_tertiary
)

def create_pipeline(**kwargs):
    cleaning_pipe = Pipeline(
        [
            node(
                func=clean_column_names,
                inputs="data",
                outputs="data_cols_cleaned",
                name="data_col_clean",
            ),
            node(
                func=clean_time_cols,
                inputs="data_cols_cleaned",
                outputs="data_time_cleaned",
                name="data_time_clean",
            ),
            node(
                func=clean_tertiary,
                inputs="data_time_cleaned",
                outputs="data_cleaned",
                name="data_tertiary_clean",
            ),
        ]
    )

    mod_pipe = Pipeline(
        [
            pipeline(
                pipe=cleaning_pipe,
                inputs={"data": "train"},
                outputs={"data_cleaned": "train_cleaned"},
                namespace="train_pipe",
            ),
            pipeline(
                pipe=cleaning_pipe,
                inputs={"data": "test"},
                outputs={"data_cleaned": "test_cleaned"},
                namespace="test_pipe",
            ),
        ]
    )

    return mod_pipe
datajoely

12/22/2021, 11:33 AM
also, using `kedro viz --autoreload` is a huge time saver when working with namespaces
11:33 AM
it really brings it to life
11:33 AM
🦾
Dhaval

12/22/2021, 11:33 AM
I didn't know about this
datajoely

12/22/2021, 11:34 AM
It's new! Hence us updating the docs
Dhaval

12/22/2021, 11:34 AM
Does it automatically see the changes and upload it on the webpage?
datajoely

12/22/2021, 11:34 AM
yeah so it scans for changes to the codebase and refreshes when it sees it
Dhaval

12/22/2021, 11:34 AM
This is great! I'm really excited πŸ™‚
datajoely

12/22/2021, 11:34 AM
Good luck!
Dhaval

12/22/2021, 11:35 AM
Thanks @User 😁
11:36 AM
I'm gonna be a regular here and I want to make use of this tool to its maximum, so yeah, see you around πŸ˜†
datajoely

12/22/2021, 11:37 AM
Modular pipelines are the feature I'm most excited about, but we've done a terrible job documenting and marketing them so far. The new Viz update really brings them to life so any feedback on what we could have done better is very much welcome.
Dhaval

12/23/2021, 7:53 AM
@User Can namespaces be run individually? I'm asking this because in your modular-spaceflights example you have used namespaces to shrink everything down to one single block. I want to run individual pipelines inside these namespaces and don't know how to
datajoely

12/23/2021, 12:00 PM
No, namespaces can't be run individually out of the box. Two ways I would recommend: add tags to the nodes based on the namespace, or register the modular pipeline in `pipeline_registry.py`, and then you can do `kedro run --pipeline {name}`
Dhaval

12/23/2021, 3:26 PM
What would be a wise project decision? Having multiple modular pipelines set up individually and then merging them all together in pipeline_registry.py with namespaces, or just tagging individual nodes whilst keeping the number of modular pipelines to a lesser count?
datajoely

12/23/2021, 4:00 PM
I think this is very much something for you to decide
4:00 PM
the namespaces are independent of how the pipelines get executed
4:01 PM
they are just to organise your work
4:03 PM
if you look at my demo project...
kedro run --pipeline "Modelling stage"
would run the namespaces below
train_evaluation
|_ random_forest
|_ linear_regression
4:04 PM
so in this example the top-level namespace is equivalent to the registered "Modelling stage" pipeline, but they serve different purposes
4:04 PM
We could also look to add namespaces to the run arguments in the future, this is the first time someone has asked for it
4:04 PM
In general we're also looking to overhaul the syntax for how the run command works, so this is useful
Dhaval

12/23/2021, 5:20 PM
This is because I'm thinking of creating a project with kind of independent reconfigurable elements and this would definitely help. As of now, the tag argument helps to tackle the problem. Also, glad I could help 😁
9:44 AM
@User If my modular pipeline has 2 inputs, then how can I map them? I am using
inputs = {"data": "train", "table_name": "train"}
It gives the following error:
kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets'
9:45 AM
This is the code
datajoely

12/29/2021, 11:18 AM
Hi @User there are a couple of things wrong with your pipeline code, but I'm also not sure where that error is being called from
11:18 AM
so first and most important thing: you don't need to retrieve your parameters like that
11:19 AM
to use the `tables` key you simply provide `params:tables` as an entry in the `inputs` argument
11:20 AM
regarding the CLI error: could you post more of the stack trace, as I'm not sure what is causing it?
Dhaval

12/29/2021, 11:32 AM
So to give you some background: I am trying to generate a profile of the data using Pandas Profiling in a pipeline. For this, I have 2 datasets: 1. Train, 2. Test. In the future there can be any number of datasets, but for now we're sticking to these two. The list of tables available can be found in the `conf/base/parameters/raw_data_load.yml` file
11:33 AM
This file got generated when I created the raw_data_load modular pipeline. The aim of this pipeline was to iterate through the tables available in `conf/base/parameters/raw_data_load.yml` and then execute the pipeline. For this, I have the files `node.py` and `pipeline.py`, but for some reason my code is not taking train as a string value to name the file, and it is causing issues
datajoely

12/29/2021, 11:34 AM
Okay I would achieve this another way
11:34 AM
do you want to profile every dataset or is it just specific ones?
Dhaval

12/29/2021, 11:37 AM
All the ones present in the parameters file. For now what is happening is: since train and test are declared in the catalog.yml file,
create_descriptives([train, train]) -> json_train
is shown as the node output, meaning that I am unable to name the file via
profile.to_file(f"data/descriptives/{table_name}.html")
because table_name here is a dataframe for some reason and not a string
datajoely

12/29/2021, 11:38 AM
Okay, we don't encourage using parameters outside of the default pattern, i.e. you shouldn't declare your own config loader outside of the one that exists behind the scenes
11:38 AM
so I would do this one of two different ways
11:41 AM
1. You could create a one-node `pandas_profiling` modular pipeline that you keep declaring every time you want to profile a dataset. In this scenario you declare the list of catalog entries you want to profile in Python, not YAML, and create a list from there.
complete_profiling_pipeline = sum([pipeline(profiler, inputs=ds, outputs=ds+"_profiled") for ds in ["ds1", "ds2", "ds3"]])
11:42 AM
2. You use Kedro's lifecycle hooks to do your profiling
Dhaval

12/29/2021, 11:44 AM
Okay, I'll look into this
datajoely

12/29/2021, 11:45 AM
yeah it's a subtle difference, but we feel dynamic pipelines get a bit messy
Dhaval

12/29/2021, 11:45 AM
My aim with this particular project was to have the yml files editable by the non-tech folks and not the source code. Do you think there's some other way?
datajoely

12/29/2021, 11:46 AM
As in they decide which datasets get profiled?
11:46 AM
and do you want them to run the profiling on demand or as part of a regular run?
Dhaval

12/29/2021, 11:47 AM
I want to run this particular modular pipeline once they update the raw_data_load.yml file
11:48 AM
You mentioned that I should not use the ConfigLoader in my code, but if I don't use it then how can I iterate through the entries available in the `raw_data_load.yml` file? This piece of code
datajoely

12/29/2021, 11:48 AM
Okay maybe I'd suggest a slight tweak then
d

Dhaval

12/29/2021, 11:48 AM
table_pipelines = [
        pipeline(
            pipe = base_pipeline(),
            inputs = {
                "data": f"{table_name}", 
                "table_name": str(f"{table_name}"),
            },
            outputs = {
                "json_data": f"json_{table_name}"
            },
            namespace = f"{table_name}",
        )
        for table_name in sql_data
    ]
datajoely

12/29/2021, 11:49 AM
Okay let's try and get your original approach working
11:49 AM
could you post the full stack trace?
Dhaval

12/29/2021, 11:49 AM
Should I share a Github repo directly with you?
11:50 AM
That would be much better to replicate the stack trace
datajoely

12/29/2021, 11:50 AM
Just paste the stack trace that produced the `kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets'` error?
11:50 AM
because I'm not sure why that's being called
Dhaval

12/29/2021, 11:50 AM
This is done
datajoely

12/29/2021, 11:50 AM
Is there no more?
Dhaval

12/29/2021, 11:51 AM
I'll share the repo in 10 mins. Let me just push that, you'll understand it in a much better way
datajoely

12/29/2021, 11:51 AM
ok
datajoely

12/29/2021, 12:05 PM
so first suggestion: you don't have to do read/write with pandas profiling
12:05 PM
you could simply return a Python dictionary from this function
12:06 PM
and save via the kedro `json.JSONDataSet`, which is doing the same thing
Dhaval

12/29/2021, 12:06 PM
I am planning to do that. But it's important that the HTML file gets saved
12:07 PM
since the code doesn't execute beyond that point
datajoely

12/29/2021, 12:07 PM
you can apparently use the `profile.json` and `profile.html` accessors
12:07 PM
and return both Python objects
12:07 PM
and write the HTML as a `text.TextDataSet`
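In the catalog, those two outputs could then be declared something like this (entry names and paths are assumptions for illustration):

```yaml
train_profile_json:
  type: json.JSONDataSet
  filepath: data/02_intermediate/train_profile.json

train_profile_html:
  type: text.TextDataSet
  filepath: data/08_reporting/train_profile.html
```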
12:07 PM
the other question is about when you run the pipeline
12:08 PM
could you post the full error?
Dhaval

12/29/2021, 12:09 PM
Here you go
datajoely

12/29/2021, 12:09 PM
No such file or directory: 'config_minimal.yml'
12:10 PM
No such file or directory: 'data/descriptives/'
12:10 PM
I think your filepaths are relative to the wrong working directory
12:10 PM
you can breakpoint and work out where you are
Dhaval

12/29/2021, 12:10 PM
If you see the create_descriptives function, it expects one input as a dataframe and the other as a string variable; as of now kedro is passing both of these as catalog entries
datajoely

12/29/2021, 12:11 PM
ah I see what you want
12:11 PM
so there is a hacky way of doing this, but you're going quite far off piste
Dhaval

12/29/2021, 12:11 PM
If you check the io.catalog too, train is loaded 2 times as well
12:11 PM
I just want to pass the string value
Dhaval

12/29/2021, 12:12 PM
Also, how do I replace the sql_data with params? Because that's where my input tables are
datajoely

datajoely

12/29/2021, 12:12 PM
so you can do something called the partial application of a node
12:12 PM
but I want to be clear you're not using Kedro in the way we encourage you to do it
12:13 PM
I would look at using hooks as you can access the catalog dynamically there
12:13 PM
rather than trying to build a pipeline
Dhaval

12/29/2021, 12:14 PM
Okay πŸ˜”. I'm trying to take inspiration from the modular-spaceflights code and write in that way
12:14 PM
Okay
datajoely

12/29/2021, 12:15 PM
No worries! It's a tricky one, we really encourage your inputs and outputs to live in Python
Dhaval

12/29/2021, 12:15 PM
But yeah, thanks for your help @User 😁
datajoely

12/29/2021, 12:15 PM
there is a subtle distinction between what should be static and what is configuration
12:16 PM
The reason we're so annoying about this is that we believe it's a lot more maintainable and readable in the long run
Dhaval

12/29/2021, 2:49 PM
@User I can't really understand how to go about this problem. I want to write a list of tables to extract from the SQL database, then run a profiler to save the HTML profiles and the data in PKL format inside the data/02_intermediate folder. I am attaching the flowchart for reference. If you could help me with this, it would set up a big foundation for the project I am currently working on. I really want this to be a modular pipeline based on the project structure, and I have no proper experience, so I'd really appreciate your help
2:59 PM
All I want to do is load the list of tables to be saved from the SQL server and then save them with the filenames I get from the yml file. That's itπŸ˜…
datajoely

12/29/2021, 5:36 PM
Okay @User. Step 1. Update `create_descriptives` so that it returns two outputs:
- `profile.to_html()` -> `str`
- SQL data -> `pd.DataFrame`
5:37 PM
Step 2. Wrap that in a regular `Pipeline` object
5:37 PM
Step 3. Load from configuration the catalog entries you want to profile (somewhere in your `create_pipeline()` method)
5:39 PM
Step 4. Loop over the results of step 3 and create modular instances of step 2 using the `pipeline()` wrapper method, overriding the correct inputs and outputs
5:40 PM
You're already doing step 4 correctly; I think your current issues are in the actual node you've defined in your existing steps 1 and 2
5:41 PM
One question: is the PKL file the SQL data or the JSON profiling data?
6:31 PM
@User I actually sat down and had a go at fixing your project
6:31 PM
here is the PR
6:31 PM
I'd say that the `after_catalog_created` hook I've put in there is not best practice, but it would work for your purposes
j c h a r l e s

12/29/2021, 8:10 PM
@Dhaval @datajoely thank you for having this discussion. Very helpful
Dhaval

12/29/2021, 8:21 PM
@datajoely Thanks a lot for this. It's 2am here as of now; I'll go through this in the morning tomorrow. Also, another thing: I was able to achieve this on my own too, so I'll share the nodes.py and pipeline.py files with you as well. Based on your inputs on best practices and the way Kedro works, I've found a different solution. Let me know if that works too. And again, thanks for your help @datajoely
j c h a r l e s

12/29/2021, 9:11 PM
This would be an awesome example to share via the docs
datajoely

12/29/2021, 10:18 PM
Yeah, I'm in two minds about whether the hook implementation is good practice or not
10:19 PM
On one hand it’s neat, but it’s less explicit than we usually like
Dhaval

12/30/2021, 7:49 AM
@User I have added the PR for my version here: https://github.com/DhavalThkkar/test_project/pull/2 I've gone through your implementation and it is pretty neat. Got to learn a lot about the Hook functionality from your code. I'd really appreciate it if you could go through my PR and share your feedback. Also, I have highlighted one weird error that I faced in the `pipelines.py` file. Let me know if you have any questions. Looking forward to your response
user

01/04/2022, 2:35 AM
@User I took another approach to dynamic pipelines that I've found useful. Adding catalog entries via hooks added a bit of friction to my workflow, and I've found the following to be more user-friendly: 1. Create a task that runs before any pipeline; the task runs a file called `create-catalog.py`. 2. This task replaces the bottom section of my catalog file with the current information that I list out in conf/base/parameters
Dhaval

01/04/2022, 7:06 AM
@User Wouldn't you need to reinitialise the catalog files? I am guessing all of the catalog entries get initialised when the `kedro run` command is used. So how do you tackle that?
user

01/04/2022, 7:17 AM
Exactly
7:17 AM
You make a preLaunchTask that updates the catalog
7:17 AM
preLaunchTask is attached with kedro run
7:17 AM
Which means it runs your catalog builder task before kedro run starts
7:19 AM
which means that when kedro run happens, it uses the updated catalog files. the updated catalog files all exist on your machine before kedro run even starts basically
Dhaval

01/04/2022, 8:00 AM
@User, can you share a repo with this use case? It'll be easier for me to replicate
user

01/04/2022, 8:01 AM
I am unable to share my repo directly but happy to answer more questions
Dhaval

01/04/2022, 8:11 AM
Where do I create these files? I am unable to understand that
user

01/04/2022, 8:13 AM
Are you using Visual Studio Code?
8:14 AM
If so, then you can open tasks and launch using Command + Shift + P then typing what you need
8:14 AM
tasks
8:15 AM
launch
8:17 AM
Which files are you referring to when you asked "Where do I create these files?"
Dhaval

01/04/2022, 8:18 AM
Now see, as per what you have mentioned, you said that you can create these catalog entries on the fly while the pipelines are being run. As of now, I am doing this in the nodes.py file of a pipeline
from typing import Dict
import json

import pandas as pd
from pandas_profiling import ProfileReport

# create_folder is a helper defined elsewhere in this project


def create_descriptives(
    data: pd.DataFrame,
    parameters: Dict
):
    file_name = parameters["file_name"]

    # Create path for saving descriptives
    path = create_folder(file_type="data", folder="descriptives")

    # Create the profiling report
    profile = ProfileReport(
        data, title=f"{file_name} Profiling Report",
        config_file="config_minimal.yml"
    )

    # Save the report as an HTML file and JSON for further usage
    profile.to_file(f"{path}/{file_name}.html")

    json_data = json.loads(profile.to_json())

    with open(f"./data/02_intermediate/{file_name}.json", "w") as file:
        json.dump(json_data, file)

    data.to_pickle(f"./data/02_intermediate/{file_name}.pkl")

    return json_data, file_name
8:20 AM
The other approach that datajoely gave was hooks, but it wasn't that usable for my use case. Your idea seems to align with what I am trying to achieve, but I just can't understand the flow of things you've mentioned. Hence, I asked for an example πŸ˜…
user

01/04/2022, 8:20 AM
I did NOT say that "you can create these catalog entries on the fly while the pipelines are being run". I said that you can run a script before every run that updates your catalog file.
8:21 AM
The caps on NOT is for emphasis, not meaning to be shouting etc
Dhaval

01/04/2022, 8:21 AM
Oh, okay. Got it πŸ˜†
user

01/04/2022, 8:22 AM
So what are you looping through, file_names?
Dhaval

01/04/2022, 8:23 AM
Yes, it is a list of tables that are present on the database
user

01/04/2022, 8:23 AM
Basically what I do is, I save the list of the things that I need to loop through in a file in conf/base/parameters/tables.yml
8:23 AM
Let's say you have a list of tables called: [users, events, activity]
8:24 AM
I would create a script that loops through `conf/base/parameters/tables.yml` and writes out all the entries my catalog.yml needs:
8:25 AM
at the bottom of catalog.yml:
8:25 AM
I would generate entries like this
8:27 AM
#################### AUTOGENERATED #################
table_profile_results_for_users:
  filepath: data/02_intermediate/users.json
  type: json.JSONDataSet
table_profile_pickle_for_users:
  filepath: data/02_intermediate/users.pkl
  type: pickle.PickleDataSet
table_profile_results_for_events:
  filepath: data/02_intermediate/events.json
  type: json.JSONDataSet
table_profile_pickle_for_events:
  filepath: data/02_intermediate/events.pkl
  type: pickle.PickleDataSet
table_profile_results_for_activity:
  filepath: data/02_intermediate/activity.json
  type: json.JSONDataSet
table_profile_pickle_for_activity:
  filepath: data/02_intermediate/activity.pkl
  type: pickle.PickleDataSet
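A minimal sketch of such a generator script, using only the standard library (the marker text and entry template are assumptions for illustration):

```python
# create-catalog.py -- regenerate the autogenerated tail of catalog.yml
MARKER = "#################### AUTOGENERATED #################"

ENTRY_TEMPLATE = """table_profile_results_for_{table}:
  filepath: data/02_intermediate/{table}.json
  type: json.JSONDataSet
table_profile_pickle_for_{table}:
  filepath: data/02_intermediate/{table}.pkl
  type: pickle.PickleDataSet
"""


def render_tail(tables):
    """Render the autogenerated catalog section for the given table names."""
    return MARKER + "\n" + "".join(ENTRY_TEMPLATE.format(table=t) for t in tables)


def rewrite_catalog(catalog_text, tables):
    """Drop any previous autogenerated tail and append a fresh one."""
    head = catalog_text.split(MARKER)[0].rstrip()
    return head + "\n\n" + render_tail(tables)
```

Run it as the preLaunchTask (or just before `kedro run`) so the catalog on disk is current when Kedro starts.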
8:28 AM
then in my create_pipeline I would loop through the same `conf/base/parameters/tables.yml` data, and for each table I would run a node with the inputs and outputs as follows
8:37 AM
import json

import pandas as pd
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from pandas_profiling import ProfileReport


def create_descriptives(table_data: pd.DataFrame):
    # Profile the table; return the JSON report (as a dict) plus the raw
    # data, which the catalog saves as JSON and pickle respectively
    profile = ProfileReport(table_data)
    return json.loads(profile.to_json()), table_data


def create_pipeline():
    # Loop through users, events, activity
    db_tables = []  # fill this by looping through your parameters file

    pipeline_nodes = []
    for table in db_tables:
        pipeline_nodes.append(
            node(
                func=create_descriptives,
                # a single string input maps to the single function argument
                inputs=f"data_for_db_table_{table}",
                outputs=[
                    f"table_profile_results_for_{table}",
                    f"table_profile_pickle_for_{table}",
                ],
                name=f"profile_{table}",
            )
        )
    return pipeline(pipeline_nodes)
8:38 AM
I'm not exactly sure what you're using as the input for this function
8:38 AM
but this is essentially what I have been doing
8:38 AM
I have a script that dynamically adds to the end of my catalog.yml
8:39 AM
and then can create the nodes based on my parameters file