I plucked it out of my custom PySpark _save method. I wanted to test if my save args were being respected.
Hi @SandyShocks™ so does it still not work?
So this is the type of catalog in the scenario where it failed
data.write.save(save_path, self._file_format, **self._save_args)
what type of object is
this is how we were doing the save in my _save method
spark dataset
data.write.save(
    path=save_path,
    format=self._file_format,
    mode=self._save_args.get("mode"),
    **options)
this is how I'm changing it now
partition_col = self._save_args.get("partitionBy")
options = self._save_args.get("option")
data.write \
    .option("replaceWhere", f"{partition_col} >= '{start_date}'") \
    .save(
        path=save_path,
        format=self._file_format,
        mode=self._save_args.get("mode"),
        partitionBy=partition_col,
        **options)
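A minimal sketch of the same idea with the writer arguments built in one place. It assumes `save_args` has the shape of the catalog entry (`mode`, a single-column `partitionBy` string, and a nested `option` dict) and that `start_date` is already a string in the partition column's format. The helper name `build_write_kwargs` is hypothetical. Defaulting `option` to `{}` avoids a `TypeError` from `**None` when a catalog entry has no options.

```python
from typing import Any, Dict, Tuple


def build_write_kwargs(
    save_args: Dict[str, Any], start_date: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split Kedro-style save_args into writer options and save() keyword args.

    Hypothetical helper: assumes save_args looks like
    {"mode": ..., "partitionBy": "<single column>", "option": {...}}.
    """
    partition_col = save_args.get("partitionBy")
    # Copy the nested options (or start empty) so the catalog dict is not mutated
    # and **options never receives None.
    options = dict(save_args.get("option") or {})
    if partition_col is not None:
        # Only overwrite partitions from start_date onwards (Delta replaceWhere).
        options["replaceWhere"] = f"{partition_col} >= '{start_date}'"
    save_kwargs = {
        "mode": save_args.get("mode"),
        "partitionBy": partition_col,
    }
    return options, save_kwargs
```

Inside `_save` this would then be used roughly as `options, save_kwargs = build_write_kwargs(self._save_args, start_date)` followed by `data.write.save(path=save_path, format=self._file_format, **save_kwargs, **options)`; passing `replaceWhere` through `**options` is equivalent to the separate `.option(...)` call, because `save` forwards `**options` to `.options(...)`.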
what error do you get?
Is it still the column merge error from above?
Tough to say
I ran the save explicitly in my Jupyter notebook, as I showed in the pic
So now the target schema is overwritten, and my new implementation doesn't complain either
from our side it looks like it should work
Although this exact issue happened once before, and running the command explicitly fixed the target; after that it ran okay
in general we try not to change too much about the load/save methods and delegate to the underlying implementation
> command explicitly fixed target
what do you mean by this?
Cell #9
so this isn't a Kedro issue; it's a Spark issue with conflicting column types between the two tables
can you do a cast before?
One sec, I have a question regarding that
my work laptop and work VDI don't allow Discord, which makes chatting here very complicated if I wanna pull some examples
That's okay, but I do want to make clear that the AnalysisException is purely a Spark one, so it has to do with the DataFrame content rather than how Kedro is writing it
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
    """Saves the contents of the :class:`DataFrame` to a data source.

    The data source is specified by the ``format`` and a set of ``options``.
    If ``format`` is not specified, the default data source configured by
    ``spark.sql.sources.default`` will be used.

    .. versionadded:: 1.4.0

    Parameters
    ----------
    path : str, optional
        the path in a Hadoop supported file system
    format : str, optional
        the format used to save
    mode : str, optional
        specifies the behavior of the save operation when data already exists.

        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already
          exists.
    partitionBy : list, optional
        names of partitioning columns
    **options : dict
        all other string options

    Examples
    --------
    >>> df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode).options(**options)
    if partitionBy is not None:
        self.partitionBy(partitionBy)
    if format is not None:
        self.format(format)
    if path is None:
        self._jwrite.save()
    else:
        self._jwrite.save(path)
so in theory
withColumn('column_name', F.col('column_name').cast('string'))
on both sides will allow the merge
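A minimal sketch of that cast as a reusable helper, assuming the conflicting columns are known by name and that `string` is an acceptable common type for them. `string_cast_exprs` is a hypothetical name; it only builds Spark SQL expressions, which you would then pass to `DataFrame.selectExpr` on both the source and the target DataFrame.

```python
from typing import Iterable, List


def string_cast_exprs(all_columns: Iterable[str], to_cast: Iterable[str]) -> List[str]:
    """Build selectExpr expressions that cast the chosen columns to STRING.

    Columns not listed in to_cast are passed through unchanged, in their
    original order. Backticks guard against names with spaces or dots.
    """
    cast_set = set(to_cast)
    exprs = []
    for name in all_columns:
        if name in cast_set:
            exprs.append(f"CAST(`{name}` AS STRING) AS `{name}`")
        else:
            exprs.append(f"`{name}`")
    return exprs
```

Applied to each side, e.g. `df = df.selectExpr(*string_cast_exprs(df.columns, ["column_name"]))`, this does the same thing as the `withColumn(...).cast('string')` call but for any number of columns in one projection.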
that's from pyspark.sql's readwriter.py
that method is what gets called behind save: it takes mode and applies .mode(mode), takes partitionBy and applies .partitionBy(partitionBy), and does the same with options via .options(**options)
save_args:
  mode: "overwrite"
  option:
    overwriteSchema: true
right, but I think this is a business logic issue within the node
So doing data.write.save(save_path, self._file_format, **self._save_args) would put mode and partitionBy from the catalog into options
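For reference, this is how Python binds `**self._save_args` against a signature like `save(self, path=None, format=None, mode=None, partitionBy=None, **options)`. `fake_save` is a hypothetical pure-Python stand-in (not the PySpark method) that just reports what each parameter received. It assumes the `save_args` from the catalog entry above plus a `partitionBy` key, as read in the `_save` snippet; the column name `dt` is made up.

```python
from typing import Any, Dict


def fake_save(path=None, format=None, mode=None, partitionBy=None, **options) -> Dict[str, Any]:
    """Stand-in with the same parameters as DataFrameWriter.save; returns the bindings.

    (`format` shadows the builtin only to mirror the PySpark signature.)
    """
    return {"path": path, "format": format, "mode": mode,
            "partitionBy": partitionBy, "options": options}


save_args = {"mode": "overwrite", "partitionBy": "dt", "option": {"overwriteSchema": True}}

# Unpacking everything: mode and partitionBy bind to the named parameters,
# but the nested "option" dict lands in **options as one key called "option".
direct = fake_save("some/path", "delta", **save_args)

# Unpacking only the nested dict into **options gives flat writer options.
flattened = fake_save(
    "some/path", "delta",
    mode=save_args.get("mode"),
    partitionBy=save_args.get("partitionBy"),
    **save_args.get("option", {}),
)
```

So mode and partitionBy are not swallowed by options; the catch is the nested `option` key. As far as I can tell from readwriter.py, the real `.options(**options)` would turn that nested dict into a single string option named `option`, so `overwriteSchema` would not be set on its own, which is why the explicit unpacking in the newer `_save` matters.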
I'm not sure Spark can do a merge with columns of different types, even with overwriteSchema: true
I think you need to do an explicit cast in the node
I wish I could do a cast easily
But that node copies data 1:1 and overwrites the target tables. They are small tables, so that's how we defined the logic.
Maybe source schema could be applied to target
yes I think that's the right call
sorry, I mean read the target schema and apply it to the data. But that might be complicated, given how all catalogs are designed to be abstracted away from the node
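A minimal sketch of "read the target schema and apply it to the data" as plain node code. It assumes the node can somehow get the target table's current column types (for example from `target_df.dtypes`); how that table reaches the node through the catalog is left open here. `plan_casts` and `apply_casts` are hypothetical names. The planning step is plain Python; only `apply_casts` needs PySpark.

```python
from typing import Dict, List, Tuple


def plan_casts(source_dtypes: List[Tuple[str, str]],
               target_dtypes: List[Tuple[str, str]]) -> Dict[str, str]:
    """Return {column: target_type} for columns in both schemas whose types differ.

    Both arguments use the shape of DataFrame.dtypes: [(name, type_string), ...].
    Columns that exist only on one side are left alone.
    """
    target = dict(target_dtypes)
    return {
        name: target[name]
        for name, dtype in source_dtypes
        if name in target and target[name] != dtype
    }


def apply_casts(df, casts: Dict[str, str]):
    """Cast df's columns to the planned types (requires PySpark at call time)."""
    # Imported lazily so plan_casts can be used and tested without Spark installed.
    from pyspark.sql import functions as F

    for name, dtype in casts.items():
        df = df.withColumn(name, F.col(name).cast(dtype))
    return df
```

In a node this would read as `aligned = apply_casts(source_df, plan_casts(source_df.dtypes, target_df.dtypes))`, which keeps the transformation in Python rather than in the catalog's save_args.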
as a rule I'm hesitant to put too much transformation logic in the catalog; I feel that should live in the node
great point you brought up there. Question: would I be able to access input and output catalog info, like save args, etc., from inside the node? Also, is it possible to update save_args from inside the node? I've only seen the catalog.add() functionality
No, we specifically don't provide that to users
declarative pointers to data live in the catalog; business logic such as transformations lives in the node, in Python