flyte: [BUG] Propeller Panic On K8s Array Task (datacatalog implicated)
Describe the bug
We ran a map task over a Python task that takes in a FlyteFile as input, and returns an int:
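from flytekit import Resources, map_task, task
from flytekit.types.file import FlyteFile

@task(
    requests=Resources(cpu="12", mem="25Gi", ephemeral_storage="325Gi"),
    limits=Resources(cpu="14", mem="25Gi"),
    retries=3,
    cache=True,
    cache_version="1.0",
)
def rf_germline_mappable(rf_germline_input_file: FlyteFile) -> int:
    ...  # task body omitted in the original report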
We have no special overrides on the map task, just:
map_task(
    rf_germline_mappable, concurrency=constants.FLYTE_MAP_TASK_DEFAULT_CONCURRENCY
)(rf_germline_input_file=comparison_sets).with_overrides(
    requests=Resources(cpu="12", mem="25Gi", ephemeral_storage="325Gi"),
    limits=Resources(cpu="14", mem="25Gi"),
)
When this runs for the first time, it fails: no pods are spun up, and the failure happens entirely inside FlytePropeller, before the map task launches at all.
It fails when we resume as well.
None of the map tasks earlier in the workflow seem to have this issue. The task that prepares the List[FlyteFile] for this one completes fine. We checked manually, and the FlyteFiles in question all exist in the S3 bucket that Flyte is using.
When this happens, we see a bunch of warnings in data catalog pod logs that look like:
Dataset does not exist key: {Project:relative-finder Name:flyte_task-relative_finder.workflows.rf_germline.mapper_rf_germline_mappable_2 Domain:development Version:1.0-85909EZ4-PmK8qF6C UUID:}, err missing entity of type Dataset with identifier project:"relative-finder" name:"flyte_task-relative_finder.workflows.rf_germline.mapper_rf_germline_mappable_2" domain:"development" version:"1.0-85909EZ4-PmK8qF6C"
The stack trace is:
Workflow[relative-finder:development:relative_finder.workflows.relative_finder.relative_finder_wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 760 [running]:
runtime/debug.Stack()
/usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:375 +0xfe
panic({0x1f3c580, 0x3952540})
/usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.4/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x2796db0, 0xc01870b290}, {0x27a2700?, 0xc00116c4d0?}, 0xc002130240, 0x23cf0a8)
/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.8/go/tasks/plugins/array/core/metadata.go:33 +0x1e1
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7fe260ad1090, 0xc00098afc0}, {{0x2789d50, 0xc0018ca0b0}}, {{0x2789d50, 0xc0018ca160}}}, {0x2796db0, 0xc01870b290}, {0x27a2700, 0xc00116c4d0})
/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.8/go/tasks/plugins/array/k8s/executor.go:94 +0x225
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x0?, {0x2796db0, 0xc01870b050}, {0x2799298?, 0xc0007daf00?}, 0x0?)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:382 +0x178
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x27970f8, 0xc0011919b0}, {0x27848f0, 0xc000d37da0}, 0xc00143ccc0, 0xc00143ccf0, 0xc00143cd20, {0x2798818, 0xc001710000}, 0xc0018522c0, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:384 +0x9a
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x27970f8, 0xc0011919b0}, {0x27848f0, 0xc000d37da0}, 0xc00143ccc0, 0xc00143ccf0, 0xc00143cd20, {0x2798818, 0xc001710000}, 0xc0018522c0, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:617 +0x182b
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x279a148, 0xc000a5cdd0}, {{0xc000b258c0, {{...}, 0x0}, {0xc0009e8440, 0x4, 0x4}}, {0xc000b258e0, {{...}, ...}, ...}, ...}, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x279a148, 0xc000a5cdd0}, {{0xc000b258c0, {{...}, 0x0}, {0xc0009e8440, 0x4, 0x4}}, {0xc000b258e0, {{...}, ...}, ...}, ...}, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:220 +0x9d0
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc00131e240, {0x2796db0, 0xc01870aba0}, {0x2798698, 0xc00143e000}, 0xc001082600, {0x27ab1b8?, 0xc001e981a0?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc00131e240, {0x2796db0, 0xc01870aba0}, 0xc001082600, {0x2798698?, 0xc00143e000?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc00131e240, {0x2796db0, 0xc01870aba0}, {0x7fe2606e34d0, 0xc0050f5490}, 0xc001082600, {0x2798698?, 0xc00143e000})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc00131e240, {0x2796db0, 0xc01870a420}, {0x27a63e8, 0xc01ac29400}, {0x7fe2606e34d0, 0xc0050f5490}, {0x2784a30?, 0xc018156b60?}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22f4f2d?, {0x2796db0, 0xc01870a420}, {0x27a63e8, 0xc01ac29400}, {0x7fe2606e34d0, 0xc0050f5490?}, {0x2784a30?, 0xc018156b60}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc00131e240, {0x2796db0, 0xc01870a420}, {0x27a63e8, 0xc01ac29400}, {0x7fe2606e34d0, 0xc0050f5490}, {0x2784a30?, 0xc018156b60?}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22f4f2d?, {0x2796db0, 0xc01870a420}, {0x27a63e8, 0xc01ac29400}, {0x7fe2606e34d0, 0xc0050f5490?}, {0x2784a30?, 0xc018156b60}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc00131e240, {0x2796db0, 0xc01870a420}, {0x27a63e8, 0xc01ac29400}, {0x7fe2606e34d0, 0xc0050f5490}, {0x2784a30?, 0xc018156b60?}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow.(*subworkflowHandler).handleSubWorkflow(0xc000490c68, {0x2796db0, 0xc01870a420}, {0x27a4130, 0xc001082540}, {0x27a1b40, 0xc0050f5490}, {0x2784a30, 0xc018156b60})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go:74 +0x334
github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow.(*subworkflowHandler).CheckSubWorkflowStatus(0xc00af11c50?, {0x2796db0, 0xc01870a420}, {0x27a4130?, 0xc001082540?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go:226 +0x3f1
github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow.(*workflowNodeHandler).Handle(0xc000490c40, {0x2796db0, 0xc01870a420}, {0x27a4130?, 0xc001082540?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/handler.go:91 +0x1690
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc00131e240, {0x2796db0, 0xc01870a420}, {0x2798758, 0xc000490c40}, 0xc001082540, {0x27ab1b8?, 0xc01bb53d40?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc00131e240, {0x2796db0, 0xc01870a420}, 0xc001082540, {0x2798758?, 0xc000490c40?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc00131e240, {0x2796db0, 0xc01870a420}, {0x277d258, 0xc002cae500}, 0xc001082540, {0x2798758?, 0xc000490c40})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc00131e240, {0x2796db0, 0xc01870a0c0}, {0x27a63e8, 0xc01ac29360}, {0x277d258, 0xc002cae500}, {0x277d280?, 0xc002cae500?}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22f4f2d?, {0x2796db0, 0xc01870a0c0}, {0x27a63e8, 0xc01ac29360}, {0x277d258, 0xc002cae500?}, {0x277d280?, 0xc002cae500}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc00131e240, {0x2796db0, 0xc01870a0c0}, {0x27a63e8, 0xc01ac29360}, {0x277d258, 0xc002cae500}, {0x277d280?, 0xc002cae500?}, {0x27a3810, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc000491e30, {0x2796db0, 0xc01870a0c0}, 0xc002cae500)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc000491e30, {0x2796db0, 0xc01870a0c0}, 0xc002cae500)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc00145f0e0, {0x2796db0, 0xc01870a0c0}, 0xc010bf3848, 0x1e51040?)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:130 +0x18e
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc00145f0e0, {0x2796db0, 0xc0186b5230}, 0xc002156f00)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:131 +0x459
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc00145f0e0, {0x2796db0, 0xc0186b5230}, {0xc004980330, 0x19}, {0xc00498034a, 0x8})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:205 +0x86d
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc00189ac60, 0xc010bf3f28, {0x1e51040?, 0xc003db8440})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc00189ac60, {0x2796db0, 0xc0186b5230})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x2796db0?, {0x2796db0, 0xc0007cda10})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59
created by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285
]
Expected behavior
No panic; the code recognizes that the cache is empty and begins the task, caching the results.
Additional context to reproduce
We are on EKS, using managed node groups.
K8s version v1.22.10-eks-84b4fe6
Datacatalog cr.flyte.org/flyteorg/datacatalog-release:v1.1.0
Flyte propeller cr.flyte.org/flyteorg/flytepropeller:v1.1.24 (we have this up a bit higher for some more recent bugfixes that aren’t on a proper release yet)
Flyte admin cr.flyte.org/flyteorg/flyteadmin-release:v1.1.0
Screenshots
Unusual data catalog warning:

Are you sure this issue hasn’t been raised already?
- Yes
Have you read the Code of Conduct?
- Yes
About this issue
- State: closed
- Created 2 years ago
- Comments: 20 (6 by maintainers)
Also, linking this previous issue for visibility.
I wouldn’t sell yourself short; this future should work the same as most. In this code we’re basically offloading cache lookups to a backend process which maintains an LRU cache of lookup results. So each time through, we just check whether all the cache lookups in the background have completed, and if so we start executing the tasks which have not been cached. Barring bugs, there should be nowhere for an infinite loop to happen, as long as the intermediate caches are large enough to hold the in-flight cache lookups.
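To make the polling pattern described above concrete, here is a minimal Python sketch. It is not FlytePropeller code: `LRUCache`, `AsyncCatalog`, `poll`, and `run_until_ready` are hypothetical names, and the "catalog" is just an in-memory set. It only illustrates why a result cache that is too small for the in-flight lookups can keep the loop from ever finishing.

```python
import time
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor


class LRUCache:
    """Tiny LRU cache for finished lookup results (hypothetical helper)."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._items = OrderedDict()

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict the least recently used

    def get(self, key):
        if key not in self._items:
            return None
        self._items.move_to_end(key)
        return self._items[key]


class AsyncCatalog:
    """Runs cache lookups in background threads; callers poll for results."""

    def __init__(self, cached_keys, max_size):
        self._cached_keys = set(cached_keys)  # stand-in for the real catalog
        self._results = LRUCache(max_size)
        self._pool = ThreadPoolExecutor(max_workers=4)
        self._futures = {}

    def _lookup(self, key):
        return "hit" if key in self._cached_keys else "miss"

    def poll(self, keys):
        """Return {key: 'hit' | 'miss'} once every lookup is done, else None."""
        # 1. Move finished background lookups into the LRU cache.
        for key, fut in list(self._futures.items()):
            if fut.done():
                self._results.put(key, fut.result())
                del self._futures[key]
        # 2. Read every key; (re)submit anything missing and not in flight.
        ready = {}
        for key in keys:
            result = self._results.get(key)
            if result is not None:
                ready[key] = result
            elif key not in self._futures:
                self._futures[key] = self._pool.submit(self._lookup, key)
        return ready if len(ready) == len(keys) else None


def run_until_ready(catalog, keys, max_rounds=1000):
    """Poll like a reconcile loop would; give up after max_rounds."""
    for _ in range(max_rounds):
        result = catalog.poll(keys)
        if result is not None:
            return result
        time.sleep(0.001)
    return None
```

With `max_size` at least as large as the number of keys, `run_until_ready` returns a full result map after a few rounds. With `max_size=2` and five keys, the LRU can never hold all five results at once, so every round returns `None`.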
I think this is probably the only workaround for now. As you mentioned, there will be more overhead with dynamic tasks (which is why map tasks exist), but it should work. This overhead manifests as (1) execution performance, since the dynamic task requires an additional Pod to dynamically compile the underlying DAG, and (2) increased etcd storage for node statuses. Map tasks use bitarrays to store the status of each individual task, which is significantly more concise than what dynamic tasks store. etcd has a pretty hard limit on data sizes (about 1.5 MB), so dynamic tasks aren’t able to scale as large as map tasks. There isn’t a hard limit on the number of nodes, because it’s contingent on things like the node id length, but generally a dynamic task will be unable to scale to a 10k fanout.
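A rough back-of-the-envelope sketch of the storage difference, in Python. The 4 bits per phase and the JSON shape of a per-node status are assumptions made up for illustration; the real FlyteWorkflow node status carries far more fields than this stand-in.

```python
import json

BITS_PER_PHASE = 4  # assumption: every subtask phase fits in 4 bits


def pack_phases(phases, bits=BITS_PER_PHASE):
    """Pack small integer phases into bytes, `bits` bits per subtask."""
    value = 0
    for i, phase in enumerate(phases):
        if not 0 <= phase < (1 << bits):
            raise ValueError(f"phase {phase} does not fit in {bits} bits")
        value |= phase << (i * bits)
    n_bytes = (len(phases) * bits + 7) // 8
    return value.to_bytes(n_bytes, "little")


def unpack_phase(packed, index, bits=BITS_PER_PHASE):
    """Read back the phase of a single subtask from the packed bytes."""
    value = int.from_bytes(packed, "little")
    return (value >> (index * bits)) & ((1 << bits) - 1)


# 10k subtasks, as in the fanout mentioned above.
phases = [i % 8 for i in range(10_000)]
packed = pack_phases(phases)

# Hypothetical per-node record, standing in for a dynamic task's node statuses.
per_node = json.dumps([{"id": f"n{i}", "phase": p} for i, p in enumerate(phases)])

print(len(packed))    # 5000 bytes for the packed form
print(len(per_node))  # size of even this minimal per-node form
```

Even with this deliberately tiny per-node record, the per-node form is far larger than the packed one, which is the gap that matters against the etcd size limit.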
We’re on GCP btw. We have a map task of 1020 items and also cached.
Small difference: the stack trace wasn’t showing up in the portal, but we found it in the FlyteWorkflow k8s object.
Stack trace
code: Workflow abort failed kind: SYSTEM message: "Workflow[acorn:production:flyte.acorn.ingest.ingest_workflow] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 800 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:375 +0xfe\npanic({0x1f45600, 0x3959500})\n\t/usr/local/go/src/runtime/panic.go:838 +0x207\ngithub.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)\n\t/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.4/bitarray/bitset.go:33\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x279ca70, 0xc00f76edb0}, {0x27a8360?, 0xc00e7bfad0?}, 0xc00ce95680, 0x23d7cd8)\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.5/go/tasks/plugins/array/core/metadata.go:33 +0x1e1\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7fbc408a1980, 0xc000aa7340}, {{0x278fa10, 0xc000951810}}, {{0x278fa10, 0xc000951b80}}}, {0x279ca70, 0xc00f76edb0}, {0x27a8360, 0xc00e7bfad0})\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.5/go/tasks/plugins/array/k8s/executor.go:94 +0x225\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x7fbc3fd21400?, {0x279ca70, 0xc00f76eb70}, {0x279ef58?, 0xc0003f6b40?}, 0x280?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:382 +0x178\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x279cdb8, 0xc0011ed950}, {0x278a5a8, 0xc0013f4980}, 0xc0013c5650, 0xc0013c5680, 0xc0013c56b0, {0x279e4d8, 0xc001c10000}, 
0xc000150840, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:384 +0x9a\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x279cdb8, 0xc0011ed950}, {0x278a5a8, 0xc0013f4980}, 0xc0013c5650, 0xc0013c5680, 0xc0013c56b0, {0x279e4d8, 0xc001c10000}, 0xc000150840, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:616 +0x182b\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x279fe08, 0xc0008cdee0}, {{0xc00125f970, {{...}, 0x0}, {0xc00005cc80, 0x4, 0x4}}, {0xc00125f990, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x279fe08, 0xc0008cdee0}, {{0xc00125f970, {{...}, 0x0}, {0xc00005cc80, 0x4, 0x4}}, {0xc00125f990, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:220 +0x9d0\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc0011cf8c0, {0x279ca70, 0xc00f76e6c0}, {0x279e358, 0xc00122e280}, 0xc00c518e40, {0x27b0d58?, 0xc00cbc45b0?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc0011cf8c0, {0x279ca70, 0xc00f76e6c0}, 0xc00c518e40, {0x279e358?, 0xc00122e280?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc0011cf8c0, {0x279ca70, 0xc00f76e6c0}, {0x2782f10, 0xc00cee2000}, 0xc00c518e40, {0x279e358?, 0xc00122e280})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 
+0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0011cf8c0, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000}, {0x2782f38?, 0xc00cee2000?}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000?}, {0x2782f38?, 0xc00cee2000}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0011cf8c0, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000}, {0x2782f38?, 0xc00cee2000?}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000?}, {0x2782f38?, 0xc00cee2000}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0011cf8c0, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000}, {0x2782f38?, 0xc00cee2000?}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000?}, {0x2782f38?, 0xc00cee2000}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 
+0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0011cf8c0, {0x279ca70, 0xc00f76e150}, {0x27ac048, 0xc00db88280}, {0x2782f10, 0xc00cee2000}, {0x2782f38?, 0xc00cee2000?}, {0x27a9470, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc000410d20, {0x279ca70, 0xc00f76e150}, 0xc00cee2000)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc000410d20, {0x279ca70, 0xc00f76e150}, 0xc00cee2000)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc0013fda70, {0x279ca70, 0xc00f76e150}, 0xc003f1f848, 0x1e5a060?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:130 +0x18e\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc0013fda70, {0x279ca70, 0xc00f9add10}, 0xc006b4d900)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:131 +0x459\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc0013fda70, {0x279ca70, 0xc00f9add10}, {0xc00cb35020, 0xa}, {0xc00cb3502b, 0x14})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:205 +0x86d\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc000b698c0, 0xc003f1ff28, {0x1e5a060?, 0xc00ed85a30})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc000b698c0, {0x279ca70, 0xc00f9add10})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 
+0xf1\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x279ca70?, {0x279ca70, 0xc001596660})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59\ncreated by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285\n]"
We just ran into this today too - trying to figure out if it’s the same issue! (cc: @bstadlbauer, @niliayu although both are ooo)
@CalvinLeather I submitted a PR that will fix this issue. Basically, we now initialize the external resources only once the cache lookup has completed. I tested locally by changing the cache lookup return value to a random one, and verified the phase changes in the map task along with correct initialization of external resources.
I suppose trying a dynamic task could also be a workaround, but I suspect the fact that this only happens with large inputs (seems like it anyway) suggests that the overhead for a dynamic will be high/problematic.
I may try that later today, since that’s a “not too horrible” thing to test out.
@CalvinLeather alright, I’m diving into this all afternoon. Here are my initial thoughts. To mitigate performance issues where a FlytePropeller worker is blocked on cache lookups over hundreds of items (as in large fanout map tasks), there is a mechanism to perform these lookups in the background. This is started here for map tasks, and then we check the status below. What I suspect is happening is that we fall back to the ResponseNotReady case below. In that case the IndexesToCache array is not set on the state, so it is nil, like you suspected. I’m going to run a few tests to confirm this.
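For readers following along, the suspected failure mode can be modelled in a few lines of Python. This is a toy model, not the flyteplugins code: `BitSet`, `ArrayState`, `initialize_external_resources`, and `handle` are hypothetical stand-ins, a `None` lookup result plays the role of ResponseNotReady, and the sketch assumes a set bit means "this subtask still has to run". Python's `AttributeError` on `None` plays the role of the Go nil-pointer panic in `BitSet.IsSet`.

```python
from dataclasses import dataclass
from typing import List, Optional


class BitSet:
    """Minimal bitset; a set bit means the subtask still has to run."""

    def __init__(self, size: int):
        self.size = size
        self._bits = 0

    def set(self, index: int) -> None:
        self._bits |= 1 << index

    def is_set(self, index: int) -> bool:
        return bool((self._bits >> index) & 1)


@dataclass
class ArrayState:
    size: int
    # Stays None until the background cache lookup has completed.
    indexes_to_cache: Optional[BitSet] = None


def initialize_external_resources(state: ArrayState) -> List[str]:
    """Unguarded: dereferences indexes_to_cache without checking it."""
    return [
        "QUEUED" if state.indexes_to_cache.is_set(i) else "CACHED"
        for i in range(state.size)
    ]


def handle(state: ArrayState, lookup_result: Optional[BitSet]) -> Optional[List[str]]:
    """Guarded: initialize resources only after the lookup has finished."""
    if lookup_result is None:  # models the ResponseNotReady branch
        return None  # nothing to do yet; try again on the next round
    state.indexes_to_cache = lookup_result
    return initialize_external_resources(state)
```

Calling `initialize_external_resources(ArrayState(size=3))` directly raises `AttributeError`, which mirrors the panic in the trace. Going through `handle` returns `None` while the lookup is pending and only builds the per-subtask list once a bitset is available.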