advanced-need-help
  • m

    Mirko

    04/26/2022, 8:14 PM
    Hi all, I am porting an existing project to kedro which reads a lot of data from tables that are hosted on Databricks. A lot of the functions look like this:
    python
    def do_something(db_name: str, table_name: str):
        spark.sql("SELECT <some complicated expression> FROM {:s}.{:s}".\
            format(db_name, table_name))

        # write results to another table.
    I am wondering what the best way to convert this to kedro is. I can load the table as a spark DataFrame using
    kedro.extras.datasets.spark.SparkDataSet
    , but I would like to avoid rewriting all of the SQL queries in the DataFrame API. Does it make sense to do something like this:
    python
    def my_node(my_table: DataFrame) -> DataFrame:
        my_table.createOrReplaceTempView("tmp_table")

        # The SQL query is just copied from the function above
        result = spark.sql("SELECT <some complicated expression> FROM tmp_table")
        spark.catalog.dropTempView("tmp_table")

        return result
    Creating the temporary view seems like a bit of a hack to me, but I can't think of a better way that allows me to avoid rewriting the SQL queries in the DataFrame API. I'm also not sure if this has any performance implications.
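    For illustration, a self-contained version of the temp-view node sketched above might look like the following (the SQL placeholder and the view name tmp_table are just examples, and the active SparkSession is fetched inside the node rather than relying on a global):
    python
    from pyspark.sql import DataFrame, SparkSession


    def my_node(my_table: DataFrame) -> DataFrame:
        # Register the catalog-loaded DataFrame so the legacy SQL can be reused as-is.
        my_table.createOrReplaceTempView("tmp_table")
        spark = SparkSession.builder.getOrCreate()
        try:
            # Query copied verbatim from the original function; only the table
            # identifier changes to the temporary view.
            result = spark.sql("SELECT <some complicated expression> FROM tmp_table")
        finally:
            spark.catalog.dropTempView("tmp_table")
        return result
    The view is only needed while the query is analysed, so dropping it straight away is generally fine; for tables that live in the metastore, kedro.extras.datasets.spark.SparkHiveDataSet may also be worth a look as a catalog-level alternative.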
  • d

    datajoely

    04/27/2022, 9:57 AM
    SQL api in Kedro
  • r

    Rjify

    04/28/2022, 8:47 PM
    Hello all, is it possible to run three nodes in parallel in a pipeline in kedro?
  • d

    datajoely

    04/28/2022, 8:49 PM
    if you type
    kedro run --help
    you will see how to configure the
    ParallelRunner
  • n

    noklam

    04/28/2022, 8:50 PM
    kedro run -r ParallelRunner
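    For completeness, the same thing can be done from a script rather than the CLI. This is only a sketch and assumes the project root is the current working directory; nodes still only run in parallel when they have no data dependencies between them:
    python
    from pathlib import Path

    from kedro.framework.session import KedroSession
    from kedro.framework.startup import bootstrap_project
    from kedro.runner import ParallelRunner

    # Programmatic equivalent of `kedro run --runner=ParallelRunner`.
    metadata = bootstrap_project(Path.cwd())
    with KedroSession.create(metadata.package_name, project_path=Path.cwd()) as session:
        session.run(runner=ParallelRunner())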
  • r

    Rafał

    04/28/2022, 9:09 PM
    Hello, I have a question regarding PartitionedDataSet. Assume that some node A returns a dictionary which is saved as PartitionedDataSet B. I have observed that if I run the pipeline several times and return different dictionaries from node A (assuming some parameters have changed), then dataset B is incremented (I know this is not IncrementalDataSet), so the old key files still exist. Actually this is OK for me. So here is my question: is it possible to tell node A to compute only the keys that do not exist in dataset B yet? I am wondering if it is possible to use
    before_node_run
    to check the existing keys of B, but what next? How do I provide that info to the node?
  • r

    Rafał

    04/28/2022, 9:17 PM
    I have also thought about providing B to A as an input, but
    A node cannot have the same inputs and outputs
  • d

    datajoely

    04/28/2022, 9:21 PM
    The hook approach would likely work; a custom dataset may also work.
  • r

    Rafał

    04/28/2022, 9:23 PM
    But how do I get access to dataset B's key values in node A? As far as I understand, the node just returns the data that Kedro uses for
    B.save(data)
    .
  • r

    Rafał

    04/28/2022, 9:33 PM
    Found: https://kedro.readthedocs.io/en/stable/extend_kedro/hooks.html?highlight=before_node_run#modify-node-inputs-using-before-node-run-hook
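    Following that link, a rough sketch of the hook approach could look like this. The node name, the dataset name b_partitioned and the extra node input existing_keys are all hypothetical, and node A would need to declare existing_keys as one of its inputs so the returned dict can overwrite it:
    python
    from typing import Any, Dict, Optional

    from kedro.framework.hooks import hook_impl
    from kedro.io import DataCatalog, DataSetError
    from kedro.pipeline.node import Node


    class ExistingPartitionsHook:
        """Sketch: tell node A which partitions already exist in PartitionedDataSet B."""

        @hook_impl
        def before_node_run(
            self, node: Node, catalog: DataCatalog, inputs: Dict[str, Any]
        ) -> Optional[Dict[str, Any]]:
            if node.name != "node_a":  # hypothetical node name
                return None
            try:
                # Loading a PartitionedDataSet returns {partition_id: load_function}.
                existing = catalog.load("b_partitioned")  # hypothetical dataset name
            except DataSetError:
                existing = {}  # first run: nothing saved yet
            # The returned dict overwrites the matching node inputs.
            return {"existing_keys": set(existing.keys())}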
  • u

    user

    05/02/2022, 10:27 PM
    Setup a base dir for the Data Catalog in Kedro https://stackoverflow.com/questions/72093004/setup-a-base-dir-for-the-data-catalog-in-kedro
  • u

    user

    05/03/2022, 12:45 PM
    How to inject the information about load version into Kedro node? https://stackoverflow.com/questions/72099566/how-to-inject-the-information-about-load-version-into-kedro-node
  • u

    user

    05/04/2022, 7:31 PM
    How to write to HDFS using kedro https://stackoverflow.com/questions/72118483/how-to-write-to-hdfs-using-kedro
  • u

    user

    05/06/2022, 8:10 AM
    How to pull data from a paginated JSON API using kedro (APIDataSet)? https://stackoverflow.com/questions/72138166/how-to-pull-data-from-a-paginated-json-api-using-kedro-apidataset
  • b

    beats-like-a-helix

    05/08/2022, 8:50 PM
    Is it possible to get custom datasets working with s3fs?
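    The custom-dataset pattern in the Kedro docs is fsspec-based, so s3:// paths should work as long as s3fs is installed in the environment. A rough sketch, with pandas/CSV only as an example payload:
    python
    from pathlib import PurePosixPath

    import fsspec
    import pandas as pd

    from kedro.io import AbstractDataSet
    from kedro.io.core import get_filepath_str, get_protocol_and_path


    class MyS3BackedDataSet(AbstractDataSet):
        """Sketch of an fsspec-backed dataset; with s3fs installed, s3://bucket/key paths work."""

        def __init__(self, filepath: str, credentials: dict = None):
            protocol, path = get_protocol_and_path(filepath)
            self._protocol = protocol
            self._filepath = PurePosixPath(path)
            # Credentials (e.g. key/secret for s3fs) are passed straight through to fsspec.
            self._fs = fsspec.filesystem(protocol, **(credentials or {}))

        def _load(self) -> pd.DataFrame:
            load_path = get_filepath_str(self._filepath, self._protocol)
            with self._fs.open(load_path, mode="rb") as f:
                return pd.read_csv(f)

        def _save(self, data: pd.DataFrame) -> None:
            save_path = get_filepath_str(self._filepath, self._protocol)
            with self._fs.open(save_path, mode="w") as f:
                data.to_csv(f, index=False)

        def _describe(self) -> dict:
            return {"filepath": str(self._filepath), "protocol": self._protocol}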
  • t

    Tsakagur

    05/09/2022, 3:30 PM
    Hi there, I am running kedro through a Python script with CLI arguments and options. I am currently overwriting some simple parameters from my configuration files and it works correctly. I would like to know if it is possible to do something similar to change some catalog entries. Here is a look at my current Python script.
  • t

    Tsakagur

    05/09/2022, 3:30 PM
    python
    from datetime import datetime
    from pathlib import Path
    from typing import Optional

    import typer

    from kedro.framework.session import KedroSession
    from kedro.framework.startup import bootstrap_project

    # Scope and DATE_FORMAT are defined elsewhere in this script.


    def main(
        pipeline_name: str = typer.Option(default=None),
        tag: str = typer.Option(default=None),
        env: str = typer.Option(default="pipelines-aml"),
        scope: Optional[Scope] = typer.Option(default=None, help= "Must be present if one of the following CLI option is."),
        train_start_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
        train_end_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
        val_start_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
        val_end_date: datetime = typer.Option(default=None, formats=[DATE_FORMAT]),
    ) -> None:
        """
        Pico model training.
        Emulates ``kedro run`` command.
    
        """
        # Kedro session arguments
        session_kwargs = {"env": env}
        if scope and (train_start_date or train_end_date or val_start_date or val_end_date):
            extra_params = {}
            if train_start_date:
                extra_params[f"{scope}_train_start_date"] = train_start_date
            if train_end_date:
                extra_params[f"{scope}_train_end_date"] = train_end_date
            if val_start_date:
                extra_params[f"{scope}_val_start_date"] = val_start_date
            if val_end_date:
                extra_params[f"{scope}_val_end_date"] = val_end_date
            session_kwargs["extra_params"] = extra_params
        print(f'{session_kwargs=}')
    
        # Kedro run arguments
        run_kwargs = {}
        if pipeline_name:
            run_kwargs["pipeline_name"] = pipeline_name
        if tag:
            run_kwargs["tags"] = [tag]
        print(f'{run_kwargs=}')
    
        metadata = bootstrap_project(Path.cwd())
        with KedroSession.create(metadata.package_name, project_path=Path.cwd(), **session_kwargs) as session:
            session.run(**run_kwargs)
  • d

    datajoely

    05/09/2022, 3:42 PM
    Hi, this is possible via hooks; the route you're going down is much harder
  • t

    Tsakagur

    05/09/2022, 3:45 PM
    Any link to some resource / example that might help?
  • y

    Yetunde

    05/09/2022, 4:03 PM
    Sure! Here you go: https://kedro.readthedocs.io/en/stable/extend_kedro/hooks.html
  • t

    Tsakagur

    05/09/2022, 4:07 PM
    I've read the resource and I don't see how it helps my case
  • t

    Tsakagur

    05/09/2022, 4:09 PM
    What I want is to do something like:
    python my_run_script.py --catalog_entry_1 value1 --catalog_entry_2 value2
    where catalog_entry_1 is an entry that already exists in my base catalog, but whose value I want to change. For example I have this:
    yaml
    model_type_validation_simulations:
      <<: *azml_model
      name: model_type
      dirpath: data/06_models/model_type_validation_simulations/
      model_version: 129
    And I want to overwrite the
    model_version
    or the
    name
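    One hook-based way to do this (assuming Kedro 0.18.1+ for the after_context_created hook mentioned further down): pass the overrides as extra_params when creating the session, pick them up from context.params, and re-register the catalog entry in after_catalog_created. All names below are illustrative, and CSVDataSet is only a placeholder for whatever class the azml_model anchor actually resolves to:
    python
    from kedro.extras.datasets.pandas import CSVDataSet  # placeholder dataset class
    from kedro.framework.hooks import hook_impl
    from kedro.io import DataCatalog


    class RuntimeCatalogOverridesHook:
        """Sketch: rebuild a catalog entry from extra_params passed to KedroSession.create()."""

        def __init__(self) -> None:
            self._params: dict = {}

        @hook_impl
        def after_context_created(self, context) -> None:
            # extra_params are merged into context.params by the session.
            self._params = context.params

        @hook_impl
        def after_catalog_created(self, catalog: DataCatalog) -> None:
            model_version = self._params.get("model_version")  # hypothetical parameter name
            if model_version is not None:
                # Re-register the entry with the overridden value.
                catalog.add(
                    "model_type_validation_simulations",
                    CSVDataSet(filepath=f"data/06_models/model_v{model_version}.csv"),
                    replace=True,
                )
    The hook instance goes into HOOKS in settings.py, and the override is passed via extra_params, exactly like the date parameters in the script above.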
  • w

    williamc

    05/09/2022, 5:45 PM
    So I'm trying to save a
    tensorflow.keras.layers.TextVectorization
    object to a
    MemoryDataSet
    and I get the following error:
    kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
    Should only create a single instance of _DefaultDistributionStrategy
    Any ideas what this is about?
  • d

    datajoely

    05/09/2022, 5:47 PM
    Have a look at the assign copy_mode option of the MemoryDataSet
  • w

    williamc

    05/09/2022, 5:49 PM
    Oh that's right, it's on the non-dataframe spark objects tutorial stuff. I'll try it out, thanks!
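    For reference, the assign option here is (as far as I can tell) MemoryDataSet's copy_mode: by default a MemoryDataSet deep-copies most objects on save/load, which is what trips up objects like a Keras TextVectorization layer, whereas copy_mode: assign stores the object by reference. A minimal sketch, with a hypothetical dataset name:
    python
    from kedro.io import MemoryDataSet

    # Declare the intermediate dataset explicitly (in a hook or a programmatically
    # built catalog) so the vectorizer is stored by reference instead of deep-copied.
    # The catalog.yml equivalent is `type: MemoryDataSet` with `copy_mode: assign`.
    text_vectorizer = MemoryDataSet(copy_mode="assign")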
  • w

    Wit

    05/10/2022, 7:58 PM
    I create a derived class from KedroContext (in settings.py) and then set CONTEXT_CLASS to this class. In the derived class I create a method _get_catalog where I dynamically create datasets like below:
    catalog.add(
        data_set_name=train_name,
        data_set=CSVDataSet(
            filepath=f"{base_data_path}/folds/{run_params['train']}.csv",
            load_args={"sep": ","},
            credentials=credentials,
        ),
    )
  • w

    Wit

    05/10/2022, 8:00 PM
    How can you modify the catalog with hooks? Which hook should I use?
    after_catalog_created
    ?
  • d

    datajoely

    05/10/2022, 8:01 PM
    Yes, this is the correct way to get access to the live catalog object
  • d

    datajoely

    05/10/2022, 8:01 PM
    In 0.18.1 we've just introduced after_context_created, which gives you access to the config loader even earlier in the lifecycle
  • d

    datajoely

    05/10/2022, 8:01 PM
    But I'm not 100% sure that's preferable in this case
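    A sketch of the hook version of the dynamic-catalog code above; it assumes base_data_path and run_params live in the parameters config (feed_dict exposes them to the hook in Kedro 0.18.x) and that the credentials sit under a hypothetical my_credentials key:
    python
    from kedro.extras.datasets.pandas import CSVDataSet
    from kedro.framework.hooks import hook_impl
    from kedro.io import DataCatalog


    class DynamicFoldDataSetsHook:
        """Sketch: register fold datasets on the live catalog instead of subclassing KedroContext."""

        @hook_impl
        def after_catalog_created(
            self, catalog: DataCatalog, feed_dict: dict, conf_creds: dict
        ) -> None:
            params = feed_dict["parameters"]
            base_data_path = params["base_data_path"]
            run_params = params["run_params"]

            catalog.add(
                data_set_name="train",
                data_set=CSVDataSet(
                    filepath=f"{base_data_path}/folds/{run_params['train']}.csv",
                    load_args={"sep": ","},
                    credentials=conf_creds.get("my_credentials"),  # hypothetical key
                ),
                replace=True,
            )
    The hook is registered via HOOKS in settings.py, so CONTEXT_CLASS can stay at its default.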