#beginners-need-help
mjmare

02/15/2022, 11:15 AM
How does one go about it when one has a bunch of datasets that all need the same treatment? Currently I use a Jinja template in catalog.yml to create the input and output DataSets, like so:
{% for table in openac_tables %}
{{ table }}:
  layer: primary
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{{table}}.parquet
  save_args:
    from_pandas:
      preserve_index: False
{% endfor %}

{% for table in openac_tables %}
profile_{{ table }}:
  layer: qa
  type: ac_pipelines.datasets.ProfilingDataSet
  filepath: data/08_reporting/profiles/{{table}}.html
{% endfor %}
and then generate nodes in the pipeline:
def create_pipeline(**kwargs):
    from kedro.config import ConfigLoader

    conf_paths = ["conf/base", "conf/local"]
    conf_loader = ConfigLoader(conf_paths)
    table_names = conf_loader.get('*globals.yml')['openac_tables']

    return Pipeline([
        node(func=lambda x: x,
             inputs=tn,
             outputs=f'profile_{tn}',
             name=f'profile_{tn}',
             )
        for tn in table_names
    ])
It works. But it feels hacky. It could be improved if I could get the default config_loader from somewhere. I had some success with:
from kedro.framework.session import get_current_session

session = get_current_session()
context = session.load_context()
table_names = context.config_loader.get('*globals.yml')['openac_tables']
but that confuses Kedro-Viz (`Error: There is no active Kedro session.`). A more substantial improvement would be if the Pipeline/Node could be dynamically parametrized at runtime (not sure if that is the right term). I want to feed a variable number of DataSets to a pipeline (or node). I'm probably doing something wrong, so suggestions are welcome.
datajoely

02/15/2022, 12:29 PM
I'll start by saying the Kedro team aren't fans of dynamic pipelines, because they can become super hard to debug.
12:32 PM
A while ago we started sourcing ideas on best practice here https://github.com/kedro-org/kedro/discussions/859#discussioncomment-1205270
12:33 PM
where these two points are relevant:
- Avoid dynamic DAG creation in Kedro unless you really have to.
- If you have to, ensure that the DAG is structurally immutable and only differs in terms of dataset flow.
12:34 PM
I would also warn against loading your own context like @User suggests; you can get it working, but it becomes a pain to parallelise and can interfere with some key parts of Kedro's run lifecycle behind the scenes
12:34 PM
If you really want access to the catalog - the right way to do so is to use lifecycle hooks
12:35 PM
Specifically to this situation, the `before_pipeline_run` hook gives you access to pretty much everything you could ever want: https://kedro.readthedocs.io/en/latest/kedro.framework.hooks.specs.PipelineSpecs.html#kedro.framework.hooks.specs.PipelineSpecs.before_pipeline_run
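As a rough sketch of what such a hook gives you (the class name and the `profile_` filter are hypothetical, and the `ImportError` fallback exists only so the snippet runs without Kedro installed):

```python
try:
    from kedro.framework.hooks import hook_impl
except ImportError:  # stand-in so the sketch runs without Kedro installed
    def hook_impl(func):
        return func


class ProfilingHooks:
    """Hypothetical project hook that inspects the catalog at run time."""

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # run_params is a dict (run_id, pipeline_name, tags, ...);
        # catalog is the fully resolved DataCatalog for this run.
        profiled = [name for name in catalog.list() if name.startswith("profile_")]
        print(f"About to write {len(profiled)} profiling reports")
        return profiled  # Kedro ignores hook return values; handy for testing
```

In recent Kedro versions you would register the class in `settings.py`, e.g. `HOOKS = (ProfilingHooks(),)`.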
mjmare

02/15/2022, 12:45 PM
@User If creating dynamic pipelines is not the Kedro Way, how would you tackle my use case: I have a bunch of DataSets that all need the same treatment (say, generate a pandas-profiling report). Do you really create all the output DataSets manually? And if you need another treatment, you do it again? That would create tons of DataSets.
Isaac89

02/15/2022, 12:45 PM
Thanks for correcting! I didn't know I could interfere with Kedro's run lifecycle in this way!
datajoely

02/15/2022, 12:48 PM
@User so I have two suggestions:
- I would create a pandas-profiling modular pipeline which you can instantiate and reuse any number of times: https://kedro.readthedocs.io/en/latest/06_nodes_and_pipelines/03_modular_pipelines.html#modular-pipelines
- If you need to persist outputs or use them downstream, you can actually replace the `MemoryDataSet` output references with a persisted equivalent using the code API in the `after_pipeline_created` hook
mjmare

02/15/2022, 1:04 PM
@User I had a look at modular pipelines, but thought that a pipeline of one node looked a bit silly; plus parameters are static in Kedro, right? So I cannot generate a series of pipelines (of one node each) by looping over my list of DataSets with different parameter values for each. Re the API example: useful! There is no way to tag DataSets, is there? It would be nice to be able to select a subset of the DataSets in the catalog, and then create additional datasets based on that set.
datajoely

02/15/2022, 1:05 PM
1 node pipelines are absolutely supported
1:07 PM
you can override parameters with the modular pipeline syntax
1:07 PM
it has a `parameters` argument
1:07 PM
let me check with tags
1:08 PM
so tags are an attribute of the nodes - you can in theory mutate them in a hook but it's not pretty
1:08 PM
possibly the pipeline `namespace` is what you're looking for
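A minimal sketch of that idea: loop over the tables and instantiate one namespaced copy of a one-node modular pipeline per table. All names here are invented, and the `except ImportError` stand-ins exist only so the sketch runs without Kedro installed:

```python
from functools import reduce
from operator import add

try:
    from kedro.pipeline import Pipeline, node, pipeline
except ImportError:  # light stand-ins so the sketch runs without Kedro
    def node(func, inputs, outputs, name=None):
        return {"func": func, "inputs": inputs, "outputs": outputs, "name": name}

    class Pipeline:
        def __init__(self, nodes):
            self.nodes = list(nodes)

        def __add__(self, other):
            return Pipeline(self.nodes + other.nodes)

    def pipeline(pipe, namespace=None, parameters=None, **kwargs):
        return pipe


def profile(df, options):
    # placeholder for the real pandas-profiling call
    return f"report({df}, {options})"


def template() -> Pipeline:
    # the reusable one-node "modular" pipeline
    return Pipeline([node(profile, ["input_table", "params:profile_options"], "report")])


TABLES = ["customers", "orders"]  # hypothetical; in practice read from config


def create_pipeline(**kwargs):
    # one namespaced instance per table: free inputs/outputs are prefixed,
    # e.g. the "report" output resolves under each table's namespace
    return reduce(add, [pipeline(template(), namespace=t) for t in TABLES])
```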
mjmare

02/15/2022, 1:08 PM
Dataset tagging might be useful, no?
datajoely

02/15/2022, 1:10 PM
maybe - but it's sort of designed to be immutable
1:10 PM
with one-node pipelines, namespace may have a similar effect
mjmare

02/15/2022, 1:11 PM
I think I see what you mean. Thx
datajoely

02/15/2022, 1:11 PM
This sample project may have some useful examples
mjmare

02/15/2022, 1:12 PM
The parameters argument refers to the names of parameters. So I cannot change the values, unless I set up separately named parameters for each dataset?
datajoely

02/15/2022, 1:12 PM
so you can mutate the actual values in the `catalog` object
1:13 PM
and then refer to those in the `parameters` override
mjmare

02/15/2022, 1:15 PM
I do not follow. In the catalog?
datajoely

02/15/2022, 1:15 PM
In the hook you can do
1:16 PM
catalog.add_feed_dict({"params:something": value})
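Sketched out, that catalog mutation could live in a hook like the one below. The hook class, table list, and parameter names are hypothetical; `add_feed_dict` is the `DataCatalog` method for registering plain values, and the `ImportError` fallback exists only so the snippet runs without Kedro installed:

```python
try:
    from kedro.framework.hooks import hook_impl
except ImportError:  # stand-in so the sketch runs without Kedro installed
    def hook_impl(func):
        return func


TABLES = ["customers", "orders"]  # hypothetical; in practice read from globals.yml


class DynamicParamsHooks:
    """Registers one parameter entry per table before the run starts."""

    @hook_impl
    def after_catalog_created(self, catalog):
        # each one-node pipeline instance can then reference its own
        # "params:profile_opts.<table>" entry via the `parameters` override
        catalog.add_feed_dict(
            {f"params:profile_opts.{t}": {"title": f"Profile of {t}"} for t in TABLES}
        )
```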
mjmare

02/15/2022, 1:18 PM
Ah I see. You mutate the catalog in the hook before each pipeline (of one node) runs. Correct?
1:21 PM
The modular spaceflights project looks very useful. Thanks for that and your tips!
datajoely

02/15/2022, 1:39 PM
Yes I think that's how you would do it
1:39 PM
if you're able to share your final implementation that would be great to see
1:39 PM
I'm keen to document some ideas for future people hitting this problem
antony.milne

02/15/2022, 1:41 PM
FYI https://github.com/kedro-org/kedro/issues/750 has quite a thorough discussion of this and some more ideas 🙂
1:42 PM
(not the original question in that GH issue, but a couple of posts down)
DarthGreedius

02/15/2022, 9:24 PM
Hi. The title of this thread seems relevant to what I'm looking for.
9:24 PM
Which isn't exactly what's been discussed up until now...
9:24 PM
But I guess, this would be the place to ask.
9:25 PM
I have two pipelines I would like to connect in such a way that the first runs completely before the second
9:27 PM
there is no pre-existing input/output relationship I can use to do it, as they are now
datajoely

02/15/2022, 9:27 PM
The easiest way to achieve that is to make a dataset dependency between pipeline A and B
DarthGreedius

02/15/2022, 9:28 PM
yes, I gathered as much from the docs
datajoely

02/15/2022, 9:28 PM
So you can just pass a `True` value through to enforce the topological order
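A sketch of that dependency trick: pipeline A's last node emits a flag dataset that pipeline B's first node consumes. All function and dataset names here are invented, and the `ImportError` stand-in exists only so the snippet runs without Kedro installed:

```python
try:
    from kedro.pipeline import node
except ImportError:  # stand-in so the sketch runs without Kedro installed
    def node(func, inputs, outputs, name=None):
        return {"func": func, "inputs": inputs, "outputs": outputs, "name": name}


def finish_a(df):
    # last node of pipeline A: do its real work, then emit a dummy flag
    return True  # saved as the "a_finished" dataset


def start_b(raw, a_finished):
    # first node of pipeline B: a_finished is consumed purely to force
    # "A before B" in the topological order; its value is ignored
    return raw


# wiring: B's first node now depends on A's last output
a_tail = node(finish_a, inputs="a_output", outputs="a_finished", name="finish_a")
b_head = node(start_b, inputs=["b_raw", "a_finished"], outputs="b_start", name="start_b")
```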
9:28 PM
The other approach is to get creative with your CLI commands
9:29 PM
kedro run --pipeline a && kedro run --pipeline b
DarthGreedius

02/15/2022, 9:29 PM
you mean, just not do it within the kedro/python code at all
datajoely

02/15/2022, 9:29 PM
&& will run one completely before the second
DarthGreedius

02/15/2022, 9:29 PM
yeah... that seems like the wise way
datajoely

02/15/2022, 9:29 PM
I don't think the object pass-through approach is wrong
DarthGreedius

02/15/2022, 9:30 PM
In my case it would require heavy modification to both pipelines
9:30 PM
because they are both extremely parallel
9:30 PM
which is what led me down this train of thought
datajoely

02/15/2022, 9:31 PM
You should opt for whatever is easiest for you to maintain and to manage in your mental model
DarthGreedius

02/15/2022, 9:31 PM
wise counsel
9:31 PM
thank you
9:32 PM
it's funny how often "just don't do it" is the best way to do something
9:32 PM
lol
datajoely

02/15/2022, 9:32 PM
Hahah
9:32 PM
I like the phrase: “write code for other people to read, especially if that person is future you”
9:33 PM
Good luck!
DarthGreedius

02/16/2022, 6:01 PM
Hello, again...
6:04 PM
I'm trying to solve the issue I mentioned yesterday by adding the dataset dependency explicitly in the pipelines... so far I've got:

In catalog.yml:
```yaml
requirements_met:
  type: kedro.io.MemoryDataSet
```
In nodes.py:
```python
def gather_reqs(req1=None, req2=None) -> bool:
    if req1 is None:
        return False
    if req2 is None:
        return False
    return True
```
In pipeline.py:
```python
return Pipeline(
    [
        node(
            gather_reqs,
            inputs=["req1", "req2"],
            outputs="requirements_met",
            name="check_requirements",
        ),
        node(
            step,
            inputs=["dataset", "requirements_met"],
            outputs="out",
        ),
        node(
            other_step,
            inputs=["other_dataset", "requirements_met"],
            outputs="other_out",
        ),
    ]
)
```
6:05 PM
I've set things up so that the requirements are not really there (req1, req2)... but the nodes still run
6:05 PM
Any idea what I'm missing?
datajoely

02/16/2022, 6:30 PM
can you tell me what `step` and `other_step` are doing?
6:30 PM
Am I right in saying you only want one of them to run?
DarthGreedius

02/16/2022, 6:32 PM
Hi, they do some feature engineering on the dataframe they receive as input
6:33 PM
and I need them all to run
6:33 PM
but I would like them to run only if requirements are met
datajoely

02/16/2022, 6:33 PM
okay and why aren't they running?
6:33 PM
do you get an error?
DarthGreedius

02/16/2022, 6:33 PM
they are running
6:34 PM
and the requirements aren't met
6:34 PM
as in: req1 and req2 are datasets in the catalog and they do not exist and there is no node in this pipeline to create them
datajoely

02/16/2022, 6:35 PM
oh I think I understand what you are trying to do
DarthGreedius

02/16/2022, 6:35 PM
I had my other pipeline write out req1 and req2... if it finishes OK.
datajoely

02/16/2022, 6:35 PM
maybe the right way to do this is to define a custom DataSet that returns an empty `DataFrame` or `None` if the file doesn't exist, rather than raising an error?
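One possible shape for such a dataset, as a sketch: the class name is hypothetical and it uses pickle for brevity, and Kedro's `AbstractDataSet` base is stubbed so the snippet runs without Kedro installed:

```python
import os
import pickle

try:
    from kedro.io import AbstractDataSet
except ImportError:  # minimal stand-in so the sketch runs without Kedro
    class AbstractDataSet:
        def load(self):
            return self._load()

        def save(self, data):
            self._save(data)


class OptionalPickleDataSet(AbstractDataSet):
    """Returns None instead of raising when the underlying file is missing."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        if not os.path.exists(self._filepath):
            return None  # signal "requirement not met" instead of erroring
        with open(self._filepath, "rb") as f:
            return pickle.load(f)

    def _save(self, data):
        with open(self._filepath, "wb") as f:
            pickle.dump(data, f)

    def _describe(self):
        return {"filepath": self._filepath}
```

Registered in catalog.yml under your package's dataset module, the gating node can then simply check `if req1 is None`.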
6:35 PM
that would be easier to handle in the node right?
DarthGreedius

02/16/2022, 6:37 PM
hmmm... while I was waiting I tried something: I changed the definition of requirements_met in the catalog to an actual file (TextDataSet)
datajoely

02/16/2022, 6:38 PM
I just saw this:
```yaml
requirements_met:
  type: kedro.io.MemoryDataSet
```
6:38 PM
you don't need to explicitly declare it
DarthGreedius

02/16/2022, 6:38 PM
so now none of req1, req2 or requirements_met exist... the pipeline tried to run all the nodes but then finally failed because requirements_met doesn't exist.
6:39 PM
I tried it without declaring it first
datajoely

02/16/2022, 6:39 PM
what error did you get?
DarthGreedius

02/16/2022, 6:39 PM
the pipeline ran and generated its results normally, completely ignored my attempt at a stopcheck node
6:39 PM
hehehe
datajoely

02/16/2022, 6:43 PM
maybe put a `breakpoint()` in and inspect within the `step_x` nodes?
DarthGreedius

02/16/2022, 6:44 PM
ok, thx.. I'll keep hammering at it
7:05 PM
I think I got it... I had a typo.
7:06 PM
I mistyped the dataset name in the check_requirements node...
datajoely

02/16/2022, 7:06 PM
That's my kind of error!
DarthGreedius

02/16/2022, 7:07 PM
if only all things were that simple... indeed.