# advanced-need-help
a
If you don't require multithreading (a caveat, because I found the parallel runner has some logging and stack-trace issues), take a look at the following file: https://github.com/kedro-org/kedro/blob/main/kedro/runner/sequential_runner.py

You can extend SequentialRunner and override the `_run` function. You need to change the except statement on line 72 so it doesn't re-raise. Then, by setting a bool to True when the except branch is reached, you can add the outputs of the failed node to a set of strings; lines 81-83 already loop over them. You can then check at line 69 whether any of the node's inputs are in that failed-output set, and if they are, skip running the node.

That's the gist of it; then just add logging statements as required, print stack traces in a way that's parseable, and handle the load counts correctly in each case.

Edit: the pipeline.nodes list is topologically sorted, meaning the nodes are ordered by their requirements, i.e. if node n+1 fails it can't cause node n to fail, only nodes that come after it.
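To make that concrete, here's a minimal sketch of the skip logic, assuming Kedro's `run_node` helper (the function name is mine, and the load-count/release bookkeeping and logging are omitted):

```python
from kedro.runner import run_node


def run_with_skips(pipeline, catalog, hook_manager, is_async=False, session_id=None):
    """Sketch: run nodes in topological order, skipping any node whose
    inputs should have been produced by a node that crashed or was skipped."""
    failed_outputs = set()
    for node in pipeline.nodes:  # topologically sorted
        if failed_outputs.intersection(node.inputs):
            # an upstream node failed, so this node's inputs were never
            # produced; mark its outputs as failed too and skip it
            failed_outputs.update(node.outputs)
            continue
        try:
            run_node(node, catalog, hook_manager, is_async, session_id)
        except Exception:
            # record the failure instead of re-raising (cf. line 72)
            failed_outputs.update(node.outputs)
```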
r
But if I put 1/0 inside nodes.py, the error still appears. Can I control that so it doesn't raise?
```python
from collections import Counter
from itertools import chain

from pluggy import PluginManager

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner, run_node


class CustomSequentialRunner2(SequentialRunner):
    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager,
        session_id: str = None,
    ) -> None:
        nodes = pipeline.nodes
        done_nodes = set()
        failed_nodes = set()
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))
        
        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, hook_manager, self._is_async, session_id)
                done_nodes.add(node)
            except Exception:
                failed_nodes.add(node)

            # decrement load counts and release any data sets we've finished with
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
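                # note: failed_nodes holds Node objects, so this membership
                # test against a dataset-name string never matches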
                if data_set not in failed_nodes:
                    if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                        catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
```
a
You're not skipping invalid nodes as I said. You're supposed to skip nodes whose inputs were supposed to be produced by nodes that crashed or were skipped. That means keeping a set of failed inputs that you update from node.outputs every time a node fails or is skipped, and that you check before you run each node.
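Applied to the snippet above, a sketch of what that could look like (assuming the 0.18-era signatures from the linked file; the skip branch and log messages are illustrative, not Kedro API):

```python
from collections import Counter
from itertools import chain

from pluggy import PluginManager

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner, run_node


class CustomSequentialRunner2(SequentialRunner):
    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager,
        session_id: str = None,
    ) -> None:
        nodes = pipeline.nodes  # topologically sorted
        failed_outputs = set()  # datasets a crashed/skipped node should have produced
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            if failed_outputs.intersection(node.inputs):
                # an upstream node failed, so this node can't run;
                # propagate the failure through its outputs
                failed_outputs.update(node.outputs)
                self._logger.warning("Skipping node %s: inputs unavailable", node.name)
            else:
                try:
                    run_node(node, catalog, hook_manager, self._is_async, session_id)
                except Exception:
                    failed_outputs.update(node.outputs)
                    self._logger.exception("Node %s failed", node.name)

            # decrement load counts and release any datasets we've finished with
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
```

This keeps the whole run alive: a single node failure only takes out its downstream subtree.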
r
Ok. But if I have some exception like 1/0 inside nodes.py, the pipeline still gets an error. So do I need to add try/except in nodes.py too?
@antheas
a
You shouldn't be getting I/O errors. It's one thing for a node to crash because of an error in its code, and another thing for it not to have its input data. You shouldn't be trying to run nodes that don't have valid inputs.
r
Since I couldn't develop a custom runner, I have to accept missing data as input.
Example: two pipelines running in parallel; if one raises an error, I don't want the other pipeline to stop.
Is there any doc to help me develop the custom runner?