Dhaval
12/22/2021, 10:03 AM
Error: Failed to map datasets and/or parameters: train
I don't know what to do on this front

datajoely
12/22/2021, 11:33 AM
The modular pipeline wrapper (from kedro.pipeline.modular_pipeline import pipeline) is a bit tricky to get the hang of before it clicks.
The inputs and outputs arguments are designed to be overrides. I think you've got the hang of that with the outputs argument; however, your inputs train and test aren't mapped to data.
So I think you need to add inputs = {"data": "train|test"} (i.e. "train" or "test" for each instance).
I also have a fully representative project that takes advantage of modular pipelines here:
https://github.com/datajoely/modular-spaceflights

Dhaval
12/22/2021, 11:33 AM
```python
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

from .nodes import (
    clean_column_names,
    clean_time_cols,
    clean_tertiary,
)


def create_pipeline(**kwargs):
    cleaning_pipe = Pipeline(
        [
            node(
                func=clean_column_names,
                inputs="data",
                outputs="data_cols_cleaned",
                name="data_col_clean",
            ),
            node(
                func=clean_time_cols,
                inputs="data_cols_cleaned",
                outputs="data_time_cleaned",
                name="data_time_clean",
            ),
            node(
                func=clean_tertiary,
                inputs="data_time_cleaned",
                outputs="data_cleaned",
                name="data_tertiary_clean",
            ),
        ]
    )
    mod_pipe = Pipeline(
        [
            pipeline(
                pipe=cleaning_pipe,
                inputs={"data": "train"},
                outputs={"data_cleaned": "train_cleaned"},
                namespace="train_pipe",
            ),
            pipeline(
                pipe=cleaning_pipe,
                inputs={"data": "test"},
                outputs={"data_cleaned": "test_cleaned"},
                namespace="test_pipe",
            ),
        ]
    )
    return mod_pipe
```
datajoely
12/22/2021, 11:33 AM
kedro viz --autoreload is a huge time saver when working with namespaces

Dhaval
12/22/2021, 11:33 AM

datajoely
12/22/2021, 11:34 AM

Dhaval
12/22/2021, 11:34 AM

datajoely
12/22/2021, 11:34 AM

Dhaval
12/22/2021, 11:34 AM

datajoely
12/22/2021, 11:34 AM

Dhaval
12/22/2021, 11:35 AM

datajoely
12/22/2021, 11:37 AM

Dhaval
12/23/2021, 7:53 AM

datajoely
12/23/2021, 12:00 PM
You can register each one under a name in pipeline_registry.py and then you can do kedro run --pipeline {name}
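A minimal sketch of that registration (the package and pipeline names here are assumptions):
```python
# pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline

from my_package.pipelines import cleaning  # assumed module layout


def register_pipelines() -> Dict[str, Pipeline]:
    cleaning_pipeline = cleaning.create_pipeline()
    return {
        "__default__": cleaning_pipeline,
        "cleaning": cleaning_pipeline,  # enables `kedro run --pipeline cleaning`
    }
```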
Dhaval
12/23/2021, 3:26 PM

datajoely
12/23/2021, 4:00 PM
kedro run --pipeline "Modelling stage" would run the namespaces below:
train_evaluation
|_ random_forest
|_ linear_regression
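A sketch of how such nesting can be declared; the base modelling pipeline is an assumption, and registering the result as "Modelling stage" in pipeline_registry.py is what makes the quoted command work:
```python
from kedro.pipeline import Pipeline
from kedro.pipeline.modular_pipeline import pipeline


def create_modelling_stage(base: Pipeline) -> Pipeline:
    # Wrapping namespaced pipelines in an outer namespace yields the
    # nested tree shown above; real code would also map inputs/outputs
    # so shared datasets keep their catalog names.
    return pipeline(
        pipe=pipeline(pipe=base, namespace="random_forest")
        + pipeline(pipe=base, namespace="linear_regression"),
        namespace="train_evaluation",
    )
```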
Dhaval
12/23/2021, 5:20 PM
inputs = {"data": "train", "table_name": "train"}
It gives the following error: kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets'
datajoely
12/29/2021, 11:18 AM
If your parameters file has a tables key, you simply provide params:tables as an entry in the inputs argument
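For example, a minimal sketch of wiring a parameters key into a node this way (the node function, column name, and dataset names are assumptions; the params: prefix is standard Kedro):
```python
from typing import List

import pandas as pd
from kedro.pipeline import Pipeline, node


def keep_listed_tables(data: pd.DataFrame, tables: List[str]) -> pd.DataFrame:
    # `tables` receives the plain value of the `tables` key from
    # conf/base/parameters/*.yml -- no manual loading needed.
    return data[data["table_name"].isin(tables)]


filter_pipeline = Pipeline(
    [
        node(
            func=keep_listed_tables,
            inputs={"data": "train", "tables": "params:tables"},
            outputs="train_filtered",
        )
    ]
)
```

Dhaval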
12/29/2021, 11:32 AM
I want to define the tables in the conf/base/parameters/raw_data_load.yml file and then execute the pipeline.
For this, I have the following files: node.py and pipeline.py, but for some reason my code is not taking train as a string value to name the file, and it is causing issues.

datajoely
12/29/2021, 11:34 AM

Dhaval
12/29/2021, 11:37 AM
create_descriptives([train,train]) -> json_train is shown as the node output, meaning that I am unable to name the file via profile.to_file(f"data/descriptives/{table_name}.html"), because table_name here is a dataframe for some reason and not a string

datajoely
12/29/2021, 11:38 AM
You could have a single pandas_profiling modular pipeline that you keep re-declaring every time you want to profile a dataset. In this scenario you declare the list of catalog entries you want to profile in Python, not YAML, and create a list from there.
```python
complete_profiling_pipeline = sum(
    [pipeline(profiler, inputs=ds, outputs=ds + "_profiled") for ds in ["ds1", "ds2", "ds3"]]
)
```
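For that snippet, `profiler` would be the reusable base pipeline whose free input and output get remapped per dataset. A minimal sketch under assumed names (profile_dataset, "data", "profile"):
```python
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline
from pandas_profiling import ProfileReport


def profile_dataset(data):
    # Hypothetical node function: returns the report HTML as a string
    return ProfileReport(data).to_html()


profiler = Pipeline(
    [node(func=profile_dataset, inputs="data", outputs="profile")]
)

# Dict overrides rename the base pipeline's datasets per catalog entry
# (a plain string only marks a name as external; it does not rename).
complete_profiling_pipeline = sum(
    [
        pipeline(profiler, inputs={"data": ds}, outputs={"profile": f"{ds}_profiled"})
        for ds in ["ds1", "ds2", "ds3"]
    ]
)
```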
Dhaval
12/29/2021, 11:44 AM

datajoely
12/29/2021, 11:45 AM

Dhaval
12/29/2021, 11:45 AM

datajoely
12/29/2021, 11:46 AM

Dhaval
12/29/2021, 11:47 AM
raw_data_load.yml file? This piece of code:

datajoely
12/29/2021, 11:48 AM

Dhaval
12/29/2021, 11:48 AM
```python
table_pipelines = [
    pipeline(
        pipe=base_pipeline(),
        inputs={
            "data": f"{table_name}",
            "table_name": str(f"{table_name}"),
        },
        outputs={
            "json_data": f"json_{table_name}",
        },
        namespace=f"{table_name}",
    )
    for table_name in sql_data
]
```
datajoely
12/29/2021, 11:49 AM

Dhaval
12/29/2021, 11:49 AM

datajoely
12/29/2021, 11:50 AM
Is this what gives you the kedro.framework.cli.utils.KedroCliError: 'list' object has no attribute 'data_sets' error?

Dhaval
12/29/2021, 11:50 AM

datajoely
12/29/2021, 11:50 AM

Dhaval
12/29/2021, 11:51 AM

datajoely
12/29/2021, 11:51 AM

Dhaval
12/29/2021, 12:04 PM

datajoely
12/29/2021, 12:05 PM
You could use json.JSONDataSet, which is doing the same thing

Dhaval
12/29/2021, 12:06 PM

datajoely
12/29/2021, 12:07 PM
There are profile.json and profile.html accessors; the HTML string can be saved with text.TextDataSet
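A sketch of the matching catalog entries (entry names and paths are assumptions; the dataset types are standard Kedro):
```yaml
train_profile_html:
  type: text.TextDataSet      # stores the profile.to_html() string
  filepath: data/08_reporting/train_profile.html

train_profile_json:
  type: json.JSONDataSet      # stores json.loads(profile.to_json())
  filepath: data/08_reporting/train_profile.json
```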
Dhaval
12/29/2021, 12:09 PM

datajoely
12/29/2021, 12:09 PM

Dhaval
12/29/2021, 12:10 PM

datajoely
12/29/2021, 12:11 PM

Dhaval
12/29/2021, 12:11 PM

datajoely
12/29/2021, 12:12 PM

Dhaval
12/29/2021, 12:12 PM

datajoely
12/29/2021, 12:12 PM

Dhaval
12/29/2021, 12:14 PM

datajoely
12/29/2021, 12:15 PM

Dhaval
12/29/2021, 12:15 PM

datajoely
12/29/2021, 12:15 PM

Dhaval
12/29/2021, 2:49 PM

datajoely
12/29/2021, 5:36 PM
I would change create_descriptives so that it returns two outputs:
- profile.to_html() -> str
- SQL data -> pd.DataFrame
Then wire it up with:
- a Pipeline object in the create_pipeline() method
- the pipeline() wrapper method overriding the correct inputs and outputs
- the after_catalog_created hook I've put in there is not best practice, but would work for your purposes
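Pulling that together, a minimal sketch of the refactor (dataset names, the table list, and the function body are assumptions; the after_catalog_created hook is omitted). File writing moves out of the node and into the catalog, so the node never needs a file name:
```python
from typing import Tuple

import pandas as pd
from pandas_profiling import ProfileReport

from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline


def create_descriptives(data: pd.DataFrame) -> Tuple[str, pd.DataFrame]:
    # Return the HTML string and the data; the catalog handles saving.
    profile = ProfileReport(data)
    return profile.to_html(), data


def create_pipeline(**kwargs) -> Pipeline:
    base = Pipeline(
        [
            node(
                func=create_descriptives,
                inputs="data",
                outputs=["profile_html", "profiled_data"],
            )
        ]
    )
    # Re-declare the base pipeline per table via the pipeline() wrapper,
    # overriding inputs and outputs with real catalog entry names.
    table_pipelines = [
        pipeline(
            pipe=base,
            inputs={"data": table},
            outputs={
                "profile_html": f"{table}_profile_html",
                "profiled_data": f"{table}_profiled",
            },
            namespace=table,
        )
        for table in ["train", "test"]  # assumed table list
    ]
    return sum(table_pipelines)
```

j c h a r l e s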
12/29/2021, 8:10 PM

Dhaval
12/29/2021, 8:21 PM

j c h a r l e s
12/29/2021, 9:11 PM

datajoely
12/29/2021, 10:18 PM

Dhaval
12/30/2021, 7:49 AM
I have added this in the pipelines.py file.
Let me know if you have any questions. Looking forward to your response

user
01/04/2022, 2:35 AM
1. I have a task, create-catalog.py
2. This task replaces the bottom section of my catalog file with the current information that I list out in conf/base/parameters
Dhaval
01/04/2022, 7:06 AM
But the catalog is already parsed when the kedro run command is used. So how do you tackle that?

user
01/04/2022, 7:17 AM

user
01/04/2022, 7:17 AM

user
01/04/2022, 7:17 AM

user
01/04/2022, 7:17 AM

user
01/04/2022, 7:19 AM

Dhaval
01/04/2022, 8:00 AM

user
01/04/2022, 8:01 AM

Dhaval
01/04/2022, 8:11 AM

user
01/04/2022, 8:13 AM

user
01/04/2022, 8:14 AM

user
01/04/2022, 8:14 AM

user
01/04/2022, 8:15 AM

user
01/04/2022, 8:17 AM

Dhaval
01/04/2022, 8:18 AM
```python
import json
from typing import Dict

import pandas as pd
from pandas_profiling import ProfileReport

# create_folder is a project-local helper defined elsewhere


def create_descriptives(
    data: pd.DataFrame,
    parameters: Dict,
):
    file_name = parameters["file_name"]
    # Create path for saving descriptives
    path = create_folder(file_type="data", folder="descriptives")
    # Create the profiling report
    profile = ProfileReport(
        data,
        title=f"{file_name} Profiling Report",
        config_file="config_minimal.yml",
    )
    # Save the report as an HTML file and JSON for further usage
    profile.to_file(f"{path}/{file_name}.html")
    json_data = json.loads(profile.to_json())
    with open(f"./data/02_intermediate/{file_name}.json", "w") as file:
        json.dump(json_data, file)
    data.to_pickle(f"./data/02_intermediate/{file_name}.pkl")
    return json_data, file_name
```
user
01/04/2022, 8:20 AM

user
01/04/2022, 8:21 AM

Dhaval
01/04/2022, 8:21 AM

user
01/04/2022, 8:22 AM

Dhaval
01/04/2022, 8:23 AM

user
01/04/2022, 8:23 AM

user
01/04/2022, 8:23 AM

user
01/04/2022, 8:24 AM
My script reads conf/base/parameters/tables.yml and writes out all the steps for my catalog.yml to have the following files:

user
01/04/2022, 8:25 AM

user
01/04/2022, 8:25 AM

user
01/04/2022, 8:27 AM
```yaml
#################### AUTOGENERATED #################
table_profile_results_for_users:
  filepath: data/02_intermediate/users.json
  type: json.JSONDataSet
table_profile_pickle_for_users:
  filepath: data/02_intermediate/users.pkl
  type: pickle.PickleDataSet
table_profile_results_for_events:
  filepath: data/02_intermediate/events.json
  type: json.JSONDataSet
table_profile_pickle_for_events:
  filepath: data/02_intermediate/events.pkl
  type: pickle.PickleDataSet
table_profile_results_for_activity:
  filepath: data/02_intermediate/activity.json
  type: json.JSONDataSet
table_profile_pickle_for_activity:
  filepath: data/02_intermediate/activity.pkl
  type: pickle.PickleDataSet
```
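A minimal sketch of what such a create-catalog.py task could look like; the marker, paths, and entry names mirror the examples in this thread, while the tables key and everything else are assumptions:
```python
from pathlib import Path

import yaml

MARKER = "#################### AUTOGENERATED #################"
CATALOG = Path("conf/base/catalog.yml")
TABLES_YML = Path("conf/base/parameters/tables.yml")


def main() -> None:
    # e.g. tables: [users, events, activity] in tables.yml (assumed key)
    tables = yaml.safe_load(TABLES_YML.read_text())["tables"]

    entries = {}
    for table in tables:
        entries[f"table_profile_results_for_{table}"] = {
            "filepath": f"data/02_intermediate/{table}.json",
            "type": "json.JSONDataSet",
        }
        entries[f"table_profile_pickle_for_{table}"] = {
            "filepath": f"data/02_intermediate/{table}.pkl",
            "type": "pickle.PickleDataSet",
        }

    # Keep everything above the marker; regenerate everything below it.
    head = CATALOG.read_text().split(MARKER)[0]
    CATALOG.write_text(head + MARKER + "\n" + yaml.safe_dump(entries))


if __name__ == "__main__":
    main()
```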
user
01/04/2022, 8:28 AM
I would loop through the conf/base/parameters/tables.yml data, and for each table I would run a node with the inputs and outputs as follows

user
01/04/2022, 8:37 AM
```python
import json

import pandas as pd
from pandas_profiling import ProfileReport

from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline


def create_descriptives(table_data: pd.DataFrame):
    # Create the profiling report
    profile = ProfileReport(table_data)
    profile_json = json.loads(profile.to_json())
    # First output feeds the JSONDataSet, second the PickleDataSet
    return profile_json, profile_json


def create_pipeline():
    # Loop through users, events, activity
    db_tables = []  # fill this by looping through your parameters file
    pipeline_nodes = []
    for table in db_tables:
        pipeline_nodes += [
            node(
                func=create_descriptives,
                # dict keys must match the function's argument names
                inputs={"table_data": f"data_for_db_table_{table}"},
                outputs=[
                    f"table_profile_results_for_{table}",
                    f"table_profile_pickle_for_{table}",
                ],
            )
        ]
    return pipeline(pipeline_nodes)
```
user
01/04/2022, 8:38 AM

user
01/04/2022, 8:38 AM

user
01/04/2022, 8:38 AM

user
01/04/2022, 8:39 AM