flowpipe: Graph evaluation mode 'multiprocessing' with graph inside node causes PicklingError

Describe the bug We wrap graphs into nodes so that we can time the execution of parts of the program more easily, since the graphs are created dynamically from YAML configuration files. If the outermost graph is evaluated in “linear” or “threading” mode, everything works fine. In “multiprocessing” mode, however, evaluation raises an exception, a PicklingError to be precise.

To Reproduce Minimal example showing the behaviour:

from flowpipe import Graph, Node

@Node(outputs=["dummy"])
def normal_node(dummy):
    print("normal_node", flush=True)
    return {"dummy": dummy}

@Node(outputs=["dummy"])
def container_node(inner_graph, dummy):
    print(f"--- {inner_graph.name} ---", flush=True)
    inner_graph.evaluate(mode="linear")
    return {"dummy": dummy}

inner_graph = Graph(name="inner graph")
normal_node(name="inner graph node", graph=inner_graph)
container_graph = Graph(name="container graph")
container_node(name="container graph node", inner_graph=inner_graph, graph=container_graph)

print("=== linear ===")
container_graph.evaluate(mode="linear") # works
print("=== threading ===")
container_graph.evaluate(mode="threading") # works
print("=== multiprocessing ===")
container_graph.evaluate(mode="multiprocessing") # raises PicklingError

Output:

=== linear ===
--- inner graph ---
normal_node
=== threading ===
--- inner graph ---
normal_node
=== multiprocessing ===
Traceback (most recent call last):
  File "/workspaces/tests etc/test/flowpipe_container_parallel_minimal.py", line 24, in <module>
    container_graph.evaluate(mode="multiprocessing") # raises PicklingError
  File "/usr/local/lib/python3.10/site-packages/flowpipe/graph.py", line 321, in evaluate
    evaluator.evaluate(graph=self, skip_clean=skip_clean)
  File "/usr/local/lib/python3.10/site-packages/flowpipe/evaluator.py", line 49, in evaluate
    self._evaluate_nodes(nodes)
  File "/usr/local/lib/python3.10/site-packages/flowpipe/evaluator.py", line 182, in _evaluate_nodes
    nodes_data[node.identifier] = node.to_json()
  File "<string>", line 2, in __setitem__
  File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function normal_node at 0x7f8c7c8c7ac0>: it's not the same object as __main__.normal_node

Expected behavior Successful execution without exceptions 😃

Suggestions for Solution No clue…
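
One plausible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: pickle serializes plain functions by reference (module name plus qualified name) and verifies that the name still resolves to the same object. A decorator that rebinds the module-level name to a wrapper object breaks that check for the original function. The sketch below reproduces the same kind of error with the standard library alone; `NodeFactory` is a hypothetical stand-in and not flowpipe's actual `Node` implementation.

```python
import pickle


class NodeFactory:
    """Hypothetical stand-in for a decorator that wraps a function in an object."""

    def __init__(self, func):
        # The original function survives only as an attribute of the wrapper.
        self.func = func


@NodeFactory
def normal_node(dummy):
    return {"dummy": dummy}


# The module-level name `normal_node` now refers to the NodeFactory instance,
# not to the function it wraps.
error = None
try:
    # pickle stores functions by reference (module + qualified name) and checks
    # that looking the name up again yields the very same object.
    pickle.dumps(normal_node.func)
except pickle.PicklingError as exc:
    error = exc

print(type(error).__name__, error)
```

If this reading applies here, the error would be triggered because the inner graph, which holds references to the wrapped functions, is passed as an input value and must be pickled to reach the worker processes. That would also explain why the message names `normal_node` rather than `container_node`.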

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 17 (7 by maintainers)

Most upvoted comments

Thanks for the example and sorry for the long silence, I will try to find some time this or next week to look into this.

Any updates available on this issue?