Mirko
04/26/2022, 8:14 PM
```python
def do_something(db_name: str, table_name: str):
    spark.sql(
        "SELECT <some complicated expression> FROM {:s}.{:s}".format(db_name, table_name)
    )
    # write results to another table.
```
I am wondering what the best way is to convert this to Kedro. I can load the table as a Spark DataFrame using `kedro.extras.datasets.spark.SparkDataSet`, but I would like to avoid rewriting all of the SQL queries in the DataFrame API. Does it make sense to do something like this:
```python
def my_node(my_table: DataFrame) -> DataFrame:
    my_table.createOrReplaceTempView("tmp_table")
    # The SQL query is just copied from the function above
    result = spark.sql("SELECT <some complicated expression> FROM tmp_table")
    spark.catalog.dropTempView("tmp_table")
    return result
```
Creating the temporary view seems like a bit of a hack to me, but I can't think of a better way that lets me avoid rewriting the SQL queries in the DataFrame API. I'm also not sure if this has any performance implications.
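A minimal sketch of such a node, assuming the SparkSession is obtained inside the node rather than referenced as a global, and that the temporary view should be dropped even if the query fails (the view name and query are placeholders):

```python
from pyspark.sql import DataFrame, SparkSession


def my_sql_node(my_table: DataFrame) -> DataFrame:
    # Sketch only: fetch the active SparkSession instead of relying on a global.
    spark = SparkSession.builder.getOrCreate()
    my_table.createOrReplaceTempView("tmp_table")  # placeholder view name
    try:
        # spark.sql() parses and analyses the query here, so the returned
        # DataFrame stays valid after the temp view is dropped.
        return spark.sql("SELECT <some complicated expression> FROM tmp_table")
    finally:
        spark.catalog.dropTempView("tmp_table")
```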
datajoely
04/27/2022, 9:57 AM
Rjify
04/28/2022, 8:47 PM
datajoely
04/28/2022, 8:49 PM
If you check `kedro run --help` you will see how to configure the ParallelRunner.
noklam
04/28/2022, 8:50 PM
`kedro run -r ParallelRunner`
Rafał
04/28/2022, 9:09 PM
In `before_node_run` I can check the existing keys of B, and what next? How do I provide such info to the node?
Rafał
04/28/2022, 9:17 PM
`A node cannot have the same inputs and outputs`
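A common workaround for that restriction (a sketch only, not necessarily what was suggested in the replies below) is to register two catalog entries that point at the same location, so a node can read one name and write the other:

```python
from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import DataCatalog

# Hypothetical names and path: "b_input" and "b_output" both point at the same
# file, so a node can declare b_input as input and b_output as output without
# listing the same dataset on both sides.
catalog = DataCatalog(
    {
        "b_input": PickleDataSet(filepath="data/02_intermediate/b.pkl"),
        "b_output": PickleDataSet(filepath="data/02_intermediate/b.pkl"),
    }
)
```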
datajoely
04/28/2022, 9:21 PM
Rafał
04/28/2022, 9:23 PM
`B.save(data)`
user
05/02/2022, 10:27 PM
user
05/03/2022, 12:45 PM
user
05/04/2022, 7:31 PM
user
05/06/2022, 8:10 AM
beats-like-a-helix
05/08/2022, 8:50 PM
Tsakagur
05/09/2022, 3:30 PM
Tsakagur
05/09/2022, 3:30 PM
```python
def main(
    pipeline_name: str = typer.Option(default=None),
    tag: str = typer.Option(default=None),
    env: str = typer.Option(default="pipelines-aml"),
    scope: Optional[Scope] = typer.Option(default=None, help="Must be present if one of the following CLI options is."),
    train_start_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
    train_end_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
    val_start_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
    val_end_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
) -> None:
    """
    Pico model training.
    Emulates ``kedro run`` command.
    """
    # Kedro session arguments
    session_kwargs = {"env": env}
    if scope and (train_start_date or train_end_date or val_start_date or val_end_date):
        extra_params = {}
        if train_start_date:
            extra_params[f"{scope}_train_start_date"] = train_start_date
        if train_end_date:
            extra_params[f"{scope}_train_end_date"] = train_end_date
        if val_start_date:
            extra_params[f"{scope}_val_start_date"] = val_start_date
        if val_end_date:
            extra_params[f"{scope}_val_end_date"] = val_end_date
        session_kwargs["extra_params"] = extra_params
    print(f'{session_kwargs=}')
    # Kedro run arguments
    run_kwargs = {}
    if pipeline_name:
        run_kwargs["pipeline_name"] = pipeline_name
    if tag:
        run_kwargs["tags"] = [tag]
    print(f'{run_kwargs=}')
    metadata = bootstrap_project(Path.cwd())
    with KedroSession.create(metadata.package_name, project_path=Path.cwd(), **session_kwargs) as session:
        session.run(**run_kwargs)
```
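The snippet assumes some imports and project-specific names that were not shown; a hedged guess at what they could look like (`Scope` and `DATE_FORMAT` are project-specific, so the definitions here are placeholders only):

```python
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional

import typer
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

DATE_FORMAT = "%Y-%m-%d"  # placeholder; the real format is project-specific


class Scope(str, Enum):
    """Placeholder for the project's scope choices used by the CLI option."""
```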
datajoely
05/09/2022, 3:42 PM
Tsakagur
05/09/2022, 3:45 PM
Yetunde
05/09/2022, 4:03 PM
Tsakagur
05/09/2022, 4:07 PM
Tsakagur
05/09/2022, 4:09 PM
`python my_run_script.py --catalog_entry_1 value1 --catalog_entry_2 value2`
where `catalog_entry_1` is something that is set in my base catalog, but with a different value. For example, I have this:
```yaml
model_type_validation_simulations:
  <<: *azml_model
  name: model_type
  dirpath: data/06_models/model_type_validation_simulations/
  model_version: 129
```
and I want to overwrite the `model_version` or the `name`.
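One possible way to make such values overridable, sketched here with Kedro's `TemplatedConfigLoader` rather than anything confirmed in the thread, is to template the catalog entry (e.g. `model_version: ${model_version}`) and register the loader in `settings.py`:

```python
# src/<package_name>/settings.py -- sketch only.
# With this, catalog.yml can reference ${model_version}, and the value is
# resolved from conf/<env>/globals.yml (or a globals_dict) at load time.
from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    "globals_pattern": "*globals.yml",
}
```

The globals values could then be overridden per environment, or supplied via `globals_dict`, for example built from the CLI arguments of the run script above.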
williamc
05/09/2022, 5:45 PM
When saving a `tensorflow.keras.layers.TextVectorization` object to a `MemoryDataset` I get the following error:
```
kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
Should only create a single instance of _DefaultDistributionStrategy
```
Any ideas what this is about?
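For context, this TensorFlow error typically appears when an object holding the default distribution strategy is deep-copied, and `MemoryDataSet` deep-copies most objects by default. A hedged sketch of one workaround is to register the dataset explicitly with `copy_mode="assign"` so the object is passed by reference (the dataset name here is hypothetical):

```python
from kedro.io import DataCatalog, MemoryDataSet

# Sketch: register the fitted layer's dataset so it is assigned rather than
# deep-copied between nodes. "text_vectorizer" is a placeholder name.
catalog = DataCatalog({"text_vectorizer": MemoryDataSet(copy_mode="assign")})
```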
datajoely
05/09/2022, 5:47 PM
williamc
05/09/2022, 5:49 PM
Wit
05/10/2022, 7:58 PM
```python
catalog.add(data_set_name=train_name,
            data_set=CSVDataSet(
                filepath=f"{base_data_path}/folds/{run_params['train']}.csv",
                load_args={"sep": ","},
                credentials=credentials))
```
Wit
05/10/2022, 8:00 PM
`after_catalog_created`?
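Adding datasets from the `after_catalog_created` hook is one way to do this; a minimal sketch with illustrative names rather than anything from the thread (the hook class still needs to be registered via `HOOKS` in `settings.py`):

```python
from kedro.extras.datasets.pandas import CSVDataSet
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class AddFoldDatasetsHooks:
    """Illustrative hook that registers extra datasets once the catalog exists."""

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # Same idea as the snippet above, but run automatically after the
        # catalog is created; the dataset name and path are placeholders.
        catalog.add(
            data_set_name="train_fold",
            data_set=CSVDataSet(
                filepath="data/01_raw/folds/train.csv",
                load_args={"sep": ","},
            ),
        )
```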
datajoely
05/10/2022, 8:01 PM
datajoely
05/10/2022, 8:01 PM
datajoely
05/10/2022, 8:01 PM
datajoely
05/10/2022, 8:01 PM