Deep (03/07/2022, 1:42 PM):

Deep (03/07/2022, 1:44 PM):

datajoely (03/07/2022, 1:45 PM):

datajoely (03/07/2022, 1:45 PM):
Add `df.coalesce(1)` to the last part of your node before it gets returned; that might generate only one file.

datajoely (03/07/2022, 1:46 PM):
`df.write.options(**kwargs).save(filename)`
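A minimal sketch of how those two suggestions fit together, assuming a plain PySpark DataFrame; the function name and write options here are hypothetical:

```python
from pyspark.sql import DataFrame

def preprocess(df: DataFrame) -> DataFrame:
    """Hypothetical Kedro node: end by collapsing to one partition so the
    subsequent save produces a single part file instead of many."""
    # ...transformations...
    return df.coalesce(1)

# Writing outside the catalog, the same idea looks like:
# df.coalesce(1).write.options(header=True).mode("overwrite").csv("path/to/output")
```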
Deep (03/07/2022, 1:46 PM):

datajoely (03/07/2022, 1:47 PM):

datajoely (03/07/2022, 1:47 PM):

datajoely (03/07/2022, 1:47 PM):

Deep (03/07/2022, 1:47 PM):

datajoely (03/07/2022, 1:47 PM):

datajoely (03/07/2022, 1:47 PM):
`dbutils` file movement commands
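For context, `dbutils` is the utility object predefined on Databricks; a hedged sketch of using its file commands to give the single part file a friendly name (all paths made up):

```python
# Only works on Databricks, where `dbutils` is predefined. Paths are hypothetical.
src_dir = "dbfs:/tmp/output"            # directory Spark wrote into
target = "dbfs:/tmp/output/result.csv"  # desired single-file name

# after df.coalesce(1) there should be exactly one part file to rename
part_file = [f.path for f in dbutils.fs.ls(src_dir) if f.name.startswith("part-")][0]
dbutils.fs.mv(part_file, target)
```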
datajoely (03/07/2022, 1:47 PM):

Deep (03/07/2022, 1:48 PM):

Deep (03/07/2022, 1:48 PM):

datajoely (03/07/2022, 1:57 PM):
…the `SparkDataSet` implementation itself. We try to mirror the underlying API as much as possible. What I would recommend are two simple ways to add this yourself. I think the easiest thing you can do is subclass `SparkDataSet` and then override the `save()` method: you can copy the implementation from us and simply add those two lines from the screenshot below to the operation. You can see how to create a custom dataset here: https://kedro.readthedocs.io/en/stable/07_extend_kedro/03_custom_datasets.html
There is also a route to doing this with a hook (https://kedro.readthedocs.io/en/latest/07_extend_kedro/02_hooks.html), but I think the dataset is easier.
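A rough sketch of that subclassing route, assuming Kedro 0.17/0.18 where `SparkDataSet` lives under `kedro.extras.datasets.spark`. The class name is made up, and since the screenshot is not reproduced here, the added lines are illustrative rather than the exact ones suggested:

```python
from kedro.extras.datasets.spark import SparkDataSet
from pyspark.sql import DataFrame


class SingleFileSparkDataSet(SparkDataSet):
    """Hypothetical subclass: Kedro's public save() wraps _save(), so
    overriding _save() is the natural place to add custom behaviour."""

    def _save(self, data: DataFrame) -> None:
        # collapse to one partition, then reuse the parent's write logic
        super()._save(data.coalesce(1))
        # ...any post-save file movement (e.g. dbutils renaming) goes here...
```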
datajoely (03/07/2022, 2:14 PM):

Deep (03/07/2022, 2:16 PM):

Deep (03/08/2022, 5:47 AM):
`read()` not implemented for `BaseSessionStore`. Assuming empty store.
The system cannot find the path specified.
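For what it's worth, the `BaseSessionStore` line is an informational message rather than an error: the default store keeps session data in memory only, so there is nothing to read back. If persistence is wanted, Kedro lets you swap the store class in `settings.py`; a sketch, assuming a Kedro version that ships `ShelveStore`:

```python
# settings.py (sketch)
from kedro.framework.session.store import ShelveStore

SESSION_STORE_CLASS = ShelveStore
SESSION_STORE_ARGS = {"path": "./sessions"}  # where session data is persisted
```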
datajoely (03/08/2022, 10:16 AM):

datajoely (03/08/2022, 10:16 AM):

Deep (03/08/2022, 10:17 AM):

Deep (03/08/2022, 10:18 AM):

datajoely (03/08/2022, 10:20 AM):

Schoolmeister (03/08/2022, 1:11 PM):

Schoolmeister (03/08/2022, 1:26 PM):
How do I get the `train_data` and `validation_data` outputs to create the subsequent pipelines with?
```python
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

cv_split_pipe = Pipeline(
    [
        node(
            func=nodes.cv_split,
            inputs=["data", "params:fold_config"],
            # train_data and validation_data are lists, one index per fold
            outputs=["train_data", "validation_data"],
        )
    ]
)

# get the train_data and validation_data outputs somehow
train_data = []
validation_data = []

# build modular pipeline
pipelines = []
for i, (train_set, validation_set) in enumerate(zip(train_data, validation_data)):
    pipelines.append(
        pipeline(
            pipe=new_inference_pipeline(),
            inputs={train_set, validation_set},  # modular pipeline() takes a str, set or dict
            outputs={"y_pred": f"y_pred_{i}"},
        )
    )
final_pipeline = sum(pipelines)
```
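Since Kedro needs every dataset name known when the pipeline is constructed, one possible pattern (an assumption, not necessarily the answer given in the thread, and reusing the question's `nodes.cv_split` and `new_inference_pipeline` names) is to fix the number of folds in config and have the split node emit one named output per fold:

```python
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

n_folds = 5  # must agree with params:fold_config

cv_split_pipe = Pipeline(
    [
        node(
            func=nodes.cv_split,  # must return 2 * n_folds values
            inputs=["data", "params:fold_config"],
            outputs=[f"train_data_{i}" for i in range(n_folds)]
            + [f"validation_data_{i}" for i in range(n_folds)],
        )
    ]
)

# assumes the inner pipeline consumes datasets named "train_data" and
# "validation_data"; the dict maps those to the fold-specific names
fold_pipelines = [
    pipeline(
        pipe=new_inference_pipeline(),
        inputs={"train_data": f"train_data_{i}", "validation_data": f"validation_data_{i}"},
        outputs={"y_pred": f"y_pred_{i}"},
    )
    for i in range(n_folds)
]
final_pipeline = cv_split_pipe + sum(fold_pipelines)
```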
datajoely (03/08/2022, 1:42 PM):
…`None` if not necessary.

datajoely (03/08/2022, 1:42 PM):

williamc (03/08/2022, 3:10 PM):

datajoely (03/08/2022, 3:55 PM):

datajoely (03/08/2022, 3:55 PM):