mjmare [02/15/2022, 11:15 AM]
{% for table in openac_tables %}
{{ table }}:
  layer: primary
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{{ table }}.parquet
  save_args:
    from_pandas:
      preserve_index: False
{% endfor %}

{% for table in openac_tables %}
profile_{{ table }}:
  layer: qa
  type: ac_pipelines.datasets.ProfilingDataSet
  filepath: data/08_reporting/profiles/{{ table }}.html
{% endfor %}
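For illustration, if globals.yml contained openac_tables: [customers, orders] (hypothetical table names), the first loop above would render to ordinary catalog entries like:

```yaml
customers:
  layer: primary
  type: pandas.ParquetDataSet
  filepath: data/03_primary/customers.parquet
  save_args:
    from_pandas:
      preserve_index: False
orders:
  layer: primary
  type: pandas.ParquetDataSet
  filepath: data/03_primary/orders.parquet
  save_args:
    from_pandas:
      preserve_index: False
```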
and then generate nodes in the pipeline:
def create_pipeline(**kwargs):
    from kedro.config import ConfigLoader
    from kedro.pipeline import Pipeline, node

    conf_paths = ["conf/base", "conf/local"]
    conf_loader = ConfigLoader(conf_paths)
    table_names = conf_loader.get('*globals.yml')['openac_tables']
    return Pipeline([
        node(
            func=lambda x: x,
            inputs=tn,
            outputs=f'profile_{tn}',
            name=f'profile_{tn}',
        )
        for tn in table_names
    ])
It works. But it feels hacky.
It could be improved if I could get the default config_loader from somewhere. I had some success with:
from kedro.framework.session import get_current_session
session = get_current_session()
context = session.load_context()
table_names = context.config_loader.get('*globals.yml')['openac_tables']
but that confuses Kedro-Viz (Error: There is no active Kedro session.)
A more substantial improvement would be if the Pipeline/Node could be dynamically parametrized at runtime (I don't know if that is the right term). I want to feed a variable number of datasets to a pipeline (or node).
I'm probably doing something wrong, so suggestions are welcome.

datajoely [02/15/2022, 12:29 PM]
The before_pipeline_run hook gives you access to pretty much everything you could ever want:
https://kedro.readthedocs.io/en/latest/kedro.framework.hooks.specs.PipelineSpecs.html#kedro.framework.hooks.specs.PipelineSpecs.before_pipeline_run
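For concreteness, here is a stdlib-only sketch of that hook's shape. The method name and its (run_params, pipeline, catalog) arguments follow the linked spec, but DummyCatalog and the table names are stand-ins, and a real project would decorate the method with Kedro's @hook_impl:

```python
# Stdlib-only sketch: DummyCatalog stands in for kedro.io.DataCatalog.
class DummyCatalog:
    def __init__(self):
        self._datasets = {}

    def add(self, name, dataset, replace=False):
        # Mirrors DataCatalog.add's (name, dataset, replace) shape.
        if name in self._datasets and not replace:
            raise ValueError(f"dataset {name!r} already registered")
        self._datasets[name] = dataset


class DynamicDatasetHooks:
    # In a real project this method carries Kedro's @hook_impl decorator.
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Register one output dataset per table name found in the run params,
        # before the runner starts executing nodes.
        for tn in run_params.get("extra_params", {}).get("openac_tables", []):
            catalog.add(f"profile_{tn}", object(), replace=True)


catalog = DummyCatalog()
hooks = DynamicDatasetHooks()
hooks.before_pipeline_run(
    {"extra_params": {"openac_tables": ["customers"]}}, None, catalog
)
```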
mjmare [02/15/2022, 12:45 PM]

Isaac89 [02/15/2022, 12:45 PM]

datajoely [02/15/2022, 12:48 PM]
You can swap MemoryDataSet output references with a persisted equivalent using the code API in the after_pipeline_created hook

mjmare [02/15/2022, 1:04 PM]
datajoely [02/15/2022, 1:05 PM]
The parameters argument and namespace are what you're looking for

mjmare [02/15/2022, 1:08 PM]

datajoely [02/15/2022, 1:10 PM]

mjmare [02/15/2022, 1:11 PM]

datajoely [02/15/2022, 1:11 PM]

mjmare [02/15/2022, 1:12 PM]
datajoely [02/15/2022, 1:12 PM]
You can use the catalog object to do a parameters override

mjmare [02/15/2022, 1:15 PM]

datajoely [02/15/2022, 1:15 PM]
catalog.add({'params:something': object})
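A note on that call: on the real DataCatalog (as of Kedro 0.17/0.18), add takes a name plus a dataset object, while the dict-shaped call is add_feed_dict, which wraps raw values (this is how Kedro itself injects params: entries). A stdlib-only sketch of the dict flavour, with stand-in classes rather than the real Kedro API:

```python
# Stand-in classes mimicking the DataCatalog "feed dict" pattern:
# raw values get wrapped in an in-memory dataset and registered
# under their params:<name> keys.
class MemoryDataSet:
    def __init__(self, data=None):
        self._data = data

    def load(self):
        return self._data


class Catalog:
    def __init__(self):
        self._datasets = {}

    def add_feed_dict(self, feed_dict, replace=False):
        # Each raw value is wrapped so it can be loaded like any dataset.
        for name, data in feed_dict.items():
            if name in self._datasets and not replace:
                raise ValueError(f"dataset {name!r} already registered")
            self._datasets[name] = MemoryDataSet(data)

    def load(self, name):
        return self._datasets[name].load()


catalog = Catalog()
catalog.add_feed_dict({"params:something": {"threshold": 0.5}})
print(catalog.load("params:something"))  # {'threshold': 0.5}
```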
mjmare [02/15/2022, 1:18 PM]

datajoely [02/15/2022, 1:39 PM]

antony.milne [02/15/2022, 1:41 PM]

DarthGreedius [02/15/2022, 9:24 PM]

datajoely [02/15/2022, 9:27 PM]

DarthGreedius [02/15/2022, 9:28 PM]

datajoely [02/15/2022, 9:28 PM]
You could return a dummy True value to just enforce the topological order, or run them separately: kedro run --pipeline a && kedro run --pipeline b
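Why a dummy output enforces ordering: runners schedule nodes topologically by their dataset dependencies, so making pipeline b's first node consume a dataset that pipeline a produces creates the edge. A stdlib sketch with hypothetical node and dataset names:

```python
from graphlib import TopologicalSorter

# Which datasets each node produces and consumes; the dummy
# "requirements_met" dataset is the only link from step_a to step_b.
outputs = {"step_a": ["requirements_met"], "step_b": ["result"]}
inputs = {"step_a": ["raw"], "step_b": ["requirements_met"]}

# Derive node-to-node edges from producer/consumer relationships,
# the same way a runner orders a pipeline.
producers = {ds: n for n, produced in outputs.items() for ds in produced}
graph = {
    node: {producers[ds] for ds in deps if ds in producers}
    for node, deps in inputs.items()
}
order = list(TopologicalSorter(graph).static_order())
print(order)  # step_a is scheduled before step_b
```

Dropping the dummy dataset from step_b's inputs removes the edge, and the two nodes become free to run in any order (or in parallel).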
DarthGreedius [02/15/2022, 9:29 PM]

datajoely [02/15/2022, 9:29 PM]

DarthGreedius [02/15/2022, 9:29 PM]

datajoely [02/15/2022, 9:29 PM]

DarthGreedius [02/15/2022, 9:30 PM]

datajoely [02/15/2022, 9:31 PM]

DarthGreedius [02/15/2022, 9:31 PM]

datajoely [02/15/2022, 9:32 PM]

DarthGreedius [02/16/2022, 6:01 PM]

datajoely [02/16/2022, 6:30 PM]
Can you share what step and other_step are doing?

DarthGreedius [02/16/2022, 6:32 PM]

datajoely [02/16/2022, 6:33 PM]

DarthGreedius [02/16/2022, 6:33 PM]

datajoely [02/16/2022, 6:35 PM]

DarthGreedius [02/16/2022, 6:35 PM]

datajoely [02/16/2022, 6:35 PM]
Could it return the DF or None if it doesn't exist, rather than an error?
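One way to get that behaviour (a sketch with plain lists standing in for DataFrames; the function names are hypothetical): the checking node returns the data or None instead of raising, and downstream nodes branch on the result:

```python
def load_if_ready(data, requirements_met):
    # Return the data when requirements are met, None otherwise,
    # so downstream nodes can check for None instead of catching errors.
    return data if requirements_met else None


def downstream(maybe_df):
    # Branch on the sentinel rather than letting an exception kill the run.
    if maybe_df is None:
        return "skipped"
    return f"processed {len(maybe_df)} rows"


print(downstream(load_if_ready([{"id": 1}], True)))   # processed 1 rows
print(downstream(load_if_ready([{"id": 1}], False)))  # skipped
```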
DarthGreedius [02/16/2022, 6:37 PM]

datajoely [02/16/2022, 6:38 PM]
requirements_met:
  type: kedro.io.MemoryDataSet
DarthGreedius [02/16/2022, 6:38 PM]

datajoely [02/16/2022, 6:39 PM]

DarthGreedius [02/16/2022, 6:39 PM]

datajoely [02/16/2022, 6:43 PM]
Have you tried putting a breakpoint() in and inspecting within the step_x nodes?

DarthGreedius [02/16/2022, 6:44 PM]

datajoely [02/16/2022, 7:06 PM]

DarthGreedius [02/16/2022, 7:07 PM]