advanced-need-help
  • a

    antheas

    08/30/2022, 6:35 AM
    Here's a code snippet with versioned dataset support and layer support for kedro viz
  • a

    azrael

    09/01/2022, 3:11 PM
    Hey guys, how do I access the path of a catalog-defined dataset from within the Kedro project?
  • w

    williamc

    09/01/2022, 9:25 PM
    Hey guys, let's assume I'm working with this object that I need to configure across pipeline steps, a bit like what you do with the builder pattern in OOP. It is actually a powerpoint presentation and I'm adding slides to it in different pipeline nodes. As a consequence I'm both receiving the presentation object as node parameter and returning it from each node. Is there a way around the
    A node cannot have the same inputs and outputs
    error, or would you suggest a different approach? Thanks!
  • d

    datajoely

    09/02/2022, 7:30 AM
    Cool use case! Use different output names; MemoryDataSets are cheap and ephemeral.
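    A minimal sketch of the "different output names" approach, assuming python-pptx; the node and dataset names below are made up for illustration:

    from kedro.pipeline import Pipeline, node
    from pptx import Presentation

    def create_presentation() -> Presentation:
        # Start an empty deck
        return Presentation()

    def add_summary_slide(prs: Presentation) -> Presentation:
        prs.slides.add_slide(prs.slide_layouts[5])  # "Title Only" layout
        return prs

    def add_chart_slide(prs: Presentation) -> Presentation:
        prs.slides.add_slide(prs.slide_layouts[6])  # "Blank" layout
        return prs

    presentation_pipeline = Pipeline([
        node(create_presentation, inputs=None, outputs="presentation_v1"),
        node(add_summary_slide, inputs="presentation_v1", outputs="presentation_v2"),
        node(add_chart_slide, inputs="presentation_v2", outputs="presentation_v3"),
    ])

    The intermediate presentation_v1/v2/v3 datasets stay as MemoryDataSets unless declared in the catalog, so only the final one needs a real entry.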
  • d

    datajoely

    09/02/2022, 7:31 AM
    Hooks are a half answer, but in truth it's a hard rule that Kedro nodes should be pure Python functions with no knowledge of IO.
  • p

    PetitLepton

    09/04/2022, 9:09 AM
    You can have two datasets, named differently, but pointing to the same file on disk, for example. This may be a bit tedious, but you can ease the pain by using a Jinja template to generate the list of datasets in the catalog. As an alternative to writing to the same file, you could maybe write each slide independently to disk, like a partitioned dataset, and have a final node collecting all the slides and merging them into the final presentation.
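    A rough sketch of the partitioned-dataset variant; the catalog entry, paths, and slide-spec structure are assumptions. Each slide node writes plain data into the partition folder (via its own catalog entry), and a final node reads the whole folder as a PartitionedDataSet and builds the deck:

    # conf/base/catalog.yml
    slide_specs:
      type: PartitionedDataSet
      path: data/07_model_output/slide_specs
      dataset: pickle.PickleDataSet

    # nodes.py
    from pptx import Presentation

    def make_slide_spec(df) -> dict:
        # Upstream nodes return plain data describing one slide, not a Presentation object
        return {"title": "Summary", "rows": len(df)}

    def merge_slides(partitions: dict) -> Presentation:
        # A PartitionedDataSet loads a dict of {partition_id: load_callable}
        prs = Presentation()
        for partition_id in sorted(partitions):
            spec = partitions[partition_id]()
            slide = prs.slides.add_slide(prs.slide_layouts[5])  # "Title Only" layout
            slide.shapes.title.text = f"{spec['title']} ({spec['rows']} rows)"
        return prs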
  • w

    williamc

    09/04/2022, 9:52 PM
    Yeah, I was concerned that using multiple MemoryDataSets was going to be wasteful, but that's the solution I ended up going with. Thanks for clarifying!
  • w

    williamc

    09/04/2022, 9:52 PM
    Thank you, the partitioned dataset suggestion looks interesting
  • v

    venncit

    09/07/2022, 8:41 AM
    Hi guys, we want to use the XCom functionality of Airflow with our Kedro projects. For some reason the KedroOperator returns the last log line as return_value rather than the actual return value. Locally we use a MemoryDataSet. Any ideas on how to push the return value of a Kedro node with XCom instead of the log?
  • n

    noklam

    09/07/2022, 10:51 PM
    I don't think the Operator is returning anything at the moment. If you are using a MemoryDataSet, you may consider combining these nodes into a pipeline or even a modular pipeline.
  • n

    noklam

    09/07/2022, 10:52 PM
    So you don't have to pass them around in Airflow.
  • v

    venncit

    09/08/2022, 7:50 AM
    Yes, normally executing a pipeline instead of a node would work indeed. In our case we need to check if the outcome of a node is True or False, and that will trigger the execution of another pipeline. For that we need to capture the output, and XCom seems like the only way of passing such info into the next task, unless there is another solid option.
  • d

    datajoely

    09/08/2022, 8:03 AM
    Environment variables feel much simpler here
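    A rough sketch of surfacing a node's boolean result to the orchestrator with an after_node_run hook; the node name, dataset name, and flag path are assumptions, and the environment variable is only visible within the same process (the file works across tasks that share storage):

    import os
    from kedro.framework.hooks import hook_impl

    FLAG_PATH = "/tmp/kedro_condition_flag"

    class ExportConditionHooks:
        @hook_impl
        def after_node_run(self, node, outputs):
            # "check_condition" / "condition_flag" are hypothetical node and dataset names
            if node.name == "check_condition":
                flag = str(outputs["condition_flag"])
                os.environ["KEDRO_CONDITION_FLAG"] = flag   # same-process consumers only
                with open(FLAG_PATH, "w") as f:             # a downstream Airflow task can read this
                    f.write(flag)

    # Register in src/<package>/settings.py: HOOKS = (ExportConditionHooks(),)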
  • t

    ToniMaroni

    09/09/2022, 8:41 AM
    Hi team, what is the recommended way to deal with a multiple-data-sources / one-pipeline scenario? I find a lot of Kedro examples where one pipeline is applied to a single data source, but how do you go about running the same pipeline on multiple data sources and then combining the transformed data sources in a merging node? Do I need to create a bash script that runs the pipeline on all the different sources and then run another pipeline that combines the outputs of those runs, or can this be done within the Kedro framework? Thanks!
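    One way to do this within Kedro is modular pipelines with namespaces: instantiate the same pipeline once per source and add a merging node at the end. A rough sketch, where the source names, dataset names, and concat-based merge are all assumptions:

    import pandas as pd
    from kedro.pipeline import Pipeline, node, pipeline

    def transform(df):
        return df  # placeholder transformation

    def merge(*dfs):
        return pd.concat(dfs)

    base = pipeline([node(transform, inputs="raw", outputs="transformed")])

    sources = ["source_a", "source_b", "source_c"]

    # One namespaced copy per source; dataset names get prefixed,
    # so the catalog needs "source_a.raw", "source_b.raw", etc.
    per_source = [pipeline(base, namespace=src) for src in sources]

    merging = pipeline([
        node(merge, inputs=[f"{src}.transformed" for src in sources], outputs="combined"),
    ])

    final_pipeline = sum(per_source, Pipeline([])) + merging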
  • n

    noklam

    09/09/2022, 12:43 PM
    Hi team what is the recommended way to
  • u

    user

    09/14/2022, 5:51 AM
    How to do SQL like querying parquet the files in kedro https://stackoverflow.com/questions/73711962/how-to-do-sql-like-querying-parquet-the-files-in-kedro
  • c

    Carlo Gonzalez

    09/14/2022, 2:53 PM
    Hi everyone. Is there a way to measure how much time it takes to run every node in my pipeline? Context: I have an MVP pipeline version that I want to optimise; in order to prioritise bottlenecks I want a simple way to compare the running time of each node. I tried using hooks (StatsD) but it did not work, or maybe I did it wrong. Thanks in advance!
  • d

    datajoely

    09/14/2022, 2:54 PM
    Did you use kedro hooks for that?
  • c

    Carlo Gonzalez

    09/14/2022, 3:02 PM
    Yes, I tried but had no success.
  • d

    datajoely

    09/14/2022, 3:03 PM
    The docs include a basic memory-usage example and a Grafana example.
  • d

    datajoely

    09/14/2022, 3:03 PM
    Is that not what you're looking for?
  • c

    Carlo Gonzalez

    09/14/2022, 3:04 PM
    It is exactly what I want, but the docs only have the code for StatsD and don't say how to connect that to Grafana. I searched the web but didn't find more info on how to do it.
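    For reference, a bare-bones timing hook that just logs per-node durations, without StatsD or Grafana; the class name and log format are illustrative:

    import logging
    import time

    from kedro.framework.hooks import hook_impl

    logger = logging.getLogger(__name__)

    class NodeTimingHooks:
        def __init__(self):
            self._start_times = {}

        @hook_impl
        def before_node_run(self, node):
            self._start_times[node.name] = time.perf_counter()

        @hook_impl
        def after_node_run(self, node):
            elapsed = time.perf_counter() - self._start_times.pop(node.name)
            logger.info("Node %s finished in %.2fs", node.name, elapsed)

    # Register in src/<package>/settings.py: HOOKS = (NodeTimingHooks(),)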
  • u

    user

    09/17/2022, 11:02 AM
    DataSetError in Docker Kedro deployment https://stackoverflow.com/questions/73754239/dataseterror-in-docker-kedro-deployment
  • c

    Carlo Gonzalez

    09/23/2022, 3:53 PM
    Hi everyone, I need to know if it is possible to pass a Python function as a parameter from parameters.yml. Context: I have a node that performs a groupby and has multiple agg functions. Let's say that one of those agg functions is pandas.Series.nunique, which can't be passed as a string. I want to have the agg functions as parameters so I can use this node on different datasets with different aggregations. Thanks in advance!
  • d

    datajoely

    09/23/2022, 3:54 PM
    We've held off doing this in the config loader directly; the best way to achieve this is to use importlib within the node.
  • d

    datajoely

    09/23/2022, 3:56 PM
    Here is a slightly old example https://github.com/datajoely/modular-spaceflights/blob/main/src/modular_spaceflights/pipelines/modelling/nodes.py
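    The gist of the importlib approach, sketched with hypothetical helper and parameter names; it assumes the dotted path starts with an importable top-level module:

    from importlib import import_module

    def _load_callable(dotted_path: str):
        # "pandas.Series.nunique" -> import "pandas", then walk "Series.nunique"
        module_name, _, attr_path = dotted_path.partition(".")
        obj = import_module(module_name)
        for attr in attr_path.split("."):
            obj = getattr(obj, attr)
        return obj

    def aggregate(df, group_cols, agg_spec):
        # agg_spec comes from parameters.yml, e.g. {"user_id": "pandas.Series.nunique", "amount": "sum"}
        resolved = {
            col: _load_callable(path) if "." in path else path
            for col, path in agg_spec.items()
        }
        return df.groupby(group_cols).agg(resolved)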
  • c

    Carlo Gonzalez

    09/23/2022, 3:58 PM
    Thanks!
  • r

    rafael.gildin

    09/23/2022, 6:43 PM
    How can I create a custom runner for this problem, please? https://github.com/kedro-org/kedro/issues/503
  • a

    antheas

    09/24/2022, 11:25 AM
    If you know the functions in advance, having a simple if/elif set of statements may be better, or a match statement if you're using 3.10. You can also do the import inside the statement so you only load the function required. It also allows you to use a custom call signature per function, instead of requiring a common interface. Also, if you log your parameters to something like MLflow, you might find this approach cleaner.
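    A small sketch of that dispatch style with a match statement (Python 3.10+); the supported names are just examples:

    def resolve_agg(name: str):
        match name:
            case "nunique":
                import pandas as pd  # imported only when this branch is taken
                return pd.Series.nunique
            case "mean" | "sum" | "max":
                return name  # pandas accepts these built-ins as plain strings
            case _:
                raise ValueError(f"Unsupported aggregation: {name}")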
  • a

    antheas

    09/24/2022, 11:42 AM
    If you don't require multithreading (and this is a caveat, because I found the parallel runner has some logging and stack-trace issues), take a look at the following file: https://github.com/kedro-org/kedro/blob/main/kedro/runner/sequential_runner.py You can extend SequentialRunner and override the _run function. You need to change the except statement on line 72 so it doesn't re-raise. Then, by setting a bool to True when the except statement is reached, you can add the outputs of the failed node into a str set; lines 81-83 already loop over them. You can then check at line 69 whether any of the node's inputs are in the failed-output set. If they are, skip running the node. That's the gist of it; then just add logging statements as required, handle printing stack traces in a way that's parseable, and handle load counts correctly in each case. Edit: the pipeline.nodes list is topologically sorted, which means the nodes are ordered by their requirements, i.e. if node n + 1 fails it can't cause node n to fail, only nodes after it.
r

rafael.gildin

09/27/2022, 2:54 PM
But if I put 1/0 inside nodes.py the error appears anyway. Can I control that so it doesn't raise?
from collections import Counter
from itertools import chain

from pluggy import PluginManager

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner, run_node


class CustomSequentialRunner2(SequentialRunner):
    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager,
        session_id: str = None,
    ) -> None:
        nodes = pipeline.nodes
        done_nodes = set()
        failed_nodes = set()
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            # run the node; on failure, record it instead of re-raising
            try:
                run_node(node, catalog, hook_manager, self._is_async, session_id)
                done_nodes.add(node)
            except Exception:
                failed_nodes.add(node)

            # decrement load counts and release any data sets we've finished with
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                # note: failed_nodes holds Node objects, not dataset names, so this
                # check is effectively always true; nothing here skips downstream nodes yet
                if data_set not in failed_nodes:
                    if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                        catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
a

antheas

09/30/2022, 3:18 PM
You're not skipping running invalid nodes, as I said. You're supposed to skip nodes whose inputs were produced by nodes that crashed or were skipped. That means having a set of failed inputs that you update from node.outputs every time a node fails OR a node is skipped, and that you check before you run each node.
*were supposed to be produced
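A minimal sketch of the skip logic described above, reusing the imports from the snippet earlier; load counts and dataset releasing are omitted for brevity:

class SkippingSequentialRunner(SequentialRunner):
    def _run(self, pipeline, catalog, hook_manager, session_id=None):
        nodes = pipeline.nodes  # topologically sorted
        failed_outputs = set()  # datasets a failed or skipped node should have produced

        for exec_index, node in enumerate(nodes):
            if failed_outputs & set(node.inputs):
                # An upstream node failed or was skipped, so this node's inputs are missing
                failed_outputs.update(node.outputs)
                self._logger.warning("Skipping %s: upstream failure", node.name)
                continue
            try:
                run_node(node, catalog, hook_manager, self._is_async, session_id)
            except Exception:
                self._logger.exception("Node %s failed", node.name)
                failed_outputs.update(node.outputs)

            self._logger.info("Completed %d out of %d tasks", exec_index + 1, len(nodes))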
r

rafael.gildin

10/06/2022, 12:47 PM
OK. But if I have some exception like 1/0 inside nodes.py, the pipeline still gets an error. So do I need to set a try/except in nodes.py too?
@antheas
a

antheas

10/06/2022, 12:55 PM
You shouldn't be getting IO errors. It's one thing for the node to crash because of an error, and another thing to not have data. You shouldn't be trying to run nodes that don't have valid inputs.
r

rafael.gildin

10/20/2022, 9:13 PM
Since I couldn't develop a custom runner, I have to accept no data as input.
Example: 2 pipelines in parallel; if one presents an error, I don't want to stop the other pipeline.
Is there any doc to help me develop the custom runner?