ggerog
02/09/2022, 4:52 PM
…before_node_run into a variable. I mostly just use hooks to get the side-effects.

datajoely
02/09/2022, 4:53 PM
…inputs dict to a new value
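Conceptually, the mechanism being discussed works like this (a simplified pure-Python sketch of the runner's behaviour, not Kedro's actual code): if before_node_run returns a dict, it is used to update the node's inputs before the node function is called.

```python
# Simplified sketch (not Kedro source) of how a before_node_run hook can
# remap a node's inputs: a dict returned by the hook is merged over the
# inputs the node would otherwise receive.

def run_node(node_func, inputs, before_node_run):
    overrides = before_node_run(inputs=inputs)
    if overrides:
        # The returned dict replaces the matching entries in `inputs`.
        inputs = {**inputs, **overrides}
    return node_func(**inputs)


def my_before_node_run(inputs):
    # Swap the "companies" input for a filtered copy of the original.
    if "companies" in inputs:
        return {"companies": [c for c in inputs["companies"] if c]}


result = run_node(
    lambda companies: len(companies),
    {"companies": ["a", "", "b"]},
    my_before_node_run,
)
# result == 2: the hook dropped the empty entry before the node ran
```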
datajoely
02/09/2022, 4:54 PM

datajoely
02/09/2022, 4:54 PM

ggerog
02/09/2022, 4:55 PM
…before_node_run, can be used as an input?

datajoely
02/09/2022, 4:56 PM
…inputs provided to the node

ggerog
02/09/2022, 4:58 PM
…before_node_run
datajoely
02/09/2022, 4:59 PM

datajoely
02/09/2022, 4:59 PM

ggerog
02/09/2022, 5:02 PM

datajoely
02/09/2022, 5:17 PM

datajoely
02/09/2022, 5:17 PM

ggerog
02/09/2022, 5:22 PM

datajoely
02/09/2022, 5:38 PM

datajoely
02/09/2022, 5:38 PM

datajoely
02/09/2022, 5:39 PM

ggerog
02/10/2022, 8:11 AM

Isaac89
02/10/2022, 3:22 PM

antony.milne
02/10/2022, 3:30 PM
…settings.py) but possibly there's something else going on that's obliterating the attribute. Let me try it out myself and see if it works.
antony.milne
02/10/2022, 3:35 PM

datajoely
02/10/2022, 3:36 PM

Isaac89
02/10/2022, 3:47 PM

Isaac89
02/10/2022, 3:48 PM

Isaac89
02/10/2022, 3:55 PM

czix
02/11/2022, 1:16 PM
The get_current_session() method is removed, so how do I get the current session when running?
datajoely
02/11/2022, 1:30 PM

Isaac89
02/11/2022, 10:41 PM

RRoger
02/12/2022, 4:56 AM
…DATASET_EXPECTATION_MAPPING is defined in the class itself:

    class DataValidationHooks:
        # Map each dataset name to the expectation suite that validates it
        DATASET_EXPECTATION_MAPPING = {
            "companies": "raw_companies_dataset_expectation",
            "preprocessed_companies": "preprocessed_companies_dataset_expectation",
        }
        ...

Is it possible to define this in the parameters yml? before_node_run and after_node_run don't seem to pass in the context.
datajoely
02/13/2022, 5:00 PM
…catalog object

mjmare
02/15/2022, 11:15 AM
02/15/2022, 11:15 AM{% for table in openac_tables %}
{{ table }}:
layer: primary
type: pandas.ParquetDataSet
filepath: data/03_primary/{{table}}.parquet
save_args:
from_pandas:
preserve_index: False
{% endfor %}
{% for table in openac_tables %}
profile_{{ table }}:
layer: qa
type: ac_pipelines.datasets.ProfilingDataSet
filepath: data/08_reporting/profiles/{{table}}.html
{% endfor %}
and then generate nodes in the pipeline:
def create_pipeline(**kwargs):
from kedro.config import ConfigLoader
conf_paths = ["conf/base", "conf/local"]
conf_loader = ConfigLoader(conf_paths)
table_names = conf_loader.get('*globals.yml')['openac_tables']
return Pipeline([
node(func=lambda x: x,
inputs=tn,
outputs=f'profile_{tn}',
name=f'profile_{tn}',
)
for tn in table_names
])
It works. But it feels hacky.
It could be improved if I could get the default config_loader from somewhere. I had some success with:
from kedro.framework.session import get_current_session
session = get_current_session()
context = session.load_context()
table_names = context.config_loader.get('*globals.yml')['openac_tables']
but that confuses Kedro viz (Error: There is no active Kedro session.)
More substantial improvement would be if the Pipeline/Node could be dynamically parametrized (at runtime). Don't know if that is the right term. I want to feed a variable number of Datasets to a pipeline )or node).
I'm probably doing something wrong, so suggestions are welcome.