antheas
09/24/2022, 11:42 AMrafael.gildin
09/27/2022, 2:54 PM
class CustomSequentialRunner2(SequentialRunner):
def _run(
    self,
    pipeline: Pipeline,
    catalog: DataCatalog,
    hook_manager: PluginManager,
    session_id: str = None,
) -> None:
    """Run every node of ``pipeline`` in order, continuing past failures.

    A node whose execution raises is logged and recorded as failed, and the
    loop then moves on to the next node instead of aborting the whole run.
    After each node, data sets that no remaining node will load are released
    from the catalog.

    NOTE(review): nodes downstream of a failed node are still attempted;
    presumably they fail in turn because their inputs are missing -- confirm
    this is the intended behaviour.

    Args:
        pipeline: Pipeline whose ``nodes`` are executed in the order given.
        catalog: Catalog passed to ``run_node``; finished data sets are
            released from it.
        hook_manager: Passed through to ``run_node``.
        session_id: Passed through to ``run_node``.
    """
    nodes = pipeline.nodes
    failed_nodes = set()
    # How many nodes still need to load each data set.
    load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))
    # Loop-invariant, so look them up once instead of once per data set.
    # (Assumes the pipeline is not mutated while it runs.)
    pipeline_inputs = pipeline.inputs()
    pipeline_outputs = pipeline.outputs()

    for exec_index, node in enumerate(nodes):
        try:
            run_node(node, catalog, hook_manager, self._is_async, session_id)
        except Exception:
            failed_nodes.add(node)
            # Keep going, but leave a traceback so the failure is visible.
            self._logger.exception(
                "Node %s failed; continuing with the remaining nodes", node
            )

        # decrement load counts and release any data sets we've finished with
        for data_set in node.inputs:
            load_counts[data_set] -= 1
            if load_counts[data_set] < 1 and data_set not in pipeline_inputs:
                catalog.release(data_set)

        # ``failed_nodes`` holds nodes, not data sets, so the membership
        # test has to be on the node: a failed node's outputs are skipped.
        if node not in failed_nodes:
            for data_set in node.outputs:
                if (
                    load_counts[data_set] < 1
                    and data_set not in pipeline_outputs
                ):
                    catalog.release(data_set)

        self._logger.info(
            "Completed %d out of %d tasks", exec_index + 1, len(nodes)
        )
antheas
09/30/2022, 3:18 PMrafael.gildin
10/06/2022, 12:47 PMantheas
10/06/2022, 12:55 PMrafael.gildin
10/20/2022, 9:13 PM