datajoely
12/14/2021, 2:56 PM
Use a YAMLDataSet for the parameters you generate at runtime, for safekeeping; then either an automatic or a manual process outside of Kedro could mirror those into your actual parameters.yml.
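As one possible shape for that, a sketch using the yaml.YAMLDataSet shipped in kedro.extras; the dataset name runtime_params and the filepath are hypothetical:

```yaml
# conf/base/catalog.yml
runtime_params:
  type: yaml.YAMLDataSet
  filepath: data/08_reporting/runtime_params.yml  # assumed location
```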
NC
12/14/2021, 2:58 PM
datajoely
12/14/2021, 2:58 PM
datajoely
12/14/2021, 2:58 PM
NC
12/14/2021, 2:58 PM
j c h a r l e s
12/14/2021, 6:08 PM
Rroger
12/15/2021, 11:07 PM
I'm running kedro viz, but I get "there is no active Kedro session". Does anyone know how to make it work? I had managed to run it successfully before; I'm just not sure what has changed since then.
datajoely
12/16/2021, 8:47 AM
czix
12/16/2021, 1:26 PM
datajoely
12/16/2021, 1:48 PM

```python
from typing import Tuple

def my_node_func() -> Tuple[int, int]:
    return (1, 2)
```
czix
12/16/2021, 2:03 PM

```python
Pipeline([
    node(func=my_node_func, inputs=None, outputs=["a", "b"]),
])
```

Or am I wrong?
datajoely
12/16/2021, 2:06 PM
czix
12/16/2021, 2:08 PM
outputs=["ab"]?
datajoely
12/16/2021, 2:10 PM
outputs="a", and then "a" would store the tuple
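Putting the two options together, a minimal runnable sketch (assuming the my_node_func from above; the dataset names "a" and "b" are illustrative):

```python
from typing import Tuple

from kedro.pipeline import Pipeline, node


def my_node_func() -> Tuple[int, int]:
    return (1, 2)


# Two output names: Kedro unpacks the returned tuple into datasets "a" and "b".
unpacked = Pipeline([
    node(func=my_node_func, inputs=None, outputs=["a", "b"]),
])

# One output name: dataset "a" stores the whole tuple as a single object.
packed = Pipeline([
    node(func=my_node_func, inputs=None, outputs="a"),
])
```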
czix
12/16/2021, 2:11 PM
datajoely
12/16/2021, 2:15 PM
datajoely
12/16/2021, 2:17 PM
In catalog.yml:

```yaml
a:
  type: MemoryDataSet
  copy_mode: copy
```
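For reference, MemoryDataSet's copy_mode accepts deepcopy, copy, or assign; a sketch of the no-copy variant, with a hypothetical dataset name b:

```yaml
b:
  type: MemoryDataSet
  copy_mode: assign  # hand the object through by reference, without copying
```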
czix
12/16/2021, 2:27 PM
Dhaval
12/16/2021, 4:23 PM
datajoely
12/16/2021, 4:29 PM
Dhaval
12/16/2021, 4:30 PM
Rroger
12/16/2021, 9:21 PM
Rroger
12/17/2021, 12:54 AM
datajoely
12/17/2021, 7:35 AM
kedro.extras.datasets.pandas.SQLQueryDataSet
kedro.extras.datasets.pandas.SQLTableDataSet
kedro.extras.datasets.spark.SparkJDBCDataSet
kedro.extras.datasets.spark.SparkHiveDataSet
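For illustration, a minimal direct use of the first of these; the query and connection string are made up, and in a real project the entry would normally live in catalog.yml with credentials in credentials.yml:

```python
from kedro.extras.datasets.pandas import SQLQueryDataSet

# Hypothetical query and connection string, for illustration only.
dataset = SQLQueryDataSet(
    sql="SELECT * FROM shuttles LIMIT 100",
    credentials={"con": "postgresql://user:password@localhost:5432/mydb"},
)
df = dataset.load()  # returns a pandas DataFrame with the query result
```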
RRoger
12/17/2021, 9:25 PM
In data_ingestion/pipeline.py:

```python
node(
    name="upload_to_db",
    func=lambda x: x,
    inputs="shuttles",
    outputs="shuttles_table",
),
```

In catalog_01_raw.yml:

```yaml
shuttles_table:
  type: pandas.SQLTableDataSet
  table_name: shuttles
  credentials: postgres
  save_args:
    if_exists: replace
```

But the log shows that shuttles_table is a MemoryDataSet:

```
2021-12-18 08:24:36,993 - kedro.pipeline.node - INFO - Running node: <lambda>([shuttles]) -> [data_ingestion.shuttles_table]
2021-12-18 08:24:36,993 - kedro.io.data_catalog - INFO - Saving data to `data_ingestion.shuttles_table` (MemoryDataSet)...
```

And the table is not created in the database.
RRoger
12/17/2021, 9:37 PM
It was new_ingestion_pipeline adding a namespace to the Pipeline. I didn't realise that creating another function just to add a namespace to an existing Pipeline is how it's done.
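To make the fix concrete, a sketch assuming the wrapper applied namespace="data_ingestion": the namespace prefixes every dataset name, and any name not found in the catalog silently defaults to a MemoryDataSet, so the catalog key needs the prefix too:

```yaml
# catalog key must match the namespaced dataset name seen in the run log
data_ingestion.shuttles_table:
  type: pandas.SQLTableDataSet
  table_name: shuttles
  credentials: postgres
  save_args:
    if_exists: replace
```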
RRoger
12/17/2021, 9:53 PM
If a node (node B) is dependent on a previous node (node A) having uploaded to a database (e.g. some_table as a pandas.SQLTableDataSet) and I use some_table as the input for B, does B automatically try to download some_table to memory (if it isn't already in memory)? I would not like the data downloaded if:
- the data is large, so most of the pipeline time would be spent on downloading
- `B`'s code is to run SQL queries without ever requiring the data locally
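For background: pandas.SQLTableDataSet loads by reading the entire table into a pandas DataFrame, so wiring some_table straight into B does pull the data into memory. One way to keep the computation in the database, assuming B's logic can be expressed in SQL (the dataset name and query below are hypothetical), is to give B a pandas.SQLQueryDataSet instead:

```yaml
some_table_summary:
  type: pandas.SQLQueryDataSet
  sql: SELECT category, COUNT(*) AS n FROM some_table GROUP BY category
  credentials: postgres
```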
datajoely
12/18/2021, 8:48 AM
datajoely
12/18/2021, 8:48 AM
datajoely
12/18/2021, 8:49 AM
datajoely
12/18/2021, 8:49 AM