How does one go about when one has a bunch of data...
# beginners-need-help
m
How does one go about when one has a bunch of datasets that need the same treatment? Currently I use a template in catalog.yml to create the input and output DataSets, like so:
Copy code
{% for table in openac_tables %}
{{ table }}:
  layer: primary
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{{table}}.parquet
  save_args:
    from_pandas:
      preserve_index: False
{% endfor %}

{% for table in openac_tables %}
profile_{{ table }}:
  layer: qa
  type: ac_pipelines.datasets.ProfilingDataSet
  filepath: data/08_reporting/profiles/{{table}}.html
{% endfor %}
and then generate nodes in the pipeline:
Copy code
def create_pipeline(**kwargs):
    from kedro.config import ConfigLoader

    conf_paths = ["conf/base", "conf/local"]
    conf_loader = ConfigLoader(conf_paths)
    table_names = conf_loader.get('*globals.yml')['openac_tables']

    return Pipeline([
        node(func=lambda x: x,
             inputs=tn,
             outputs=f'profile_{tn}',
             name=f'profile_{tn}',
             )
        for tn in table_names
    ])
It works. But it feels hacky. It could be improved if I could get the default config_loader from somewhere. I had some success with:
Copy code
from kedro.framework.session import get_current_session

session = get_current_session()
context = session.load_context()
table_names = context.config_loader.get('*globals.yml')['openac_tables']
but that confuses Kedro-Viz (Error: There is no active Kedro session.). A more substantial improvement would be if the Pipeline/Node could be dynamically parametrized (at runtime). Don't know if that is the right term. I want to feed a variable number of DataSets to a pipeline (or node). I'm probably doing something wrong, so suggestions are welcome.
d
I'll start by saying the Kedro team aren't fans of dynamic pipelines because they can become super hard to debug
A while ago we started sourcing ideas on best practice here https://github.com/kedro-org/kedro/discussions/859#discussioncomment-1205270
where the two points here are relevant:
> - Avoid dynamic DAG creation in Kedro unless you really have to
> - If you have to, ensure that the DAG is structurally immutable and only differs in terms of dataset flow.
I would also warn against loading your own context like @User suggests. You can get it working, but it becomes a pain to parallelise and can interfere with some key parts of Kedro's run lifecycle behind the scenes
If you really want access to the catalog - the right way to do so is to use lifecycle hooks
Specifically to this situation - the `before_pipeline_run` hook gives you access to pretty much everything you could ever want: https://kedro.readthedocs.io/en/latest/kedro.framework.hooks.specs.PipelineSpecs.html#kedro.framework.hooks.specs.PipelineSpecs.before_pipeline_run
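A minimal sketch of what such a hook could look like (the class name is illustrative, not from the thread):
Copy code
from kedro.framework.hooks import hook_impl


class ProjectHooks:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # runs once before every pipeline run, with the fully resolved
        # catalog and the run parameters available for inspection
        print("datasets in catalog:", catalog.list())
        print("run params:", run_params)
The hook class then needs to be registered the usual way for your Kedro version (e.g. in the project's `settings.py`).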
m
@User If creating dynamic pipelines is not the Kedro Way, how would you tackle my use case: I have a bunch of DataSets that all need the same treatment (say, generate a pandas-profiling report). Do you really create all the output DataSets manually? And if you need another treatment, you do it again? That would create tons of DataSets.
i
Thanks for correcting! I didn't know I could interfere with Kedro's run lifecycle in this way!
d
@User so I have two suggestions:
- I would create a pandas-profiling modular pipeline which you can instantiate and reuse any number of times: https://kedro.readthedocs.io/en/latest/06_nodes_and_pipelines/03_modular_pipelines.html#modular-pipelines
- If you need to persist outputs or use them downstream, you can actually replace the `MemoryDataSet` output references with a persisted equivalent using the code API in the `after_pipeline_created` hook
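A rough sketch of the first suggestion, assuming a Kedro version where `kedro.pipeline.modular_pipeline.pipeline` accepts `inputs`/`outputs`/`namespace` mappings (function and dataset names below are illustrative):
Copy code
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


def profile_table(df):
    # placeholder for the actual pandas-profiling node function
    return df


def profiling_template() -> Pipeline:
    return Pipeline([node(profile_table, "input_table", "profile", name="profile")])


def create_pipeline(table_names, **kwargs) -> Pipeline:
    # table_names could come from config or be passed in by the caller
    profiling = Pipeline([])
    for tn in table_names:
        profiling += modular_pipeline(
            profiling_template(),
            namespace=tn,                         # keeps node names unique per table
            inputs={"input_table": tn},           # map to the real catalog entry
            outputs={"profile": f"profile_{tn}"}, # map to the persisted profile dataset
        )
    return profiling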
m
@User I had had a look at modular pipelines, but thought that a pipeline of 1 node looked a bit silly, plus parameters are static in Kedro, right? So I cannot generate a series of pipelines (of 1 node each) by looping over my list of DataSets with different parameter values for each. Re API example: useful! There is no way to tag DataSets, is there? Would be nice to be able to select a subset of the DataSets in the catalog, and then create additional datasets based on that set.
d
1 node pipelines are absolutely supported
you can override parameters with the modular pipeline syntax
it has a `parameters` argument
let me check with tags
so tags are an attribute of the nodes - you can in theory mutate them in a hook but it's not pretty
possibly pipeline `namespace` is what you're looking for
m
Dataset tagging might be useful, no?
d
maybe - but it's sort of designed to be immutable
with 1-node pipelines, namespace may have a similar effect
m
I think I see what you mean. Thx
d
This sample project may have some useful examples
m
The parameters argument refers to the names of parameters. So I cannot change the values, unless I set up all differently named parameters?
d
so you can mutate the actual values in the `catalog` object
and then refer to those in the `parameters` override
m
I do not follow. In the catalog?
d
In the hook you can do `catalog.add({'params:something': object})`
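For reference, a working sketch of that idea using `DataCatalog.add_feed_dict` inside the `before_pipeline_run` hook discussed earlier (the parameter names are made up for illustration):
Copy code
from kedro.framework.hooks import hook_impl


class DynamicParamsHooks:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # register plain values under "params:..." names so nodes can
        # consume them like any other parameter
        catalog.add_feed_dict(
            {f"params:profile_title_{tn}": f"Profile of {tn}" for tn in ["table_a", "table_b"]},
            replace=True,
        )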
m
Ah I see. You mutate the catalog in the hook before each pipeline (of 1 node). Correct?
The modular spaceflights project looks very useful. Thanks for that and your tips!
d
Yes I think that's how you would do it
if you're able to share your final implementation that would be great to see
I'm keen to document some ideas for future people hitting this problem
a
FYI https://github.com/kedro-org/kedro/issues/750 has quite a thorough discussion of this and some more ideas 🙂
(not the original question in that GH issue, but a couple of posts down)
d
Hi. The title of this thread seems relevant to what I'm looking for.
Which isn't exactly what's been discussed up until now...
But I guess this would be the place to ask.
I have two pipelines I would like to connect in such a way that the first runs completely before the second
there is no pre-existing input/output relationship I can use to do it, as they are now
d
The easiest way to achieve that is to make a dataset dependency between pipeline A and B
d
yes, I gathered as much from the docs
d
So you can just pass a `True` value to enforce the topological order
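A rough sketch of that flag-dataset idea (all dataset and node names below are illustrative):
Copy code
from kedro.pipeline import Pipeline, node


def signal_done(*_outputs_of_a) -> bool:
    # the value is irrelevant; only the dataset dependency matters
    return True


pipeline_a = Pipeline([
    # ... the real nodes of pipeline A ...
    node(signal_done, inputs="a_final_output", outputs="a_done", name="mark_a_done"),
])

pipeline_b = Pipeline([
    # B's entry node takes (and ignores) the flag, which forces A to finish first
    node(lambda df, _flag: df, inputs=["b_input", "a_done"], outputs="b_output", name="b_entry"),
])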
The other approach is to get creative with your CLI commands:
`kedro run --pipeline a && kedro run --pipeline b`
d
you mean, just not do it within the kedro/python code at all
d
&& will run one completely before the second
d
yeah... that seems like the wise way
d
I don't think the object pass-through approach is wrong
d
In my case it would require heavy modification to both pipelines
because they are both extremely parallel
which is what led me down this train of thought
d
You should opt for whatever is the easiest for you to manage your mental model and maintain
d
wise counsel
thank you
it's funny how often "just don't do it" is the best way to do something
lol
d
Hahah
I like the phrase: “write code for other people to read, especially if that person is future you”
Good luck!
d
Hello, again...
I'm trying to solve the issue I mentioned yesterday by adding the dataset dependency explicitly in the pipelines... so far I've got...
In catalog.yml:
Copy code
requirements_met:
  type: kedro.io.MemoryDataSet
In nodes.py:
Copy code
def gather_reqs(req1=None, req2=None) -> bool:
    if req1 is None:
        return False
    if req2 is None:
        return False
    return True
In pipeline.py:
Copy code
return Pipeline(
    [
        node(
            gather_reqs,
            inputs=["req1", "req2"],
            outputs="requirements_met",
            name="check_requirements",
        ),
        node(
            step,
            inputs=["dataset", "requirements_met"],
            outputs="out",
        ),
        node(
            other_step,
            inputs=["other_dataset", "requirements_met"],
            outputs="other_out",
        ),
    ]
)
I've set things up so that the requirements are not really there (req1, req2)... but the nodes still run
Any idea what I'm missing?
d
can you tell me what `step` and `other_step` are doing?
Am I right in saying you only want one of them to run?
d
Hi, they do some feature engineering on the dataframe they receive as input
and I need them all to run
but I would like them to run only if requirements are met
d
okay and why aren't they running?
do you get an error?
d
they are running
and the requirements aren't met
as in: req1 and req2 are datasets in the catalog and they do not exist and there is no node in this pipeline to create them
d
oh I think I understand what you are trying to do
d
I had my other pipeline write out req1, req2... if it finishes ok.
d
maybe the right way to do this is to define a custom DataSet that returns an empty DataFrame or None if it doesn't exist, rather than an error?
that would be easier to handle in the node, right?
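Something like this, perhaps (an illustrative custom dataset, not a built-in Kedro class):
Copy code
from pathlib import Path

import pandas as pd
from kedro.io import AbstractDataSet


class OptionalParquetDataSet(AbstractDataSet):
    """Loads a parquet file, returning None instead of raising if it is missing."""

    def __init__(self, filepath: str):
        self._filepath = Path(filepath)

    def _load(self):
        if not self._filepath.exists():
            return None  # signal "requirement not met" rather than erroring
        return pd.read_parquet(self._filepath)

    def _save(self, data: pd.DataFrame) -> None:
        self._filepath.parent.mkdir(parents=True, exist_ok=True)
        data.to_parquet(self._filepath)

    def _describe(self):
        return dict(filepath=str(self._filepath))
The `gather_reqs` node above would then see `None` for any missing requirement and can return `False` accordingly.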
d
hmmm... while I was waiting I tried something: I changed the definition of requirements_met in the catalog to an actual file (TextDataSet)
d
I just saw this
Copy code
requirements_met:
  type: kedro.io.MemoryDataSet
you don't need to explicitly declare it
d
so now none of req1, req2 or requirements_met exist... the pipeline tried to run all the nodes but then finally failed because requirements_met doesn't exist.
I tried it without declaring it first
d
what error did you get?
d
the pipeline ran and generated its results normally, completely ignoring my attempt at a stop-check node
hehehe
d
maybe put a `breakpoint()` in and inspect within the `step_x` nodes?
d
ok, thx.. I'll keep hammering at it
I think I got it... I had a typo.
I mistyped the dataset in the check_requirements node...
d
That's my kind of error!
d
if only all things were that simple... indeed.