I plucked it out of my custom pyspark _save method...
# advanced-need-help
s
I plucked it out of my custom pyspark _save method. I wanted to test if my save args were being respected.
d
Hi @SandyShocks™ so does it still not work?
s
So this is the type of catalog in the scenario where it failed
data.write.save(save_path, self._file_format, **self._save_args)
d
what type of object is
data
s
spark dataset
this is how we were doing the save in my _save method
data.write.save(path=save_path, format=self._file_format, mode=self._save_args.get("mode"), **options)
this is how I'm changing it now
partition_col = self._save_args.get("partitionBy")
options = self._save_args.get("option")
data.write \
    .option("replaceWhere", f"{partition_col} >= '{start_date}'") \
    .save(
        path=save_path,
        format=self._file_format,
        mode=self._save_args.get("mode"),
        partitionBy=partition_col,
        **options
    )
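A minimal sketch of that new branch with guards for missing keys - the "or {}" fallback and the partition_col check are my additions, not from the original _save; save_path, start_date and self._file_format are as in the snippets above:
# sketch only: assumes self._save_args uses the mode/option/partitionBy keys
# shown in the catalog entry further down, and that start_date is computed elsewhere
partition_col = self._save_args.get("partitionBy")
options = self._save_args.get("option") or {}  # assumption: avoid unpacking None

writer = data.write
if partition_col is not None:
    # only add the replaceWhere predicate when a partition column is configured
    writer = writer.option("replaceWhere", f"{partition_col} >= '{start_date}'")

writer.save(
    path=save_path,
    format=self._file_format,
    mode=self._save_args.get("mode"),
    partitionBy=partition_col,
    **options
)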
d
what error do you get?
Is it still the column merge error as above
s
Tough to say
I ran the save explicitly on my jupyter notebook as I showed in the pic
So now the target schema has been overwritten, so my new implementation doesn't complain either
d
from our side it looks like it should work
s
Although this exact issue happened once before; running the command explicitly fixed the target and then it ran okay
d
in general we try not to change too much about the load/save methods and delegate to the underlying implementation
s
Correct
d
> command explicitly fixed target
what do you mean by this?
s
Cell #9
d
so this isn't a Kedro issue - it's a Spark issue with conflicting types of both tables
can you do a cast before?
s
One sec, I have a question regarding that
my work laptop and work VDI don't allow Discord, which makes chatting here very complicated if I wanna pull some examples
d
That's okay - but I do want to make clear that the AnalysisException is purely a Spark one so has to do with the data frame content rather than how Kedro is writing it
s
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
    """Saves the contents of the :class:`DataFrame` to a data source.

    The data source is specified by the ``format`` and a set of ``options``.
    If ``format`` is not specified, the default data source configured by
    ``spark.sql.sources.default`` will be used.

    .. versionadded:: 1.4.0

    Parameters
    ----------
    path : str, optional
        the path in a Hadoop supported file system
    format : str, optional
        the format used to save
    mode : str, optional
        specifies the behavior of the save operation when data already exists.

        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
            exists.
    partitionBy : list, optional
        names of partitioning columns
    **options : dict
        all other string options

    Examples
    --------
    >>> df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode).options(**options)
    if partitionBy is not None:
        self.partitionBy(partitionBy)
    if format is not None:
        self.format(format)
    if path is None:
        self._jwrite.save()
    else:
        self._jwrite.save(path)
d
so in theory
withColumn('column_name', F.col('column_name').cast('string'))
on both sides will allow the merge
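Roughly, with a made-up column name event_date standing in for the conflicting column:
from pyspark.sql import functions as F

# cast the conflicting column to one common type on the DataFrame being written;
# the existing target table needs the same type for the overwrite to succeed
data = data.withColumn("event_date", F.col("event_date").cast("string"))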
s
that's pyspark/sql/readwriter.py
that method is what's being called behind save: it takes mode and applies .mode(mode), takes partitionBy and applies .partitionBy(partitionBy), and does the same with options
save_args:
  mode: "overwrite"
  option:
    overwriteSchema: true
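For the new _save above (where the nested option dict gets unpacked into **options), that entry effectively expands to something like this builder chain - a sketch, reusing save_path and self._file_format from the earlier snippets:
# equivalent of mode: "overwrite" plus option: {overwriteSchema: true} (sketch)
(data.write
    .mode("overwrite")              # from save_args["mode"]
    .options(overwriteSchema=True)  # from save_args["option"]
    .format(self._file_format)
    .save(save_path))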
d
right, but I think this is a within-node business-logic issue
s
So doing a data.write.save(save_path, self._file_format, **self._save_args) would route mode and partitionBy from the catalog straight through to save()'s own arguments
d
I'm not sure Spark can do a merge with columns of different types, even with
overwriteSchema: true
I think you need to do an explicit cast in the node
s
I wish I could do a cast easily
But that node copies data 1-1 and overwrites target tables. They are small tables so that's how we defined the logic.
Maybe source schema could be applied to target
d
yes I think that's the right call
s
sorry, read the target schema and apply it to the data. But that might be complicated with how all catalogs are designed to be abstracted away from the node
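A rough sketch of what "read the target schema and apply it to the data" could look like inside a node - align_to_target_schema, target_path and the direct spark.read call are illustrative assumptions, not the project's actual code:
from pyspark.sql import functions as F

def align_to_target_schema(df, spark, target_path, file_format="delta"):
    """Cast df's columns to the types of the existing target table (sketch)."""
    target_schema = spark.read.format(file_format).load(target_path).schema
    for field in target_schema:
        if field.name in df.columns:
            df = df.withColumn(field.name, F.col(field.name).cast(field.dataType))
    return df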
d
as a rule I'm hesitant to put too much transformation logic in the catalog, I feel that should live in node
s
great point you brought up there. Question: would I be able to access input and output catalog info like save_args, etc. from inside a node? Also, is it possible to update save_args from inside the node? I've only seen the catalog.add() functionality
d
No, we specifically don't provide that to users
declarative pointers to data live in the catalog. Business logic such as transformations lives in the node, in Python