https://kedro.org/
#advanced-need-help

SandyShocks™

10/18/2021, 11:31 PM
I plucked it out of my custom PySpark `_save` method. I wanted to test whether my save args were being respected.

datajoely

10/19/2021, 7:31 AM
Hi @SandyShocks™ so does it still not work?

SandyShocks™

10/19/2021, 1:23 PM
So this is the type of catalog in the scenario where it failed
```python
data.write.save(save_path, self._file_format, **self._save_args)
```

datajoely

10/19/2021, 1:24 PM
what type of object is `data`?

SandyShocks™

10/19/2021, 1:24 PM
this is how we were doing the save in my _save method
spark dataset
```python
data.write.save(
    path=save_path,
    format=self._file_format,
    mode=self._save_args.get("mode"),
    **options,
)
```
this is how I'm changing it now
```python
partition_col = self._save_args.get("partitionBy")
# fall back to an empty dict so `**options` doesn't fail when no "option" key is set
options = self._save_args.get("option") or {}
data.write \
    .option("replaceWhere", f"{partition_col} >= '{start_date}'") \
    .save(
        path=save_path,
        format=self._file_format,
        mode=self._save_args.get("mode"),
        partitionBy=partition_col,
        **options,
    )
```
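A small sketch of building the `replaceWhere` predicate used above (`replaceWhere` is a Delta Lake writer option). It assumes a single date partition column; `build_replace_where` is a hypothetical helper, not part of Kedro or PySpark. It normalises `partitionBy`, because a catalog entry may give it as a list, and an f-string over a list would produce `['event_date'] >= ...` instead of a valid predicate.

```python
from datetime import date
from typing import Sequence, Union


def build_replace_where(
    partition_by: Union[str, Sequence[str], None], start_date: Union[str, date]
) -> str:
    """Build a `replaceWhere` predicate for one date partition column.

    Hypothetical helper: assumes exactly one partition column holding dates.
    """
    if partition_by is None:
        raise ValueError("partitionBy must be set to build a replaceWhere predicate")
    # Catalog YAML may give partitionBy as a single name or as a list of names.
    cols = [partition_by] if isinstance(partition_by, str) else list(partition_by)
    if len(cols) != 1:
        raise ValueError(f"expected exactly one partition column, got {cols!r}")
    # date objects format as ISO strings, e.g. 2021-10-01
    return f"{cols[0]} >= '{start_date}'"


print(build_replace_where(["event_date"], date(2021, 10, 1)))
```

The column name `event_date` is illustrative only. Raising on multiple partition columns is a deliberate choice: a `>=` predicate on just one of them could silently replace more data than intended.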

datajoely

10/19/2021, 1:44 PM
what error do you get?
Is it still the column merge error from above?

SandyShocks™

10/19/2021, 1:45 PM
Tough to say
I ran the save explicitly in my Jupyter notebook, as I showed in the pic
So now the target schema has been overwritten, which is why my new implementation doesn't complain either

datajoely

10/19/2021, 1:47 PM
from our side it looks like it should work

SandyShocks™

10/19/2021, 1:47 PM
Although this exact issue happened once before: running the command explicitly fixed the target, and after that it ran okay

datajoely

10/19/2021, 1:47 PM
in general we try not to change too much about the load/save methods and delegate to the underlying implementation

SandyShocks™

10/19/2021, 1:48 PM
Correct

datajoely

10/19/2021, 1:48 PM
> command explicitly fixed target

what do you mean by this?

SandyShocks™

10/19/2021, 1:48 PM
Cell #9

datajoely

10/19/2021, 1:51 PM
so this isn't a Kedro issue - it's a Spark issue caused by conflicting column types between the two tables
can you do a cast before saving?

SandyShocks™

10/19/2021, 1:51 PM
One sec, I have a question regarding that
my work laptop and work VDI don't allow Discord, which makes chatting here very complicated when I want to pull some examples

datajoely

10/19/2021, 1:55 PM
That's okay - but I do want to make clear that the AnalysisException is purely a Spark one, so it has to do with the data frame content rather than with how Kedro is writing it

SandyShocks™

10/19/2021, 1:55 PM
```python
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
    """Saves the contents of the :class:`DataFrame` to a data source.

    The data source is specified by the ``format`` and a set of ``options``.
    If ``format`` is not specified, the default data source configured by
    ``spark.sql.sources.default`` will be used.

    .. versionadded:: 1.4.0

    Parameters
    ----------
    path : str, optional
        the path in a Hadoop supported file system
    format : str, optional
        the format used to save
    mode : str, optional
        specifies the behavior of the save operation when data already exists.

        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
            exists.
    partitionBy : list, optional
        names of partitioning columns
    **options : dict
        all other string options

    Examples
    --------
    >>> df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode).options(**options)
    if partitionBy is not None:
        self.partitionBy(partitionBy)
    if format is not None:
        self.format(format)
    if path is None:
        self._jwrite.save()
    else:
        self._jwrite.save(path)
```

datajoely

10/19/2021, 1:55 PM
so in theory
`withColumn('column_name', F.col('column_name').cast('string'))`
on both sides will allow the merge

SandyShocks™

10/19/2021, 1:55 PM
that's from `pyspark/sql/readwriter.py`
that method is what gets called behind `save`: it takes `mode` and applies `.mode(mode)`, takes `partitionBy` and applies `.partitionBy(partitionBy)`, and does the same with `options`
```yaml
save_args:
  mode: "overwrite"
  option:
    overwriteSchema: true
```
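One possible way to map a catalog entry shaped like this onto the arguments that `save()` accepts, sketched under the assumption that extra writer options sit under a nested `option` key as above. `split_save_args` is a hypothetical helper, not a Kedro or PySpark API; it keeps `mode` and `partitionBy` as named arguments and flattens everything else into plain options.

```python
from typing import Any, Dict, Tuple

# Named parameters of DataFrameWriter.save() that a catalog entry may set.
# path and format are left out because the dataset passes them itself.
NAMED_SAVE_ARGS = ("mode", "partitionBy")


def split_save_args(save_args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split catalog-style save_args into (named save() kwargs, flat writer options)."""
    named: Dict[str, Any] = {}
    options: Dict[str, Any] = {}
    for key, value in save_args.items():
        if key in NAMED_SAVE_ARGS:
            named[key] = value
        elif key == "option":
            # unpack the nested mapping so each entry becomes its own option
            options.update(value or {})
        else:
            # any other top-level key is treated as a plain writer option
            options[key] = value
    return named, options


named, options = split_save_args({"mode": "overwrite", "option": {"overwriteSchema": True}})
print(named, options)
```

Inside a custom `_save`, the result could then be passed on as `data.write.save(path=save_path, format=self._file_format, **named, **options)`, so `overwriteSchema` reaches Spark as its own option.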

datajoely

10/19/2021, 1:58 PM
right, but I think this is a business-logic issue within the node

SandyShocks™

10/19/2021, 1:59 PM
So doing `data.write.save(save_path, self._file_format, **self._save_args)` would put `mode` and `partitionBy` from the catalog into options
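A quick way to check which keys end up in `**options` is to call a stand-in function with the same signature as the `save()` quoted above. The stub below is hypothetical and writes nothing; `/tmp/target` and `"delta"` are illustrative values. Under plain Python keyword binding, `mode` and `partitionBy` match the named parameters, and only keys that match no parameter (here the nested `option`) fall into `**options`.

```python
def save(path=None, format=None, mode=None, partitionBy=None, **options):
    """Stand-in with the same signature as pyspark's DataFrameWriter.save().

    It only reports how Python bound the arguments; it does not write data.
    """
    return {
        "path": path,
        "format": format,
        "mode": mode,
        "partitionBy": partitionBy,
        "options": options,
    }


save_args = {"mode": "overwrite", "option": {"overwriteSchema": True}}
bound = save("/tmp/target", "delta", **save_args)
print(bound["mode"])     # overwrite
print(bound["options"])  # {'option': {'overwriteSchema': True}}
```

Because the real `save()` forwards `**options` to `.options(**options)`, a nested mapping like this would reach Spark as one option literally named `option` rather than as `overwriteSchema`.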

datajoely

10/19/2021, 1:59 PM
I'm not sure Spark can merge columns of different types, even with `overwriteSchema: true`
I think you need to do an explicit cast in the node

SandyShocks™

10/19/2021, 1:59 PM
I wish I could do a cast easily
But that node copies data 1-1 and overwrites target tables. They are small tables so that's how we defined the logic.
Maybe the source schema could be applied to the target

datajoely

10/19/2021, 2:06 PM
yes I think that's the right call

SandyShocks™

10/19/2021, 2:20 PM
sorry, I mean: read the target schema and apply it to the data. But that might be complicated, given that catalogs are designed to be abstracted away from the node
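A minimal sketch of the "read the target schema and apply it to the data" idea, reduced to its planning step so it runs without Spark. It assumes both schemas are plain `{column: type_string}` mappings; in PySpark, `dict(df.dtypes)` yields such a mapping. `plan_casts` is a hypothetical helper. Each planned cast could then be applied in the node with `df.withColumn(c, F.col(c).cast(t))`, with the target schema passed in as an ordinary node input rather than read from the catalog inside the node.

```python
from typing import Dict


def plan_casts(source: Dict[str, str], target: Dict[str, str]) -> Dict[str, str]:
    """Return {column: target_type} for shared columns whose types differ.

    Columns present on only one side are left alone, so this never adds or
    drops columns; it only lines up types where both tables already agree
    on the column name.
    """
    return {
        col: target[col]
        for col, src_type in source.items()
        if col in target and target[col] != src_type
    }


source_schema = {"id": "int", "amount": "string", "day": "date"}
target_schema = {"id": "bigint", "amount": "string", "extra": "int"}
print(plan_casts(source_schema, target_schema))  # {'id': 'bigint'}
```

The schemas above are made-up examples. Keeping this as a pure function of two inputs fits the point that transformation logic belongs in the node, while the catalog only says where each table lives.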

datajoely

10/19/2021, 2:55 PM
as a rule I'm hesitant to put too much transformation logic in the catalog; I feel that should live in the node

SandyShocks™

10/19/2021, 4:50 PM
great point you brought up there. Question: would I be able to access input and output catalog info, like save args, from inside a node? Also, is it possible to update save_args from inside the node? I've only seen the `catalog.add()` functionality

datajoely

10/19/2021, 4:50 PM
No, we specifically don't provide that to users
Declarative pointers to data live in the catalog. Business logic such as transformations lives in the node, in Python