The deploy code provided by Kedro in the deploy.py...
# advanced-need-help
k
The deploy code provided by Kedro in the deploy.py file groups the nodes and then assigns each group to run in parallel mode:

def _convert_kedro_pipeline_to_step_functions_state_machine(self) -> None:
    """Convert Kedro pipeline into an AWS Step Functions State Machine"""
    definition = sfn.Pass(self, "Start")

    for i, group in enumerate(self.pipeline.grouped_nodes, 1):
        group_name = f"Group {i}"
        sfn_state = sfn.Parallel(self, group_name)
        for node in group:
            sfn_task = self._convert_kedro_node_to_sfn_task(node)
            sfn_state.branch(sfn_task)
        definition = definition.next(sfn_state)

    sfn.StateMachine(
        self,
        self.project_name,
        definition=definition,
        timeout=core.Duration.seconds(5 * 60),
    )

Is there a specific reason why the code is set up to deploy the nodes in Parallel mode in Lambda when it is known that AWS Lambda does not support parallel processing?
d
So their docs may be a little out of date, but sfn.Parallel isn't actually Kedro, it's actually on the AWS side: https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-parallel-state.html
and I think that's just for creating parallel branches on the Step Functions DAG (not my specialty, just the way I read it)
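If it helps, here's a minimal sketch (CDK v1 assumed, stack and state names made up, not the project's actual deploy.py) of what that construct does: each .branch() becomes its own branch of the state machine, i.e. its own Lambda invocation running alongside the others, not multiprocessing inside a single Lambda.

# Minimal CDK v1 sketch (names are made up); sfn.Parallel just fans the state
# machine out into branches -- in deploy.py each branch would be the Lambda task
# for one node, so nothing runs multiprocessing inside a single Lambda.
from aws_cdk import core
from aws_cdk import aws_stepfunctions as sfn


class ExampleStack(core.Stack):
    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        parallel = sfn.Parallel(self, "Group 1")
        # Placeholder branches; in deploy.py each branch is the task for one Kedro node.
        parallel.branch(sfn.Pass(self, "node_a"))
        parallel.branch(sfn.Pass(self, "node_b"))

        definition = sfn.Pass(self, "Start").next(parallel)
        sfn.StateMachine(self, "ExampleStateMachine", definition=definition)


app = core.App()
ExampleStack(app, "example-parallel-stack")
app.synth()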
Now the question I have: in the code you're deploying, are the nodes doing any multiprocessing? Or, more likely, are any of the DS libraries using multiple threads/processes to train models?
k
No, the whole code runs in a sequential manner and I confirmed it in the Kedro info logs as well. All the nodes run sequentially and I would prefer it that way. There is no multithreading involved anywhere.
d
Hi @User, I'm not sure if this would work, but in settings.py would you mind setting the session store to this:
from kedro.framework.session.store import BaseSessionStore
SESSION_STORE_CLASS = BaseSessionStore
There is a multiprocessing.Lock() in the ShelveSessionStore used for experiment tracking and this may break on Lambda.
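For context, a minimal illustration (hypothetical handler, not Kedro code) of why that lock is a problem: the Lambda execution environment has no /dev/shm, so creating a SemLock-backed primitive there typically fails at import or session-creation time.

# Hypothetical handler, not Kedro code: the Lambda runtime lacks /dev/shm, so
# SemLock-backed primitives cannot be created and this line typically raises
# OSError: [Errno 38] Function not implemented when run on AWS Lambda.
import multiprocessing


def handler(event, context):
    lock = multiprocessing.Lock()  # works locally, fails inside AWS Lambda
    return {"acquired": lock.acquire(block=False)}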
@User did this fix things?
k
@User I was away for a few days. I made these changes and it failed with the same error. The documentation provided by Kedro instructs you to include a lambda_handler.py file which is supposed to handle this issue specifically. I have included that file in my image but I think it does not work. This is the code mentioned in the file by Kedro:

from unittest.mock import patch


def handler(event, context):
    from kedro.framework.project import configure_project

    configure_project("spaceflights_steps_function")
    node_to_run = event["node_name"]

    # Since _multiprocessing.SemLock is not implemented on lambda yet,
    # we mock it out so we could import the session. This has no impact on the correctness
    # of the pipeline, as each Lambda function runs a single Kedro node, hence no need for Lock
    # during import. For more information, please see this StackOverflow discussion:
    # https://stackoverflow.com/questions/34005930/multiprocessing-semlock-is-not-implemented-when-running-on-aws-lambda
    with patch("multiprocessing.Lock"):
        from kedro.framework.session import KedroSession

        with KedroSession.create(env="aws") as session:
            session.run(node_names=[node_to_run])
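For reference, the event each Lambda invocation receives just carries the node name, so a hypothetical local smoke test of that handler would look like this (the node name is only an example, and running it locally still needs the project and its "aws" environment configured):

if __name__ == "__main__":
    # Hypothetical smoke test; replace with a real node name from your pipeline.
    handler({"node_name": "preprocess_companies_node"}, None)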