# beginners-need-help
d
So, I'm using this file to use the same pipeline for train and test inputs. There's an error that pops up
Error: Failed to map datasets and/or parameters: train
I don't know what to do on this front
d
Hi @User, our modular pipeline docs are currently being overhauled, so this should be better explained in the future (you can look at the two open PRs if you want to). The wrapper method (`from kedro.pipeline.modular_pipeline import pipeline`) is a bit tricky to get the hang of before it clicks. The `inputs` and `outputs` arguments are designed to be overrides, which I think you've got the hang of when it comes to the `outputs` argument. However, your inputs `train` and `test` aren't mapped to `data`, so I think you need to add `inputs={"data": "train"}` to the train instance and `inputs={"data": "test"}` to the test instance.
I also have a fully representative project that takes advantage of modular pipelines here https://github.com/datajoely/modular-spaceflights
d
Hi @User , I just did that
And it works
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

from .nodes import (
    clean_column_names,
    clean_time_cols,
    clean_tertiary
)

def create_pipeline(**kwargs):
    cleaning_pipe = Pipeline(
        [
            node(
                func=clean_column_names,
                inputs="data",
                outputs="data_cols_cleaned",
                name="data_col_clean",
            ),
            node(
                func=clean_time_cols,
                inputs="data_cols_cleaned",
                outputs="data_time_cleaned",
                name="data_time_clean",
            ),
            node(
                func=clean_tertiary,
                inputs="data_time_cleaned",
                outputs="data_cleaned",
                name="data_tertiary_clean",
            ),
        ]
    )

    mod_pipe = Pipeline(
        [
            pipeline(
                pipe = cleaning_pipe,
                inputs = {"data": "train"},
                outputs = {"data_cleaned": "train_cleaned"},
                namespace = "train_pipe",
            ), 
            pipeline(
                pipe = cleaning_pipe,
                inputs = {"data": "test"},
                outputs = {"data_cleaned": "test_cleaned"},
                namespace = "test_pipe",
            )
        ]
    )

    return mod_pipe
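For anyone reading along, here is a rough sketch of how the dataset names end up after wrapping. As far as I understand the `pipeline()` wrapper, anything listed in `inputs`/`outputs` is renamed to what you map it to, and every other dataset is prefixed with the namespace. `resolve_name` is a made-up helper to illustrate that rule; it is not part of Kedro.

```python
def resolve_name(dataset, namespace, overrides):
    """Hypothetical helper: mimic how a namespaced wrapper renames a dataset."""
    if dataset in overrides:
        # explicitly mapped inputs/outputs take the name you gave them
        return overrides[dataset]
    # everything else is tucked under the namespace
    return f"{namespace}.{dataset}"


# the mappings used for the train instance above
overrides = {"data": "train", "data_cleaned": "train_cleaned"}
for name in ["data", "data_cols_cleaned", "data_time_cleaned", "data_cleaned"]:
    print(name, "->", resolve_name(name, "train_pipe", overrides))
```

So the intermediate datasets of the two instances should not clash, because one lives under `train_pipe.` and the other under `test_pipe.`.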
d
also, using `kedro viz --autoreload` is a huge time saver when working with namespaces
it really brings it to life
🦾
d
I didn't know about this
d
It's new! hence us updating the docs
d
Does it automatically see the changes and upload it on the webpage?
d
yeah so it scans for changes to the codebase and refreshes when it sees it
d
This is great! I'm really excited 🙂
d
Good luck!
d
Thanks @User 😁
I'm gonna be a regular here and I want to make use of this tool to its maximum, so yeah see you around 😆
d
Modular pipelines are the feature I'm most excited about, but we've done a terrible job documenting and marketing them so far. The new Viz update really brings them to life so any feedback on what we could have done better is very much welcome.
d
@User Can namespaces be run individually? I'm asking this because in your example for modular-spaceflights you have used namespaces to shrink it down on one single block. I want to run individual pipelines inside these namespaces and don't know how to
d
No, namespaces can't be run individually out of the box. There are two ways I would recommend: add tags to the nodes based on the namespace, or register the modular pipeline in `pipeline_registry.py`, and then you can do `kedro run --pipeline {name}`
d
What would be a wise project decision? Having multiple modular pipelines setup individually and then merge them all together in the pipeline_registry.py with namespaces or just tag individual nodes with tags whilst keeping the number of modular pipelines to a lesser count?
d
I think this is very much something for you to decide
the namespaces are independent of how the pipelines get executed
they are just to organise your work
if you look at my demo project...
kedro run --pipeline "Modelling stage"
would run the namespaces below
train_evaluation
|_ random_forest
|_ linear_regression
so in this example the top-level namespace is equivalent to the registered "Modelling stage" pipeline, but they serve different purposes
We could also look to add namespaces to the run arguments in the future, this is the first time someone has asked for it
In general we're also looking to overhaul the syntax for how the run command works, so this is useful
d
This is because I'm thinking of creating a project with kind of independent reconfigurable elements and this would definitely help. As of now, the tag argument helps to tackle the problem. Also, glad I could help 😁
@User If my modular pipeline has 2 inputs then how can I map it to the inputs? I am using
inputs = {"data": "train", "table_name": "train"}
It gives the following error
kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets'
This is the code
d
Hi @User there are a couple of things wrong with your pipeline code, but I'm also not sure where that error is being called from
so first and most important thing - you don't need to retrieve your parameters like that
to use the `tables` key you simply provide `params:tables` as one of the node's `inputs`
regarding the CLI error - could you post more of the stack trace as I'm not sure what is causing that error
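On the `params:tables` point, a small sketch of the naming convention may help. As far as I know, Kedro exposes each key in the parameters files as `params:<key>`, with nested keys addressed by dots. `params_feed` below is a made-up helper that only mimics that naming, and the `db`/`schema` keys are invented for illustration.

```python
def params_feed(parameters, prefix="params:"):
    """Hypothetical helper: list the `params:` names a parameters dict gives you."""
    feed = {}
    for key, value in parameters.items():
        name = f"{prefix}{key}"
        feed[name] = value
        if isinstance(value, dict):
            # nested keys are addressed with dots, e.g. params:db.schema
            feed.update(params_feed(value, prefix=f"{name}."))
    return feed


# `tables` is the key from the thread; `db` is an invented nested example
parameters = {"tables": ["train", "test"], "db": {"schema": "raw"}}
feed = params_feed(parameters)
print(sorted(feed))
```

A node that declares `params:tables` among its inputs would then receive the list itself, with no config loader call inside the node.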
d
So to give you some background, I am trying to generate a profile of the data using Pandas Profiling in a pipeline. For this, I have 2 datasets:
1. Train
2. Test
In future it could have any number of datasets, but for now we're sticking to these two. The list of tables available can be found in the `conf/base/parameters/raw_data_load.yml` file.
This file got generated when I created the raw_data_load modular pipeline. The aim of this pipeline was to iterate through the tables available in `conf/base/parameters/raw_data_load.yml` and then execute the pipeline. For this, I have the following files: `node.py` and `pipeline.py`, but for some reason my code is not taking train as a string value to name the file, and that is causing issues
d
Okay I would achieve this another way
do you want to profile every dataset or is it just specific ones?
d
All the ones present in the parameters file. For now what is happening is, since train and test are declared in the catalog.yml file
create_descriptives([train,train]) -> json_train
is shown as the node output, meaning that I am unable to name the file as
profile.to_file(f"data/descriptives/{table_name}.html")
because table_name here is a dataframe for some reason and not a string
d
Okay - we don't encourage you using parameters outside of the default pattern i.e. you shouldn't declare your own config loader outside of the one that exists behind the scenes
so I would do this two different ways
1. You could create a one-node `pandas_profiling` modular pipeline that you keep declaring every time you want to profile a dataset. In this scenario you declare the list of catalog entries you want to profile in Python, not YAML, and create a list from there.
complete_profiling_pipeline = sum([pipeline(profiler, inputs=ds, outputs=ds+"_profiled") for ds in ["ds1", "ds2", "ds3"]])
2. You use Kedro's lifecycle hooks to do your profiling
d
Okay, I'll look into this
d
yeah it's a subtle difference, but we feel dynamic pipelines get a bit messy
d
My aim with this particular project was to have the yml files editable by the non tech folks and not the source code. Do you think that there's some other way?
d
As in they decide which datasets get profiled?
and do you want them to run the profiling on demand or as part of a regular run?
d
I want to run this particular modular pipeline once they update the raw_data_load.yml file
You mentioned that I should not use the ConfigLoader in my code but if I don't use that then how can I iterate through the entries available in the
raw_data_load.yml
file? This piece of code
d
Okay maybe I'd suggest a slight tweak then
d
table_pipelines = [
    pipeline(
        pipe = base_pipeline(),
        inputs = {
            "data": f"{table_name}",
            "table_name": str(f"{table_name}"),
        },
        outputs = {
            "json_data": f"json_{table_name}"
        },
        namespace = f"{table_name}",
    )
    for table_name in sql_data
]
d
Okay let's try and get your original approach working
could you post the full stack trace?
d
Should I share a Github repo directly with you?
That would be much better to replicate the stack trace
d
Just paste the stack trace that produced the
kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets'
error?
because I'm not sure why that's being called
d
This is done
d
Is there no more?
d
I'll share the repo in 10 mins. Let me just push that, you'll understand it in a much better way
d
ok
d
so first suggestion: you don't have to do read/write with pandas profiling
you could simply return a Python dictionary from this function and save it via the Kedro `json.JSONDataSet`, which is doing the same thing
d
I am planning to do that. But the html file is important to be saved
Since the code doesn't execute beyond that point
d
you can apparently use the `profile.json` and `profile.html` accessors and return both Python objects
and write the HTML with a `text.TextDataSet`
the other question is when you run the pipeline, could you post the full error
d
Here you go
d
> No such file or directory: 'config_minimal.yml
> No such file or directory: 'data/descriptives/
I think your filepaths are at the wrong working directory
you can breakpoint and work out where you are
d
If you see the create_descriptives function it expects one dataframe and the other one as a string variable, as of now kedro is passing both of these as a catalog entry
d
ah I see what you want
so there is a hacky way of doing this - but you're going quite far off piste
d
If you check the io.catalog too there train is loaded 2 times as well
I just want to pass the string value
d
Also how do i replace the sql_data with params, because that's where my input tables are
d
so you can do something called the partial application of a node
but I want to be clear you're not using Kedro in the way we encourage you to do it
I would look at using hooks as you can access the catalog dynamically there
rather than trying to build a pipeline
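For context, one reading of "partial application" here is the plain `functools.partial` trick: bind the string argument up front so the function only has the dataframe left as an input. This is a sketch under that assumption, not necessarily what was meant. `bind_table_name` is a made-up helper, and the `create_descriptives` below is a stand-in that counts rows instead of building a real profile.

```python
from functools import partial, update_wrapper


def create_descriptives(data, table_name):
    """Stand-in for the real node function; `data` would be a DataFrame."""
    return {"table": table_name, "rows": len(data)}


def bind_table_name(func, table_name):
    """Hypothetical helper: fix `table_name` so only `data` is left as an input."""
    bound = partial(func, table_name=table_name)
    # copy __name__ and __doc__ across so the bound function keeps a readable name
    update_wrapper(bound, func)
    return bound


describe_train = bind_table_name(create_descriptives, "train")
print(describe_train([1, 2, 3]))
```

The bound function could then be handed to a node that only lists the dataframe as its catalog input, so the table name never has to travel through the catalog.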
d
Okay 😔 . I'm trying to take inspiration from the modular-spaceflights code and I'm trying to write in that way
Okay
d
No worries! It's a tricky one, we really encourage your inputs and outputs to live in Python
d
But yeah, thanks for your help @User 😁
d
there is a subtle distinction between what should be static and what is configuration
the reason we're so annoying about this is that we believe it's a lot more maintainable and readable in the long run
d
@User I can't really understand how to go about this problem. I want to just write a list of tables that I want to extract from the SQL database and then run a profiler to save the html profiles and the data in PKL format inside the data/02_intermediate folder. I am attaching the flowchart for reference. If you could help me with this it would set up a big foundation for the project that I am currently working on. I really want this to be a modular pipeline based on the project structure and I have no proper experience. I'd really appreciate if you could help me with this
All I want to do is load the list of tables to be saved from the sql server and then save them with their filename which I get from the yml file. That's it 😅
d
Okay @User
Step 1. Update `create_descriptives` so that it returns two outputs:
- `profile.to_html()` -> `str`
- SQL data -> `pd.DataFrame`
Step 2. Wrap that in a regular `Pipeline` object
Step 3. Load from configuration the catalog entries you want to profile (somewhere in your `create_pipeline()` method)
Step 4. Loop over the results of step 3 and create modular instances of step 2 using the `pipeline()` wrapper method, overriding the correct inputs and outputs
You're already doing step 4 correctly; I think your current issues are in the actual node you've defined in your existing steps 1 and 2
One question: is the PKL file the SQL data or the JSON profiling data?
@User I actually sat down and had a go fixing your project
here is the PR
I'd say that the `after_catalog_created` hook I've put in there is not best practice, but would work for your purposes
j
@Dhaval @datajoely thank you for having this discussion. Very helpful
d
@datajoely Thanks a lot for this. It's 2am here as of now, I'll go through this tomorrow morning. Also, another thing, I was able to achieve this on my own too, I'll share the nodes.py and pipeline.py files with you as well. Based on your inputs on best practices and the way Kedro works, I've found a different solution. Let me know if that works too. And again, thanks for your help @datajoely
j
This would be an awesome example to share via the docs
d
Yeah I'm in two minds if the hook implementation is good practice or not
On one hand it's neat, but it's less explicit than we usually like
d
@User I have added the PR for my version here: https://github.com/DhavalThkkar/test_project/pull/2
I've gone through your implementation and it is pretty neat. I got to learn a lot about the Hook functionality from your code. I'd really appreciate it if you could go through my PR and share your feedback. Also, I have highlighted one weird error that I faced in the `pipelines.py` file. Let me know if you have any questions. Looking forward to your response
u
@User I took another approach to dynamic pipelines that I've found useful. Adding catalog entries via hooks added a bit of friction to my workflow, and I've found the following to be more user-friendly:
1. Create a task that runs before any pipeline. The task runs a file called `create-catalog.py`
2. This task replaces the bottom section of my catalog file with the current information that I list out in conf/base/parameters
d
@User Wouldn't you need to reinitialise the catalog files? I am guessing all of the catalog entries get initialised while
kedro run
command is used. So how do you tackle that?
u
Exactly
u
You make a preLaunchTask that updates the catalog
u
preLaunchTask is attached with kedro run
u
Which means it runs your catalog builder task before kedro run starts
u
which means that when kedro run happens, it uses the updated catalog files. the updated catalog files all exist on your machine before kedro run even starts basically
d
@User ,can you share a repo with this use case, it'll be easier for me to replicate
u
I am unable to share my repo directly but happy to answer more questions
d
Where do I create these files. I am unable to understand that
u
Are you using Visual Studio Code?
u
If so, then you can open tasks and launch using Command + Shift + P then typing what you need
u
tasks
u
launch
u
Which files are you referring to when you asked "Where do I create these files?"
d
Now see, as per what you have mentioned, you said that you can create these catalog entries on the fly while the pipelines are being run. As of now, I am doing this in the nodes.py file of a pipeline
def create_descriptives(
    data: pd.DataFrame, 
    parameters: Dict
): 
    file_name = parameters["file_name"]

    # Create path for saving descriptives
    path = create_folder(file_type="data", folder = "descriptives")

    # Create the profiling report
    profile = ProfileReport(
        data, title=f"{file_name} Profiling Report", 
        config_file="config_minimal.yml"
    )

    # Save the report as an HTML file and JSON for further usage
    profile.to_file(f"{path}/{file_name}.html") 
    
    json_data = json.loads(profile.to_json())

    with open(f"./data/02_intermediate/{file_name}.json", "w") as file:
        json.dump(json_data, file)

    data.to_pickle(f"./data/02_intermediate/{file_name}.pkl")

    return json_data, file_name
The other approach that datajoely gave was of hooks but it wasn't that usable for my use case. Your idea seems to be in line with what I am trying to achieve but I just can't understand the flow of things that you've mentioned. Hence, I asked for an example 😅
u
I did NOT say that "you can create these catalog entries on the fly while the pipelines are being run". I said that you can run a script before every run that updates your catalog file.
u
The caps on NOT is for emphasis, not meaning to be shouting etc
d
Oh, okay. Got it 😆
u
So what are you looping through, file_names?
d
Yes, it is a list of tables that are present on the database
u
Basically what I do is, I save the list of the things that I need to loop through in a file in conf/base/parameters/tables.yml
u
Let's say you have a list of tables called: [users, events, activity]
u
I would create a script that loops through
conf/base/parameters/tables.yml
and writes out all the steps for my catalog.yml to have the following files:
u
at the bottom of `catalog.yml`:
u
I would generate entries like this
u
#################### AUTOGENERATED #################
table_profile_results_for_users:
  filepath: data/02_intermediate/users.json
  type: json.JSONDataSet
table_profile_pickle_for_users:
  filepath: data/02_intermediate/users.pkl
  type: python.PickleDataSet
table_profile_results_for_events:
  filepath: data/02_intermediate/events.json
  type: json.JSONDataSet
table_profile_pickle_for_events:
  filepath: data/02_intermediate/events.pkl
  type: python.PickleDataSet
table_profile_results_for_activity:
  filepath: data/02_intermediate/activity.json
  type: json.JSONDataSet
table_profile_pickle_for_activity:
  filepath: data/02_intermediate/activity.pkl
  type: python.PickleDataSet
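A minimal sketch of what such a generator script could look like, assuming the table list has already been read from the parameters file (the YAML loading is left out here). `autogenerated_block` and `refresh_catalog` are made-up names, and the hand-written `train` entry is only there to show that everything above the marker is kept.

```python
MARKER = "#################### AUTOGENERATED #################"


def autogenerated_block(tables):
    """Build the catalog entries for each table as YAML text."""
    lines = [MARKER]
    for table in tables:
        lines += [
            f"table_profile_results_for_{table}:",
            f"  filepath: data/02_intermediate/{table}.json",
            "  type: json.JSONDataSet",
            f"table_profile_pickle_for_{table}:",
            f"  filepath: data/02_intermediate/{table}.pkl",
            "  type: python.PickleDataSet",
        ]
    return "\n".join(lines) + "\n"


def refresh_catalog(catalog_text, tables):
    """Keep everything above the marker and regenerate what is below it."""
    hand_written = catalog_text.split(MARKER)[0].rstrip("\n")
    prefix = hand_written + "\n\n" if hand_written else ""
    return prefix + autogenerated_block(tables)


# invented hand-written entry standing in for the top of catalog.yml
existing = "train:\n  type: pandas.CSVDataSet\n  filepath: data/01_raw/train.csv\n"
print(refresh_catalog(existing, ["users", "events", "activity"]))
```

Running it twice on its own output gives the same text, so it should be safe to call before every `kedro run`.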
u
then in my create_pipeline i would loop through the same
conf/base/parameters/tables.yml
data, and for each table I would run a node with the inputs and outputs as follows
u
import json

import pandas as pd
from kedro.pipeline import Pipeline, node
from pandas_profiling import ProfileReport


def create_descriptives(table_data: pd.DataFrame):
    # Create the profiling report
    profile = ProfileReport(table_data)

    # Return plain Python objects and let the catalog entries do the saving:
    # the first output goes to the JSON entry, the second to the pickle entry
    # (assuming the pickle is meant to hold the table data itself)
    return json.loads(profile.to_json()), table_data


def create_pipeline(**kwargs):
    # Loop through users, events, activity
    db_tables = []  # fill this by looping through your parameters file

    pipeline_nodes = []
    for table in db_tables:
        pipeline_nodes.append(
            node(
                func=create_descriptives,
                # the key has to match the function's argument name
                inputs={"table_data": f"data_for_db_table_{table}"},
                outputs=[
                    f"table_profile_results_for_{table}",
                    f"table_profile_pickle_for_{table}",
                ],
            )
        )
    # a list of nodes is wrapped in a Pipeline rather than summed
    return Pipeline(pipeline_nodes)
u
I'm not exactly sure what inputs you're using as the input for this function
u
but this is essentially what I have been doing
u
I have a script that dynamically adds to the end of my catalog.yml
u
and then can create the nodes based on my parameters file