ToniMaroni (09/09/2022, 8:41 AM):
noklam (09/09/2022, 12:43 PM):
ToniMaroni (09/09/2022, 3:17 PM):
noklam (09/09/2022, 3:42 PM):
noklam (09/09/2022, 3:42 PM):
ToniMaroni (09/09/2022, 3:45 PM):
ToniMaroni (09/09/2022, 3:45 PM):
noklam (09/09/2022, 3:47 PM):
kedro-viz
noklam (09/09/2022, 3:47 PM):
kedro-viz diagram, you should be able to understand the flow
noklam (09/09/2022, 3:48 PM):
catalog.yml with some additional loops
ToniMaroni (09/09/2022, 3:57 PM):
antheas (09/09/2022, 6:07 PM):
ToniMaroni (09/10/2022, 1:02 PM):
antheas (09/10/2022, 1:09 PM):
antheas (09/10/2022, 1:19 PM):
Create ingestion pipelines (`ingest.<ds>`) in the `<ds>` namespace with a prediction pipeline that sources them, and then merge them together into a complete pipeline.
That way you can run all pipelines independently to make sure they are correct, access all the artifacts using ipython, and view them in kedro viz.
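A minimal sketch of this layout using Kedro's modular pipeline helper; the dataset name "cars" and the clean/predict functions are invented for illustration and are not from the thread:
```python
from kedro.pipeline import node, pipeline


def clean(raw):  # placeholder ingestion step
    return raw


def predict(table, model):  # placeholder prediction step
    return model


# Ingestion pipeline living in the ingest.cars namespace.
ingest_cars = pipeline(
    [node(clean, "raw_cars", "table", name="clean")],
    namespace="ingest.cars",
    inputs={"raw_cars": "raw_cars"},  # keep the raw input outside the namespace
)

# Prediction pipeline in the cars namespace, sourcing the ingested table.
predict_cars = pipeline(
    [node(predict, ["table", "model"], "predictions", name="predict")],
    namespace="cars",
    inputs={"table": "ingest.cars.table"},
)

# Complete pipeline; each piece can still be run and inspected on its own.
full_cars = ingest_cars + predict_cars
```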
antheas (09/10/2022, 1:22 PM):
ToniMaroni (09/10/2022, 1:50 PM):
`register_pipelines` is context agnostic, and it seems there is no way to hook into it either.
antheas (09/10/2022, 1:53 PM):
antheas (09/10/2022, 1:54 PM):
ToniMaroni (09/10/2022, 2:02 PM):
ToniMaroni (10/10/2022, 7:14 AM):
antheas (10/10/2022, 10:13 PM):
My pipelines are `cars.ingest`, `cars.cnn4.train` and `cars.cnn4.measure`, which are combined by `cars.cnn4`.
So I visualize `cars.cnn4` with kedro viz, and use it when I change something in my ingest process to regenerate the files. When I tune hyperparameters I run `cars.cnn4.train`; when I'm testing metrics I do `cars.cnn4.measure` to skip training.
I don't do feature engineering in my case, so I lump everything in with ingest; you can split that more if you want.
But I think you'll find jumping from `ingest.cars` to `cnnv4.cars` harder than the opposite, especially when it comes to namespacing your datasets/nodes.
For example, I namespace with `<dataset>.<split or algo>.<table>` and do that with the nodes respectively. So it's natural to name the pipelines after that. They group better in viz as well, since every node/dataset in dataset X is under the namespace X.
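A hedged sketch of how such a registry could look in pipeline_registry.py, so each sub-pipeline can be run on its own with `kedro run --pipeline=<name>`; the `_make` helper and its node functions are stand-ins, not antheas's actual code:
```python
from kedro.pipeline import Pipeline, node, pipeline


def _identity(x):  # stand-in for the real node functions
    return x


def _make(step: str, namespace: str) -> Pipeline:
    # Hypothetical factory standing in for the real namespaced pipelines.
    return pipeline(
        [node(_identity, f"{step}_in", f"{step}_out", name=step)],
        namespace=namespace,
    )


def register_pipelines() -> dict[str, Pipeline]:
    ingest = _make("ingest", "cars")
    train = _make("train", "cars.cnn4")
    measure = _make("measure", "cars.cnn4")
    cnn4 = ingest + train + measure

    return {
        "cars.ingest": ingest,
        "cars.cnn4.train": train,      # tune hyperparameters
        "cars.cnn4.measure": measure,  # recompute metrics without retraining
        "cars.cnn4": cnn4,             # the combined pipeline viewed in kedro viz
        "__default__": cnn4,
    }
```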
antheas (10/10/2022, 10:25 PM):
```xml
alg:
  <insert your overrides here>
default:
  ...
<dataset>:
  tables:
    <table>:
      ....
  algs:
    <alg1>:
      ...
    <alg2>:
      ...
```
Each of my datasets gets a top-level node in the xml file with its name and a full set of hyperparameters. In it, an `algs` tag allows tuning per algorithm.
This is in addition to the `default` tag, which provides the defaults for the project so I don't have to list them per dataset. I essentially merge the `default` dictionary with the `<dataset>` dictionary to form my hyperparameters when I start.
I also allow for the top level to do overrides, so if I insert `alg.lr = 0.2` then the current algorithm will be sent the learning rate 0.2.
With a custom cli I then do `kedro p cars.cnnv4.train alg.lr = 0.2` to run my project with a hyperparameter override each time. I also store an xml version of the hyperparameters in mlflow for future review.
If you want some code snippets, for ex. the dictionary merging, ask. In my code the following are merged to the same dict:
alg.lr: 0.2
alg:
  lr: 0.2
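One possible way to get that behaviour (a sketch, not antheas's actual implementation) is to expand dotted keys into nested dicts before merging, so both spellings of the override collapse to the same structure:
```python
def expand_dotted_keys(params: dict) -> dict:
    """Turn {"alg.lr": 0.2} into {"alg": {"lr": 0.2}} so it can be merged
    with the nested form of the same parameters."""
    out: dict = {}
    for key, value in params.items():
        target = out
        *parents, leaf = key.split(".")
        for part in parents:
            target = target.setdefault(part, {})
        target[leaf] = value
    return out


# Both spellings end up as the same dictionary:
assert expand_dotted_keys({"alg.lr": 0.2}) == {"alg": {"lr": 0.2}}
assert expand_dotted_keys({"alg": {"lr": 0.2}}) == {"alg": {"lr": 0.2}}
```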
ToniMaroni (10/12/2022, 1:15 PM):
modelling:
  model1:
    default_options:
      batch_size: 128
      lr = 1
cars:
  model1:
    custom_options:
      lr = 2
In this example lr = 2 takes precedence over lr = 1, and batch_size need not be defined in the namespace-specific options.
However, for hyperparameter tuning the opposite is more true: the top level should take precedence.
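A minimal illustration of the two precedence orders being described, with made-up values: namespace-specific custom_options win over defaults for a normal run, while a top-level override (e.g. from `--params`) would win over both during tuning:
```python
default_options = {"batch_size": 128, "lr": 1}
custom_options = {"lr": 2}
tuning_override = {"lr": 0.01}  # e.g. supplied on the command line

normal_run = {**default_options, **custom_options}
assert normal_run == {"batch_size": 128, "lr": 2}

tuning_run = {**default_options, **custom_options, **tuning_override}
assert tuning_run == {"batch_size": 128, "lr": 0.01}
```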
ToniMaroni (10/12/2022, 1:18 PM):
```python
from kedro.pipeline import Pipeline, pipeline

# `nodes` is assumed to be the list of Node objects defined elsewhere in this module.


def create_pipeline(
    ingestion_ns: str,
    features_ns: str,
    input: str,
    has_custom_options: bool = False,
) -> Pipeline:
    parameters = {
        f"{features_ns}.default_options": f"{features_ns}.default_options",
    }
    if not has_custom_options:
        parameters[f"{features_ns}.custom_options"] = f"{features_ns}.default_options"
    return pipeline(
        pipeline(
            pipeline(
                nodes,
                namespace=features_ns,
                inputs={
                    "input": "input",
                },
            ),
            inputs={
                "input": "input",
            },
            namespace=ingestion_ns,
            parameters=parameters,
        ),
        inputs={
            "input": input,
        },
        namespace="feature_engineering",
    )
```
In the above strategy, the nodes expect some default and custom options that are mapped inside the factory to the proper location in the parameter space. If there are no custom_options in the catalog, then the default_options are also mapped to the custom_options. Custom and default options are then merged in each node using a node decorator.
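A sketch of what such a node decorator could look like (assumed, not ToniMaroni's actual code): the wrapped node function takes default_options and custom_options as its last two inputs and is called with their shallow merge, where custom values win:
```python
from functools import wraps


def merge_options(fn):
    """Assume the node receives (*data, default_options, custom_options) and
    call the wrapped function with a single merged options dict instead."""

    @wraps(fn)
    def wrapper(*args):
        *data, default_options, custom_options = args
        options = {**default_options, **custom_options}
        return fn(*data, options)

    return wrapper


@merge_options
def train_model(data, options):  # hypothetical node function
    return {"trained_with": options}


# In the pipeline, the node would then list both parameter sets as inputs, e.g.:
# node(train_model, ["input", "params:model1.default_options",
#                    "params:model1.custom_options"], "model")
```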
ToniMaroni (10/12/2022, 1:21 PM):
antheas (10/12/2022, 4:49 PM):
Here is the dictionary merging code, ending with `get_params_for_pipe()`. I feed the resulting dictionary to my Metadata structure to fit the hyperparameters.
Written for python 3.10 annotations.
```python
def merge_two_dicts(a: dict, b: dict):
    """Recursively merges dictionaries a, b by prioritizing b."""
    ak = set(a.keys())
    bk = set(b.keys())
    out = {}
    for k in ak - bk:
        out[k] = a[k]
    for k in bk - ak:
        out[k] = b[k]
    for k in ak.intersection(bk):
        if isinstance(a[k], dict) and isinstance(b[k], dict):
            out[k] = merge_two_dicts(a[k], b[k])
        else:
            out[k] = b[k]
    return out


def merge_dicts(*ds: dict):
    out = {}
    for d in ds:
        out = merge_two_dicts(out, d)
    return out


def get_params_for_pipe(name: str, params: dict):
    """Returns the parameters for the provided pipeline by merging
    the nodes `default`, `<view>` and the top level one in one dictionary.

    This allows the user to set default values for all views in the `default`
    namespace, view specific overriding params in the `<view>` namespace and
    override any of them using the `--params` argument without having to use
    the parameter namespace."""
    view = name.split(".")[0]
    return merge_dicts(params.get("default", {}), params.get(view, {}), params)
```
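For reference, a small usage example of get_params_for_pipe above with an invented parameters dictionary in the layout antheas described (a default namespace, a per-dataset namespace, and a top-level override):
```python
params = {
    "default": {"alg": {"lr": 0.1, "epochs": 10}},
    "cars": {"alg": {"lr": 0.2}},
    "alg": {"epochs": 3},  # top-level override, e.g. coming from --params
}

merged = get_params_for_pipe("cars.cnn4.train", params)
assert merged["alg"] == {"lr": 0.2, "epochs": 3}
# Note: merged also still contains the "default" and "cars" keys, because the
# full params dict is merged last.
```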
antheas (10/12/2022, 5:04 PM):
If you use `functools.partial` and you change the `__name__`, it won't persist through serialization and deserialization, mutating the name of the node and crashing ParallelRunner.
So I made `gen_closure` for that. Took a good afternoon to get all the issues sorted. Works just like `partial` does, but you can also feed in `_fn` to change the function name.
I also included a little lazy load function that you can replace node function `import __`s with. Kedro, by making node funcs be actual functions, requires you to load the node's module, which might include pytorch, xgboost, jax, tf, etc.
That makes kedro help/ipython take 6s to load in my case (now 3), and if you use multiple gpu frameworks (one each run) it probably causes ram issues/kedro to crash. So if you feel like optimizing your startup time (esp. with the parallel runner, where the main process doesn't need to load libraries and most processes don't need gpus), you can use that to help you.
antheas (10/12/2022, 5:04 PM):
```python
from functools import partial
from itertools import chain
from typing import Callable, TypeVar

from ...utils import get_params_for_pipe

A = TypeVar("A")


def list_unique(*args: list[A]) -> list[A]:
    return list(dict.fromkeys(chain(*args)))


class gen_closure(partial):
    """Creates a closure for function `fun`, by passing the positional arguments
    provided in this function to `fun` before the ones given to the function and
    by passing the sum of named arguments given to both functions.

    The closure retains the original function name. If desired, it can
    be renamed using the `_fn` parameter. If fn contains `%s`, it will be
    replaced with the function name"""

    def __new__(cls, func, /, *args, _fn: str | None = None, **keywords):
        self = super().__new__(cls, func, *args, **keywords)
        if _fn:
            self.__name__ = _fn.replace("%s", func.__name__)
        else:
            self.__name__ = func.__name__
        return self


def _lazy_execute(anchor: str, module: str, fun: str, *args, **kwargs):
    from importlib import import_module

    module = import_module(module, anchor)
    return getattr(module, fun)(*args, **kwargs)


def lazy_load(anchor, module: str, funs: list[str] | str):
    if isinstance(funs, str):
        return gen_closure(_lazy_execute, anchor, module, funs, _fn=funs)
    return (gen_closure(_lazy_execute, anchor, module, fun, _fn=fun) for fun in funs)
```
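A hedged usage sketch for the two helpers (the `.cnn4_nodes` module and the function names below are invented): the heavy node module is only imported when a node actually runs, and gen_closure bakes arguments into a plain function while keeping a stable node name:
```python
from kedro.pipeline import node, pipeline

# Import train/measure lazily so `kedro --help` or an ipython session doesn't
# pull in pytorch & friends. `.cnn4_nodes` is a hypothetical module.
train, measure = lazy_load(__package__, ".cnn4_nodes", ["train", "measure"])


def fit(table, epochs):  # a plain node function used with gen_closure
    return table


pipe = pipeline(
    [
        node(gen_closure(fit, epochs=10, _fn="fit_10_epochs"), "table", "features"),
        node(train, ["features", "params:alg"], "model"),
        node(measure, ["model", "features"], "metrics"),
    ]
)
```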
datajoely (10/12/2022, 5:05 PM):
antheas (10/12/2022, 5:06 PM):
datajoely (10/12/2022, 5:07 PM):
antheas (10/12/2022, 5:11 PM):
`gen_closure`, and one for parameters
datajoely (10/12/2022, 5:27 PM):
datajoely (10/12/2022, 5:27 PM):
ToniMaroni (10/16/2022, 8:45 AM):
```python
from functools import wraps


def closure(fn, **closure_kwargs):
    @wraps(fn)  # keeps fn's __name__ and metadata on the wrapper
    def wrapper(*args, **kwargs):
        return fn(*args, **{**closure_kwargs, **kwargs})

    return wrapper


# node_fn is an existing node function; bake in node_fn_kwarg_1=10
node_fn_closure = closure(node_fn, node_fn_kwarg_1=10)
```
antheas (10/16/2022, 8:49 AM):