ChainYo
01/13/2022, 10:16 AM
datajoely
01/13/2022, 10:50 AM
ChainYo
01/13/2022, 10:51 AM
datajoely
01/13/2022, 10:54 AM
python
class APICatalogHooks:
    @hook_impl
    def after_catalog_created(self, catalog, conf_catalog, conf_creds, feed_dict, save_version, load_versions, run_id):
        """This is an advanced use of the catalog hook so that we create the right
        catalog entries at runtime based on the inputs to the `params:api_stuff`. The alternative
        to this is that your non-technical users would have to create the three output
        dataset entries in the catalog for every input they declare
        """
        api_datasets_to_create = feed_dict['params:api_stuff']
        for dataset in api_datasets_to_create:
            new_entry = {
                f"{dataset['compare_1']}_vs_{dataset['compare_2']}_market_chart" : APIDataSet(url=f'https://api.coingecko.com...{params}...'),
            }
            catalog.add(new_entry, replace=True)
settings.py
ChainYo
01/13/2022, 10:58 AM
datajoely
01/13/2022, 10:59 AM
MemoryDataSets
for every input/output in the pipeline definition NOT in the catalog
ChainYo
01/13/2022, 11:00 AM
settings.py
so when I run the kedro project it will do the job
datajoely
01/13/2022, 11:01 AM
MemoryDataSets
with the real references to the APIDataSet
instances
ChainYo
01/13/2022, 11:02 AM
datajoely
01/13/2022, 11:02 AM
ChainYo
01/13/2022, 11:04 AM
(self, catalog, conf_catalog, conf_creds, feed_dict, save_version, load_versions, run_id)
datajoely
01/13/2022, 11:05 AM
before_node_run
has a function signature available that exposes certain things - in this case the catalog
ChainYo
01/13/2022, 11:23 AM
feed_dict['params:api_stuff']
If my understanding is correct, I create a file in ~/conf/base/feed_dict.yml
with this inside:
yml
params:
  - bitcoin
  - ethereum
  - chiliz
...
datajoely
01/13/2022, 11:23 AM
parameters.yaml
looks like
crypto_currencies:
  - bitcoin
  - ethereum
  - chiliz
feed_dict['params:crypto_currencies']
and you would get a Python list of the 3 strings in that key
ChainYo
01/13/2022, 11:25 AM
datajoely
01/13/2022, 11:25 AM
feed_dict
is a bit of an old name
ChainYo
01/13/2022, 11:26 AM
add_feed_dict()
right now?
datajoely
01/13/2022, 11:26 AM
ChainYo
01/13/2022, 11:30 AM
datajoely
01/13/2022, 11:32 AM
combinations
an iterable and the number 2 it will generate every pair
combinations
returns a generator
list
does that for readability here, but you could do a for loop too
itertools
is genuinely one of the best bits of the standard library
permutations
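A minimal sketch of the `combinations` remarks above, assuming the three currency names from the earlier `parameters.yaml` example. The variable names are illustrative and do not come from the original messages.

```python
from itertools import combinations, permutations

currencies = ["bitcoin", "ethereum", "chiliz"]

# combinations() is lazy, so list() is used here only to materialise the pairs.
pairs = list(combinations(currencies, 2))
print(pairs)
# [('bitcoin', 'ethereum'), ('bitcoin', 'chiliz'), ('ethereum', 'chiliz')]

# The same pairs can be consumed with a plain for loop instead of list().
for base, quote in combinations(currencies, 2):
    print(f"{base}_vs_{quote}")

# permutations() treats (a, b) and (b, a) as different, so it yields more pairs.
ordered_pairs = list(permutations(currencies, 2))
print(len(ordered_pairs))  # 6
```

`combinations` is probably the better fit when a pair and its reverse should count once; `permutations` is the one to reach for if both directions are wanted.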
ChainYo
01/13/2022, 11:34 AM
datajoely
01/13/2022, 11:35 AM
ChainYo
01/13/2022, 1:03 PM
datajoely
01/13/2022, 1:11 PM
feed_dict
kwarg
breakpoint()
in the hook body
ChainYo
01/13/2022, 1:13 PM
feed_dict
to my function args
datajoely
01/13/2022, 1:14 PM
ChainYo
01/13/2022, 1:14 PM
feed_dict
btw
datajoely
01/13/2022, 1:14 PM
ChainYo
01/13/2022, 3:43 PM
catalog.add()
line but fixed it with the source code
python
catalog.add(
    data_set_name=f"{dataset[0]}_vs_{dataset[1]}_market_chart",
    data_set=APIDataSet(
        url=f"https://api.coingecko.com/api/v3/coins/{dataset[0]}/market_chart?vs_currency={dataset[1]}&days=max&interval=daily"
    ),
    replace=True
)
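As a rough illustration of how the dataset names and URLs in the call above could be generated for every pair, here is a sketch in plain Python. It assumes the pairs come from `itertools.combinations`, and the helper name `build_market_chart_entries` is hypothetical. It only builds strings and never touches Kedro or the CoinGecko API.

```python
from itertools import combinations

# URL pattern copied from the catalog.add() call above.
URL_TEMPLATE = (
    "https://api.coingecko.com/api/v3/coins/{base}/market_chart"
    "?vs_currency={quote}&days=max&interval=daily"
)


def build_market_chart_entries(currencies):
    """Hypothetical helper: map each dataset name to its URL, one per unordered pair."""
    entries = {}
    for base, quote in combinations(currencies, 2):
        name = f"{base}_vs_{quote}_market_chart"
        entries[name] = URL_TEMPLATE.format(base=base, quote=quote)
    return entries


entries = build_market_chart_entries(["bitcoin", "ethereum", "chiliz"])
for name, url in entries.items():
    print(name, "->", url)
```

Inside the hook, each name and URL would then be passed to `catalog.add(data_set_name=..., data_set=APIDataSet(url=...), replace=True)` exactly as in the message above.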
new_entry
dict doesn't work: the kedro run pipeline
raises an error because it's missing a name for the dataset, so I removed
new_entry
and passed both arguments directly to the
add()
function 🙂
datajoely
01/13/2022, 4:02 PM
user
01/13/2022, 4:22 PM
ChainYo
01/13/2022, 4:46 PM
in order to create the nodes of my pipeline
👀
datajoely
01/13/2022, 4:57 PM
ChainYo
01/13/2022, 5:26 PM
user
01/14/2022, 1:48 AM
ChainYo
01/15/2022, 5:04 PM
kedro run --params key:value key2:value ...
kedro run --params currency:bitcoin,compare:usd
How do I format the node inputs?
I have tried:
python
node(
    func=format_market_chart_to_dataframe,
    inputs="params:currency_vs_params:compare_market_chart",
    outputs="fetched_params:currency_vs_params:compare_market_chart",
    name="fetched_data_node",
),
bash
ValueError: Pipeline input(s) {'params:currency_vs_params:compare_market_chart'} not found in the DataCatalog
I also tried adding single quotes inside the outer quotes
Kedro
is so awesome I want to use it to its full potential 😄
datajoely
01/16/2022, 4:11 PM
ChainYo
01/17/2022, 7:01 PM
python
from typing import Any, Dict

from kedro.extras.datasets.api import APIDataSet
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class APICatalogHooks:
    @hook_impl
    def after_catalog_created(
        self,
        catalog: DataCatalog,
        conf_catalog: Dict[str, Any],
        conf_creds: Dict[str, Any],
        feed_dict: Dict[str, Any],
        save_version: str,
        load_versions: Dict[str, str],
        run_id: str,
    ) -> None:
        """
        This hook is called after the catalog is created. It registers a single catalog entry
        built from the `currency` and `compare` run parameters.
        """
        currency = feed_dict["params:currency"]
        compare = feed_dict["params:compare"]
        catalog.add(
            data_set_name="inputs_market_chart",
            data_set=APIDataSet(
                url=f"https://api.coingecko.com/api/v3/coins/{currency}/market_chart?vs_currency={compare}&days=max&interval=daily"
            ),
            replace=True
        )
kedro run --params currency:bitcoin,compare:usd
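To make the link between the `--params` string and the `feed_dict["params:..."]` lookups in the hook easier to follow, here is a plain-Python imitation. It is not Kedro's own parser: both helper names are hypothetical, and the assumption is only that each run parameter ends up under a `params:<name>` key next to a `parameters` entry holding the whole dict.

```python
def parse_cli_params(raw):
    """Hypothetical helper: split 'key:value,key2:value2' into a dict of strings."""
    params = {}
    for item in raw.split(","):
        # partition() splits on the first colon only, so values may contain colons.
        key, _, value = item.partition(":")
        params[key.strip()] = value.strip()
    return params


def to_feed_dict(params):
    """Hypothetical helper: expose each parameter under a 'params:<name>' key."""
    feed_dict = {"parameters": params}
    for key, value in params.items():
        feed_dict[f"params:{key}"] = value
    return feed_dict


feed_dict = to_feed_dict(parse_cli_params("currency:bitcoin,compare:usd"))
print(feed_dict["params:currency"], feed_dict["params:compare"])  # bitcoin usd
```

This also suggests why a string such as `"params:currency_vs_params:compare_market_chart"` fails: it is looked up as one literal dataset name, and nothing substitutes the parameter values into it.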
python
from kedro.pipeline import Pipeline, node


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=fetch_data_to_dataframe,
                inputs=["params:currency", "params:compare"],
                outputs="fetched_market_chart",
                name="fetching_data_node",
            ),
        ]
    )
datajoely
01/17/2022, 7:04 PM
ChainYo
01/17/2022, 7:05 PM
datajoely
01/17/2022, 7:05 PM
ChainYo
01/17/2022, 7:05 PM
datajoely
01/17/2022, 7:06 PM