datajoely  02/01/2022, 4:36 PM
amos  02/01/2022, 11:07 PM
amos  02/01/2022, 11:14 PM
drdebian  02/02/2022, 1:05 AM
drdebian  02/02/2022, 1:06 AM
drdebian  02/02/2022, 1:08 AM
drdebian  02/02/2022, 1:09 AM
antony.milne  02/02/2022, 8:00 AM
pandas.CSVDataSet over sftp was just added to our docs a couple of days ago - see Example 16 here: https://kedro.readthedocs.io/en/latest/05_data/01_data_catalog.html#using-the-data-catalog-with-the-yaml-api
It should be straightforward to combine this with PartitionedDataSet by following the example here https://kedro.readthedocs.io/en/latest/kedro.io.PartitionedDataSet.html#kedro.io.PartitionedDataSet. You'd end up with something like this:
my_dataset:
  type: PartitionedDataSet
  path: sftp:///path/to/remote_cluster/
  credentials: cluster_credentials
  filename_suffix: ".csv"
  dataset:
    type: pandas.CSVDataSet
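The cluster_credentials entry would then live in credentials.yml; Example 16 in the linked docs shows sftp credentials roughly along these lines (all values below are placeholders):

cluster_credentials:
  username: my_username
  host: host_address
  port: 22
  password: password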
antony.milne  02/02/2022, 8:04 AM
At one point there was a suggestion to change filename_suffix to a more general filename_pattern, but I don't think it ever got implemented. If there's interest in adding this we can certainly think again about it.
For now, your options would be:
* change your filenames to match a suffix pattern like *_this_is_it.csv (note it can be more than just the .csv file extension)
* subclass PartitionedDataSet to make your own custom dataset type that implements filename_pattern instead of filename_suffix (see the sketch after this list). This would be straightforward but a bit annoying given partition.endswith(self._filename_suffix) seems to appear in several methods.
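If you do go the subclassing route, here is a minimal sketch of the idea. It assumes the suffix filtering happens in a _list_partitions method (the message above notes the check appears in several methods, so a real implementation may need to override more than this, and the exact method names may differ between Kedro versions); PatternPartitionedDataSet and filename_pattern are made-up names:

import fnmatch
from typing import List

from kedro.io import PartitionedDataSet

class PatternPartitionedDataSet(PartitionedDataSet):
    """Hypothetical partitioned dataset that filters partitions by a glob pattern."""

    def __init__(self, *args, filename_pattern: str = "*", **kwargs):
        super().__init__(*args, **kwargs)
        self._filename_pattern = filename_pattern

    def _list_partitions(self) -> List[str]:
        # keep only the partition paths whose file name matches the glob-style pattern
        return [
            path
            for path in super()._list_partitions()
            if fnmatch.fnmatch(path.rsplit("/", 1)[-1], self._filename_pattern)
        ]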
antony.milne  02/02/2022, 9:54 AM
You should be able to set the encoding with fs_args: open_args_save: encoding (and similarly for load).
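For example (the dataset name and filepath here are made up), a catalog entry along these lines should pass the encoding through to the underlying open calls:

my_csv_dataset:
  type: pandas.CSVDataSet
  filepath: data/01_raw/example.csv
  fs_args:
    open_args_load:
      encoding: "utf-8"
    open_args_save:
      encoding: "utf-8"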
Isaac89  02/02/2022, 9:59 AM
antony.milne  02/02/2022, 10:07 AM
Hooks live in src/project_name/hooks.py. If you have a lot of hooks then you might like to make a new package src/project_name/hooks/ and break them into multiple files in there.
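As a rough sketch of that layout (the module and class names below are invented), with the hooks still registered via HOOKS in src/project_name/settings.py as in recent Kedro versions:

# src/project_name/hooks/__init__.py  (hypothetical split into modules)
from .data_hooks import DataCatalogHooks
from .pipeline_hooks import PipelineMonitoringHooks

# src/project_name/settings.py
from project_name.hooks import DataCatalogHooks, PipelineMonitoringHooks

HOOKS = (DataCatalogHooks(), PipelineMonitoringHooks())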
antony.milne  02/02/2022, 10:08 AM
antony.milne  02/02/2022, 10:13 AM
The pipeline-level hooks (before_pipeline_run, after_pipeline_run, on_pipeline_error) have both run_params and pipeline available to them as arguments. Hence you could do a conditional hook like this:
from typing import Any, Dict
from kedro.framework.hooks import hook_impl

class Hooks:
    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        if run_params["pipeline_name"] == "data_science":
            ...  # code that only runs when you call `kedro run --pipeline=data_science`
Alternatively you can use pipeline, which will contain the actual Pipeline object (i.e. the collection of nodes) that kedro is going to execute in that kedro run.
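For instance, a sketch of a hook that looks at the pipeline argument instead (the node name checked here is invented, just to illustrate the idea):

from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline

class Hooks:
    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any], pipeline: Pipeline) -> None:
        # inspect the nodes kedro is about to execute in this particular run
        if "train_model_node" in {node.name for node in pipeline.nodes}:
            ...  # code that only runs when that node is part of the run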
antony.milne  02/02/2022, 10:16 AM
If you want to do this in a node-level hook (e.g. before_node_run) then you'll need to make the pipeline name accessible to your hook by saving the relevant information to the class somehow, e.g.
class Hooks:
    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        self.pipeline_name = run_params["pipeline_name"]

    @hook_impl
    def before_node_run(self) -> None:
        if self.pipeline_name == "data_science":
            ...  # code that only runs when you call `kedro run --pipeline=data_science`
Isaac89  02/02/2022, 11:36 AM
amos  02/02/2022, 4:38 PM
khern  02/03/2022, 3:15 AM
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT * FROM ....")
But I got the error:
ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1641884194120_477874 to YARN : Application rejected by queue placement policy
limdauto  02/03/2022, 9:48 AM
mr.robot  02/03/2022, 5:00 PM
datajoely  02/03/2022, 5:01 PM
mr.robot  02/03/2022, 5:01 PM
mr.robot  02/03/2022, 5:08 PM
datajoely  02/03/2022, 5:11 PM
datajoely  02/03/2022, 5:11 PM
mr.robot  02/03/2022, 5:11 PM
mr.robot  02/03/2022, 5:14 PM
antony.milne  02/03/2022, 5:16 PM
Can you run pip show setuptools pip and paste it here?
antony.milne  02/03/2022, 5:18 PM
mr.robot  02/03/2022, 5:19 PM