# advanced-need-help
t
Hi team, what is the recommended way to deal with a multiple-data-sources / one-pipeline scenario? I find a lot of Kedro examples where one pipeline is applied to a single data source, but how do you go about running the same pipeline on multiple data sources and then combining the transformed data sources in a merging node? Do I need to create a bash script that would run the pipeline on all the different sources, and then run another pipeline that combines the outputs of those runs, or can this be done within the Kedro framework? Thanks!
n
A node accepts an arbitrary number of inputs, so what's stopping you from merging multiple data sources?
t
I want to run a pipeline on each data source and then merge the outputs of all pipeline runs together. The pipeline is quite complex and is built to handle only one data source. If I rewrote each node to handle multiple sources, I am afraid it would become prohibitively complex. It feels more natural to write a pipeline that implements the data processing well, and then run the pipeline for each source in isolation. I guess an alternative to having nodes handle multiple data sources would be to create a pipeline builder that builds the same pipeline for each source (each with its own namespace) and adds a merging node at the end. What is your take on that approach? Is it Kedroic enough?
n
I think a modular pipeline (namespace) is the right thing to do
essentially you are just running the same pipeline with a different catalog, right?
t
There is, however, an issue with this approach: I would have to hard-code the sources in the pipeline definition, whereas I would like to have the sources defined in the configuration. Also, from the Kedro-Viz perspective things will get messy...
yes exactly
n
I don't think it necessarily messes up `kedro-viz`.
The principle of a static pipeline is that it is easy to understand and debug - i.e. when you look at your `kedro-viz` diagram, you should be able to understand the flow.
It depends on how complex the things you are trying to do are; it may work to just use Jinja to generate the `catalog.yml` with some additional loops.
t
My catalog is automatically generated with a hook based on the sources defined in a configuration. Maybe I could also use a hook to expand my pipeline to all sources.
a
Maybe my use case will help you. I have a pipeline that runs for multiple datasets. I define a different version of it for every dataset, with a namespaced name. The pipeline only needs to know the input and output catalog names and the names of the data sources; this I define in Python. The data catalog gets built by a hook that has access to runtime params for the specifics. Kedro Viz helps a lot, which wouldn't be possible with a hook. I mean, yeah, there's duplication there, but in the end it's only a dictionary with dataset-to-input pairs. And it allows me both to work with Kedro Viz and to use the Kedro catalog in ipython, because everything is namespaced for every algorithm and dataset. Ex. `cars.cnnv4.model` brings up the model that uses cnnv4 with the dataset cars, whereas with a hook, if it was just named `model`, it would return whatever.
t
Thanks for sharing your use case @antheas ! Does it mean that one kedro run will run the pipeline on all of the sources that you configured, or do you have to trigger multiple kedro runs?
a
I'm doing multiple runs right now. I have split my pipeline into a dataset-ingestion part and a prediction (synthesis in my case) part. The dataset ingestion is only namespaced to the dataset; the prediction pipeline is also namespaced to the algorithm. That means I can run the pipeline `cars.ingest` to ingest the cars dataset, and then start running `cars.cnnv4.pred` (`pred`-suffixed pipelines skip ingestion) or any other alg to run the prediction part with different algorithms. In addition, I version the dataset nodes in the . namespace so I keep artifacts when I do hyperparameter tuning. Since everything is namespaced correctly, I could just add the pipelines together and run them as one, but I don't need to since there's no final step yet.
For my use case I want to run the same pipeline multiple times, only changing the seed, and then plot the results with a confidence interval, so I think for my case a post-processing step will be better. However, in your case, if you have multiple ingestion pipelines, I would try to merge them. I would create namespaced pipeline sets for each data source that can run on their own (ex. `ingest.<ds>`) in the `<ds>` namespace, with a `prediction` pipeline that sources them, and then merge them together into a complete pipeline. That way you can run all pipelines independently to make sure they are correct, access all the artifacts using ipython, and view them in kedro viz.
Look up the code I posted for the dataset hooks on 30 Aug for inspiration, so that your catalog hook can always produce the same datasets, with versioning support.
t
Very nice, I also implemented a very similar hook in order to generate my catalog. I think I will also go the modular-pipeline way, namespaced on source name. The only thing is, I most often don't want to include all sources in my processing, so I would like the pipeline factory to have access to my configuration files in order to fetch which subset of sources I want to consider. The limitation, though, is that the pipeline factory `register_pipelines` is context agnostic, and it seems that there is no way to hook into it either.
a
For my datasets, each one has a different number of tables, names, and dependencies between them, so the pipeline is bespoke for each of them. In your case, if your pipeline always has the same input structure, you may look into a custom partitioned dataset and into modding its checkpoint function.
In my case I have a `const.py` file that lists the names of datasets and algorithms I want pipelines for. It's good enough for what I'm doing.
t
Makes sense, I will try to follow this approach for now. Thanks again for sharing your experience with similar scenarios.
Hi @antheas , I've started with the refactoring of my multiple-source pipelines, following the namespace approach that you described. I find the namespaced config parameters (`params:xyz`) quite useful for this purpose, and I am curious whether you are also using this and how you structure your configuration. More precisely, coming back to your `cars.cnnv4.train` example, do you follow the same namespace ordering (cars -> cnnv4 -> train), or do you start with the pipeline namespace, i.e. train -> cars -> cnnv4? Personally I find that the latter makes the configuration structure cleaner, because each pipeline (ingest, feature engineering, train_eval, infer, ...) has its own isolated parameter space. Thx
a
Yes, I do that as well. I found that it's more useful for the top namespace to refer to the dataset, the second namespace to refer to the pipeline/algorithm, and the third to split the algorithmic process. So I have `cars.ingest`, `cars.cnnv4.train`, `cars.cnnv4.measure`, which are combined by `cars.cnnv4`. So I visualize `cars.cnnv4` with kedro viz, and use it when I change something in my ingest process to regenerate the files. When I tune hyperparameters I run `cars.cnnv4.train`; when I'm testing metrics I do `cars.cnnv4.measure` to skip training. I don't do feature engineering in my case, so I lump everything with ingest; you can split that more if you want. But I think you'll find jumping from `ingest.cars` to `cnnv4.cars` harder than the opposite, especially when it comes to namespacing your datasets/nodes. For example, I namespace with `<dataset>.<split or algo>.<table>` and do the same with the nodes, so it's natural to name the pipelines after that. They group better in viz as well, since every node/dataset in dataset X is under the namespace X.
As for parameters, for me it's more complicated. I don't use kedro for that anymore; I only use it to generate the top-level dictionary parameters, then feed that to a custom data structure I made that holds my hyperparameters. And I essentially namespace like this:

```
alg:
    <insert your overrides here>

default:
    ...

<dataset>:

    tables:
        <table>:
            ....

    algs:
        <alg1>:
            ...
        <alg2>:
            ...
```
Each of my datasets gets a top-level node in the xml file with its name and a full set of hyperparameters. In it, an `algs` tag allows tuning per algorithm. This is in addition to the `default` tag, which provides the defaults for the project so I don't have to list them per dataset. I essentially merge the `default` dictionary with the `<dataset>` dictionary to form my hyperparameters when I start. I also allow the top level to do overrides, so if I insert `alg.lr = 0.2` then the current algorithm will be sent the learning rate 0.2. With a custom cli I then do `kedro p cars.cnnv4.train alg.lr = 0.2` to run my project with a hyperparameter override each time. I also store an xml version of the hyperparameters in mlflow for future review. If you want some code snippets, for ex. the dictionary merging, ask. In my code the following are merged to the same dict:
```
alg.lr: 0.2

alg:
    lr: 0.2
```
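The equivalence above can be sketched with a small dotted-key expander (illustrative only, not the actual CLI code):

```python
def expand_dotted(params):
    """Expand keys like 'alg.lr' into nested dicts, so that
    'alg.lr: 0.2' and 'alg: {lr: 0.2}' end up as the same structure."""
    out = {}
    for key, value in params.items():
        target = out
        *parents, leaf = key.split(".")
        for part in parents:
            # descend, creating intermediate dicts as needed
            target = target.setdefault(part, {})
        target[leaf] = value
    return out

print(expand_dotted({"alg.lr": 0.2}))  # {'alg': {'lr': 0.2}}
```

After expansion, both spellings can be fed to the same recursive dictionary merge.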
t
Thanks a lot @antheas for the very detailed answer! I am also trying to resolve the tradeoff between avoiding copy-pasting configuration for each namespace, and being able to override all configuration for hyperparameter tuning. For avoiding copy-pasting, it is convenient to have a top-level configuration that can be overridden by namespace-specific values, such as:

```yaml
modelling:
  model1:
    default_options:
      batch_size: 128
      lr: 1
  cars:
    model1:
      custom_options:
        lr: 2
```

In this example `lr: 2` takes precedence over `lr: 1`, and `batch_size` need not be defined in the namespace-specific options. However, for hyperparameter tuning the opposite holds: the top level should take precedence.
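Both precedence behaviors can fall out of a single merge helper by controlling the merge order; a tiny illustration with plain dicts (all names invented):

```python
def merge(*layers):
    """Shallow merge where later layers win; the order encodes precedence."""
    out = {}
    for layer in layers:
        out.update(layer)
    return out

defaults = {"batch_size": 128, "lr": 1}
namespaced = {"lr": 2}   # namespace-specific override
tuning = {"lr": 3}       # e.g. a top-level --params override for tuning

# normal run: namespace-specific values beat the defaults
print(merge(defaults, namespaced))           # {'batch_size': 128, 'lr': 2}
# tuning run: the top-level override is merged last, so it wins
print(merge(defaults, namespaced, tuning))   # {'batch_size': 128, 'lr': 3}
```

A recursive variant (like the `merge_two_dicts` posted later in this thread) is needed once the options are nested rather than flat.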
There are multiple ways to assemble the parameters/options based on the namespace, for example programmatically by building the catalog with code. An alternative which I have been trying is to use parameter mappings in pipeline factories, such as:
```python
def create_pipeline(
    ingestion_ns: str,
    features_ns: str,
    input: str,
    has_custom_options: bool = False,
) -> Pipeline:
    # `nodes` is the base list of nodes for the feature pipeline,
    # defined elsewhere in the module.
    parameters = {
        f"{features_ns}.default_options": f"{features_ns}.default_options",
    }
    if not has_custom_options:
        parameters[f"{features_ns}.custom_options"] = f"{features_ns}.default_options"

    return pipeline(
        pipeline(
            pipeline(
                nodes,
                namespace=features_ns,
                inputs={
                    "input": "input",
                },
            ),
            inputs={
                "input": "input",
            },
            namespace=ingestion_ns,
            parameters=parameters,
        ),
        inputs={
            "input": input,
        },
        namespace="feature_engineering",
    )
```
In the above strategy, the nodes expect some default and custom options that are mapped inside the factory to the proper location in the parameter space. If there are no `custom_options` in the catalog, then the `default_options` are mapped to the `custom_options` as well. Custom and default options are then merged in each node using a node decorator.
So many ways to do it; it's hard to make a choice. I will try to use the tools of the framework as much as possible, but I am definitely interested in some code snippets if you have them - they are worth a thousand words 🙂
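A node decorator of that sort might look like the following sketch (`merge_options`, the option names, and the node signature are invented for illustration, assuming flat option dicts where custom values win):

```python
import functools

def merge_options(fn):
    """Illustrative node decorator: merge default and custom options,
    custom values winning, before calling the wrapped node function."""
    @functools.wraps(fn)
    def wrapper(data, default_options, custom_options):
        merged = {**default_options, **custom_options}
        return fn(data, merged)
    return wrapper

@merge_options
def engineer_features(data, options):
    # placeholder node body: just report the options it received
    return options

print(engineer_features([], {"lr": 1, "batch_size": 128}, {"lr": 2}))
# → {'lr': 2, 'batch_size': 128}
```

Note that a decorator defined with a nested `wrapper` like this is not picklable, which matters for `ParallelRunner` - the same issue discussed further down in this thread.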
a
Here are the functions that merge the hyperparameters. I pass the name of the dataset to the node* and then call `get_params_for_pipe()`. I feed the resulting dictionary to my Metadata structure to fit the hyperparameters. Written for Python 3.10 annotations.
```python
def merge_two_dicts(a: dict, b: dict):
    """Recursively merges dictionaries a, b by prioritizing b."""

    ak = set(a.keys())
    bk = set(b.keys())
    out = {}

    for k in ak - bk:
        out[k] = a[k]
    for k in bk - ak:
        out[k] = b[k]

    for k in ak.intersection(bk):
        if isinstance(a[k], dict) and isinstance(b[k], dict):
            out[k] = merge_two_dicts(a[k], b[k])
        else:
            out[k] = b[k]

    return out


def merge_dicts(*ds: dict):
    out = {}
    for d in ds:
        out = merge_two_dicts(out, d)

    return out


def get_params_for_pipe(name: str, params: dict):
    """Returns the parameters for the provided pipeline by merging
    the nodes `default`, `<view>` and the top level one in one dictionary.

    This allows the user to set default values for all views in the `default`
    namespace, view specific overriding params in the `<view>` namespace and
    override any of them using the `--params` argument without having to use
    the parameter namespace"""
    view = name.split(".")[0]
    return merge_dicts(params.get("default", {}), params.get(view, {}), params)
```
*Now you might ask: feed a static value to a node with kedro? How to do that? I think that's one of the biggest oversights in kedro. Sometimes I need to pass a fixed string to a node that's not based on a parameter, and I can't. Use a lambda, you say? Well, three issues with that: 1) lambdas have no name, so kedro viz will show $lambda as the node name; 2) lambdas can't be serialized, so ParallelRunner will crash; 3) if you use a `functools.partial` and change the `__name__`, it won't persist through serialization and deserialization, mutating the name of the node and crashing ParallelRunner. So I made `gen_closure` for that. It took a good afternoon to get all the issues sorted. It works just like `partial` does, but you can also feed in `_fn` to change the function name.
I also included a little lazy-load function that you can replace node-function imports with. Kedro, by making node funcs actual functions, requires you to load the node's module, which might include pytorch, xgboost, jax, tf or . That made kedro help/ipython take 6s to load in my case (now 3), and if you use multiple GPU frameworks (one each run) it probably causes RAM issues or crashes kedro. So if you feel like optimizing your startup time (esp. with the parallel runner, where the main process doesn't need to load libraries and most processes don't need GPUs), you can use that to help you.
@datajoely you might find some of this useful.
```python
from functools import partial
from itertools import chain
from typing import Callable, TypeVar

from ...utils import get_params_for_pipe

A = TypeVar("A")


def list_unique(*args: list[A]) -> list[A]:
    return list(dict.fromkeys(chain(*args)))


class gen_closure(partial):
    """Creates a closure for function `fun`, by passing the positional arguments
    provided in this function to `fun` before the ones given to the function and
    by passing the sum of named arguments given to both functions.

    The closure retains the original function name. If desired, it can
    be renamed using the `_fn` parameter. If fn contains `%s`, it will be
    replaced with the function name"""

    def __new__(cls, func, /, *args, _fn: str | None = None, **keywords):
        self = super().__new__(cls, func, *args, **keywords)

        if _fn:
            self.__name__ = _fn.replace("%s", func.__name__)
        else:
            self.__name__ = func.__name__

        return self


def _lazy_execute(anchor: str, module: str, fun: str, *args, **kwargs):
    from importlib import import_module

    module = import_module(module, anchor)

    return getattr(module, fun)(*args, **kwargs)


def lazy_load(anchor, module: str, funs: list[str] | str):
    if isinstance(funs, str):
        return gen_closure(_lazy_execute, anchor, module, funs, _fn=funs)
    return (gen_closure(_lazy_execute, anchor, module, fun, _fn=fun) for fun in funs)
```
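A quick standalone check of the name-preserving idea (re-declaring a minimal version of the class so this snippet runs on its own; `named_partial` is an illustrative name, not the code above):

```python
from functools import partial

class named_partial(partial):
    """Minimal stand-in for gen_closure: a partial that keeps a __name__."""
    def __new__(cls, func, /, *args, _fn=None, **keywords):
        self = super().__new__(cls, func, *args, **keywords)
        # the name lives in the instance __dict__, which partial's
        # pickle protocol preserves
        self.__name__ = _fn or func.__name__
        return self

def add(a, b):
    return a + b

inc = named_partial(add, 1, _fn="increment")
print(inc.__name__, inc(41))  # increment 42
```

Because the subclass is a real `partial`, it calls through with the bound positional arguments first, exactly like `functools.partial` does.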
d
This is cool! Do you fancy doing a show and tell on GitHub discussions?
a
Possibly - how would a show and tell on GitHub discussions work?
d
It's like a short blog post where people can comment. But I think it's a really good idea that the community should see and discuss 💪
a
If you can send me a template/examples, I can try to draft something tomorrow. I can do one for `gen_closure`, and one for parameters.
d
I guess we'd love to hear your rationale, the problem it solves and what you would like from kedro in the future if this was natively supported
t
@antheas have you tried with a closure of the sort:
```python
from functools import wraps

def closure(fn, **closure_kwargs):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **{**closure_kwargs, **kwargs})
    return wrapper

node_fn_closure = closure(node_fn, node_fn_kwarg_1=10)
```
a
Yes, that's what `gen_closure` does - exactly the same idea. But it's better, because your code will crash if you try to launch a process with it (the nested function can't be pickled). The only problem with my code is that I don't wrap the function, so the documentation does not pass through. In the beginning my snippet looked like yours; then the parallel runner complained.
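The pickling failure behind this can be demonstrated directly (function names here are invented; the point is nested function vs. module-level `partial`):

```python
import functools
import pickle

def scale(x, *, factor=1):
    return x * factor

def closure(fn, **closure_kwargs):
    # wraps-style closure: `wrapper` is a local (nested) function
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **{**closure_kwargs, **kwargs})
    return wrapper

wrapped = closure(scale, factor=2)
try:
    pickle.dumps(wrapped)
    picklable = True
except (pickle.PicklingError, AttributeError):
    # local objects can't be pickled by reference, which is exactly
    # what a process-based runner needs to do to ship the node function
    picklable = False
print(picklable)

# A partial of a module-level function round-trips fine.
bound = functools.partial(scale, factor=2)
restored = pickle.loads(pickle.dumps(bound))
print(restored(10))
```

This is why a `partial`-based approach survives `ParallelRunner` while the `@wraps` closure does not.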