NC (12/14/2021, 2:58 PM)

jcharles (12/14/2021, 6:08 PM)

Rroger (12/15/2021, 11:07 PM)
When I run kedro viz, I get "there is no active Kedro session". Does anyone know how to make it work? I had managed to run it successfully before; I'm just not sure what has changed since then.

datajoely (12/16/2021, 8:47 AM)

czix (12/16/2021, 1:26 PM)

datajoely (12/16/2021, 1:48 PM)
```python
from typing import Tuple

def my_node_func() -> Tuple[int, int]:
    return tuple([1, 2])
```
czix (12/16/2021, 2:03 PM)
```python
Pipeline([
    node(func=my_node_func, inputs=None, outputs=["a", "b"]),
])
```
Or am I wrong?
datajoely (12/16/2021, 2:06 PM)

czix (12/16/2021, 2:08 PM)
outputs=["ab"]?

datajoely (12/16/2021, 2:10 PM)
outputs="a", and "a" would then store the whole tuple.
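The distinction above can be sketched without Kedro at all. The helper below mimics, in plain Python, how a node's return value is bound to its declared output names: a list of names unpacks the returned tuple element-wise, while a single name stores the whole return value. This is an illustrative sketch only, not Kedro's actual internals.

```python
from typing import Any, Dict, List, Union

def map_outputs(result: Any, outputs: Union[str, List[str], None]) -> Dict[str, Any]:
    """Bind a node result to output dataset names (illustrative sketch).

    - outputs is a list  -> the returned tuple is unpacked element-wise
    - outputs is a str   -> the whole return value is stored under that name
    - outputs is None    -> nothing is stored
    """
    if outputs is None:
        return {}
    if isinstance(outputs, str):
        # Single name: the tuple itself becomes the dataset's value
        return {outputs: result}
    if len(outputs) != len(result):
        raise ValueError("number of outputs must match the returned tuple length")
    # List of names: one element per dataset
    return dict(zip(outputs, result))

print(map_outputs((1, 2), ["a", "b"]))  # {'a': 1, 'b': 2}
print(map_outputs((1, 2), "a"))         # {'a': (1, 2)}
```

So with outputs=["a", "b"] the two elements land in two datasets, while outputs="a" makes dataset "a" hold the tuple itself, as described above.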
czix (12/16/2021, 2:11 PM)

datajoely (12/16/2021, 2:15 PM)

datajoely (12/16/2021, 2:17 PM)
In catalog.yml:
```yaml
a:
  type: MemoryDataSet
  copy_mode: copy
```
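For context on copy_mode: a MemoryDataSet can hand data to each consumer as a deep copy, a shallow copy, or the very same object (assign). The difference is just standard Python copy semantics, sketched here with the stdlib copy module (illustrative only, not Kedro code):

```python
import copy

data = {"rows": [1, 2, 3]}

deep = copy.deepcopy(data)   # like copy_mode: deepcopy - fully independent
shallow = copy.copy(data)    # like copy_mode: copy - new dict, shared inner list
assign = data                # like copy_mode: assign - the same object

# Mutate the original after "loading" it three ways
data["rows"].append(4)

print(deep["rows"])     # [1, 2, 3]     (unaffected)
print(shallow["rows"])  # [1, 2, 3, 4]  (inner list is shared)
print(assign is data)   # True
```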
czix (12/16/2021, 2:27 PM)

Dhaval (12/16/2021, 4:23 PM)

datajoely (12/16/2021, 4:29 PM)

Dhaval (12/16/2021, 4:30 PM)

Rroger (12/16/2021, 9:21 PM)

Rroger (12/17/2021, 12:54 AM)

datajoely (12/17/2021, 7:35 AM)
- kedro.extras.datasets.pandas.SQLQueryDataSet
- kedro.extras.datasets.pandas.SQLTableDataSet
- kedro.extras.datasets.spark.SparkJDBCDataSet
- kedro.extras.datasets.spark.SparkHiveDataSet
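As an illustration of the first of those, a pandas.SQLQueryDataSet catalog entry looks roughly like this, with the connection string supplied via credentials (the dataset name, query, and connection string below are made up for the example):

```yaml
# catalog.yml (illustrative entry)
shuttles_query:
  type: pandas.SQLQueryDataSet
  sql: SELECT * FROM shuttles
  credentials: postgres

# credentials.yml
postgres:
  con: postgresql://user:password@localhost:5432/mydb
```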
RRoger (12/17/2021, 9:25 PM)
In data_ingestion/pipeline.py:
```python
node(
    name="upload_to_db",
    func=lambda x: x,
    inputs="shuttles",
    outputs="shuttles_table",
),
```
In catalog_01_raw.yml:
```yaml
shuttles_table:
  type: pandas.SQLTableDataSet
  table_name: shuttles
  credentials: postgres
  save_args:
    if_exists: replace
```
But the log shows that shuttles_table is a MemoryDataSet:
```
2021-12-18 08:24:36,993 - kedro.pipeline.node - INFO - Running node: <lambda>([shuttles]) -> [data_ingestion.shuttles_table]
2021-12-18 08:24:36,993 - kedro.io.data_catalog - INFO - Saving data to `data_ingestion.shuttles_table` (MemoryDataSet)...
```
And the table is not created in the database.
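One detail visible in the log above: the node saves to data_ingestion.shuttles_table, not shuttles_table. When a modular pipeline adds a namespace, Kedro prefixes the dataset names with it, and any dataset name not found in the catalog falls back to a MemoryDataSet. If that is what is happening here, renaming the catalog entry to the namespaced name should make the SQLTableDataSet take effect (a guess based only on the log lines above):

```yaml
data_ingestion.shuttles_table:
  type: pandas.SQLTableDataSet
  table_name: shuttles
  credentials: postgres
  save_args:
    if_exists: replace
```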
RRoger (12/17/2021, 9:37 PM)
I didn't realise that creating another function (new_ingestion_pipeline) just to add a namespace to an existing Pipeline is how it's done.
RRoger (12/17/2021, 9:53 PM)
If a node (node B) is dependent on a previous node (node A) having uploaded to a database (e.g. some_table as a pandas.SQLTableDataSet) and I use some_table as the input for B, does B automatically try to download some_table into memory (if not already in memory)? I would not like the data downloaded if:
- the data is large, hence most of the pipeline time is spent on downloading
- B's code is to run SQL queries without ever requiring the data locally
datajoely (12/18/2021, 8:48 AM)

datajoely (12/18/2021, 8:48 AM)

datajoely (12/18/2021, 8:49 AM)

fanzipei (12/19/2021, 3:36 AM)

datajoely (12/19/2021, 10:03 AM)
fanzipei (12/19/2021, 1:15 PM)
```yaml
example_iris_data_gz:
  type: pandas.CSVDataSet
  filepath: data/02_intermediate/iris.csv.gz
  load_args:
    header: null
    compression: gzip
  save_args:
    index: null
    compression: gzip
```
Then I add a node which loads example_iris_data and exports example_iris_data_gz. Here is the new node:
```python
def compression(df):
    return df
```
and I added it to the pipeline as:
```python
node(
    compression,
    'example_iris_data',
    'example_iris_data_gz',
    name='compression',
)
```
Then I run:
```
kedro run --from-nodes='compression'
```
There is a warning message:
```
C:\Users\fanzi\anaconda3\envs\kedro\lib\site-packages\pandas\io\common.py:609: RuntimeWarning: compression has no effect when passing a non-binary object as input.
  ioargs = _get_filepath_or_buffer(
```
Finally I get an iris.csv.gz file that is actually only a text file.
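That RuntimeWarning means pandas was handed an already-open, text-mode file object, so its compression setting was silently skipped and plain text was written. A quick way to verify whether a .gz file is genuinely gzip-compressed is to check for the gzip magic bytes 1f 8b at the start of the file. A stdlib-only sketch (the filenames are throwaway temp files, not the project's paths):

```python
import gzip
import os
import tempfile

def is_gzipped(path: str) -> bool:
    """Return True if the file starts with the gzip magic bytes 0x1f 0x8b."""
    with open(path, "rb") as f:
        return f.read(2) == b"\x1f\x8b"

tmpdir = tempfile.mkdtemp()
real_gz = os.path.join(tmpdir, "real.csv.gz")
fake_gz = os.path.join(tmpdir, "fake.csv.gz")

# Written through gzip.open: genuinely compressed
with gzip.open(real_gz, "wt") as f:
    f.write("sepal_length,sepal_width\n5.1,3.5\n")

# Written through a plain text handle: just a text file with a .gz name,
# which is what the warning above says happened
with open(fake_gz, "wt") as f:
    f.write("sepal_length,sepal_width\n5.1,3.5\n")

print(is_gzipped(real_gz))  # True
print(is_gzipped(fake_gz))  # False
```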
datajoely (12/19/2021, 7:00 PM)