ucx-py: Dask-cudf multi partition merge slows down with `ucx`
A Dask-cudf multi-partition merge seems to slow down with UCX compared to TCP.
Wall time: 15.4 s on TCP vs 37.8 s on UCX (exp-01).
In the attached example we see a slowdown with UCX vs. just using TCP.
Wall times on exp-01 (each run ends with the output of `%time len(merged_df)`; the trailing number is the resulting row count):

UCX:
CPU times: user 19.3 s, sys: 1.97 s, total: 21.2 s
Wall time: 38.4 s
2945293

CPU times: user 16.7 s, sys: 1.71 s, total: 18.4 s
Wall time: 37.8 s
2943379

TCP:
CPU times: user 10.8 s, sys: 815 ms, total: 11.6 s
Wall time: 15.7 s
2944022

CPU times: user 10.9 s, sys: 807 ms, total: 11.7 s
Wall time: 15.4 s
2943697
Repro Code:
Helper function to create a distributed dask-cudf frame:

import dask_cudf
import cudf
import os
import time
import dask.dataframe as dd
import dask.array as da
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes


def create_random_data(n_rows=1_000, n_parts=10, n_keys_index_1=100_000,
                       n_keys_index_2=100, n_keys_index_3=100, col_prefix='a'):
    # Build a random pandas-backed dask dataframe, convert each partition
    # to cudf, and persist it on the workers.
    chunks = n_rows // n_parts
    df = dd.concat([
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_1'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_2'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_3'),
        da.random.randint(0, n_keys_index_1, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_0'),
        da.random.randint(0, n_keys_index_2, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_1'),
        da.random.randint(0, n_keys_index_3, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_2'),
    ], axis=1).persist()
    gdf = df.map_partitions(cudf.from_pandas)
    gdf = gdf.persist()
    _ = wait(gdf)
    return gdf
RMM Setup:
def setup_rmm_pool(client):
    # Create an RMM memory pool on every worker.
    client.run(
        cudf.set_allocator,
        pool=True,
        initial_pool_size=parse_bytes("26GB"),
        allocator="default",
    )
    return None
setup_rmm_pool(client)
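Note that the snippet above uses `client` without showing how the cluster was created; the exact configuration from the original run isn't included here. A minimal sketch, assuming a `LocalCUDACluster` started with UCX and NVLink enabled, might look like:

```python
# Hypothetical cluster setup; not included in the original snippet, so the
# flags below are assumptions rather than the reporter's exact configuration.
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(
    protocol="ucx",            # switch to "tcp" for the baseline measurement
    enable_tcp_over_ucx=True,  # UCX's TCP transport for host memory
    enable_nvlink=True,        # expose NVLink (cuda_ipc) to UCX
)
client = Client(cluster)
```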
Merge Code:
The slowdown happens on the merge step.

rows_1, parts_1 = 140_176_770, 245
rows_2, parts_2 = 21_004_393, 171

df_1 = create_random_data(n_rows=rows_1, n_parts=parts_1, col_prefix='a')
df_2 = create_random_data(n_rows=rows_2, n_parts=parts_2, col_prefix='b')

merged_df = df_1.merge(df_2, left_on=['a_0', 'a_1', 'a_2'], right_on=['b_0', 'b_1', 'b_2'])
%time len(merged_df)
Additional Context:
There has been discussion about this on our internal Slack channel; please see that thread for more context.
About this issue
- State: closed
- Created 4 years ago
- Comments: 123 (108 by maintainers)
After a lot more profiling I was able to pinpoint the primary issue with workflows such as this one; see the image below:

What happens here is that UCX is not always transferring data over NVLink but over TCP (which incurs DtoH and HtoD copies); even worse, it breaks each transfer down into 8 KB segments, forcing a copy plus stream synchronization many times over, which makes the process extremely slow. The memory copy stats look as follows:

As can be noticed, only 0.3% of all time spent transferring memory is happening over NVLink (as seen by PtoP). There's also 1.4% of transfer time happening in DtoD (AFAIK, that means transfers going from one device to another via host, i.e., DtoH on the source device + HtoD on the target device), and it's not clear to me why that happens, given this is running on a DGX-2 where all devices should have an NVLink connection thanks to the NVSwitch.

The TCP segment size is configured via `UCX_TCP_TX_SEG_SIZE` (default for this is 8 KB, as mentioned previously) and `UCX_TCP_RX_SEG_SIZE`, and I confirmed that increasing those reduces the number of copies+synchronizations, with an 8 MB size reducing merge compute time from ~22 seconds to ~16 seconds (still marginally slower than regular Python sockets). When the segment sizes are increased I noticed that no copies occur in PtoP (meaning 100% of transfers are going over TCP), as seen below.

I was not yet able to find how UCX determines whether a transfer should go over TCP or over NVLink, but I expected that for the example being discussed in this thread 100% of them would go over NVLink, given that we're not explicitly reading from or writing to host.
I will continue to look for answers here, and I was wondering if @Akshay-Venkatesh perhaps has an idea if we’re doing something wrong or how we could better configure things in case this is a misconfiguration issue.
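For reference, these UCX settings are plain environment variables, so one way to experiment with larger segments is to export them before the scheduler and workers start; the sketch below is an assumption about how to wire that up, and the 8 MB value is just the size quoted above, not a tuned recommendation:

```python
import os

# Assumed experiment: raise UCX's TCP segment sizes before any UCX context is
# created (i.e., before the dask-cuda workers start) so they take effect.
os.environ["UCX_TCP_TX_SEG_SIZE"] = "8M"  # a plain byte count such as "8388608" also works
os.environ["UCX_TCP_RX_SEG_SIZE"] = "8M"

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True, enable_nvlink=True)
client = Client(cluster)
```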
I think there is some confusion, so let me attempt to clarify:

Finally, there has been a general trend of assuming that a large part of the slowdown comes from memory allocation. While that may be true, I think we need to be extra careful when interpreting our profile reports, since all of them are limited in not including all the information we need (i.e., we don't have any profile that includes C++, CUDA and Python information all at once). For instance, I think one such assumption comes from the fact that `rmm.DeviceBuffer` appears as one of the most time-consuming operations. That is a call coming from `distributed.protocol.ucx.read`, which hints that it's not only the allocation that takes place but the copy as well, and the copy may include additional overhead, such as (but not limited to) `cuIpcOpenMemHandle` if that's the first time a worker is reading another worker's memory pool data.

While I'm not saying the above is actually happening, we need to be careful to identify exactly what's going on; since we don't have the full picture yet, we may be looking at the wrong piece of the puzzle.
Adding the Dask performance reports for UCX/TCP:
- dask report with ucx
- dask report with tcp

The notable issue in the task graph is that for TCP there is more communication, but the workers are fully engaged (no white space):

For UCX, there is less communication, but workers are not actively working on a particular task (white space):
@jakirkham, you asked for reports from Azure; here they are:
- TCP
- UCX + NVLink
- UCX + NVLink + InfiniBand (without `nv_peer_mem`)

Wrote a quick test:

Results:
Thanks for following up on this, Peter. Agree that we should close this on UCX-Py as there is no action item left on that front.
Might be worth exploring the TCP slowdown though, as the 2x slowdown seems concerning (CC: @randerzander / @beckernick).
At this point I think we should start getting some Nsight profiles and looking at them to see where we're spending the time. If we need to add additional NVTX ranges into cuDF / RMM, we can.
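As an illustration of how extra ranges could be added on the Python side (not from the issue; it assumes the standalone `nvtx` package, whose ranges Nsight Systems picks up):

```python
import nvtx

# Hypothetical example: name a suspect region so it shows up as an NVTX
# range in the Nsight Systems timeline.
@nvtx.annotate("my_shuffle_step", color="blue")
def my_shuffle_step(df):
    return df  # work to be profiled would go here

# The same helper also works as a context manager around an arbitrary block.
with nvtx.annotate("concat_partitions", color="green"):
    pass  # work to be profiled would go here
```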
Not time. Maximum number of active allocations.
Do you happen to have any info about what the largest number of allocations we had alive at any given time is? I think that’s important as well because if we’re just doing 1000 allocations, 1000 deallocations repeatedly we wouldn’t have performance problems.
RMM logging will give you the sizes, device IDs, pointers, timestamps, and the source (file and line) of the call (probably all would be DeviceBuffer since it only goes one level up the call stack), but your own logging can give you more context if it helps, I guess.
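For reference, in current RMM releases (which may differ from the version used at the time of this issue), allocation logging can be turned on when reinitializing the allocator; treat the snippet below as a sketch of that API rather than what was actually run here:

```python
import rmm

# Assumed sketch using the current RMM Python API: enable the CSV allocation
# log alongside the pool. Each allocate/deallocate event is recorded with its
# pointer, size, stream and timestamp; a device-id suffix may be appended to
# the file name.
rmm.reinitialize(
    pool_allocator=True,
    logging=True,
    log_file_name="rmm_log.csv",
)

buf = rmm.DeviceBuffer(size=1024)  # this allocation will appear in the log
del buf
```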
I have a simple benchmark that allocates N random blocks from 1 to k bytes, freeing with a certain probability at each iteration, or when the maximum memory size is reached. I can use this to profile and optimize, I think. With 100,000 allocations of at most 2MB and a max allocated size of 87% of 16GB, it takes 30s with cnmem. If you think this is a sufficiently close comparison, I’ll just use it. Or you can give me different parameters.
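A rough Python sketch of that kind of benchmark (my own reconstruction, not the author's actual code, which presumably drove cnmem directly) could look like this:

```python
import random
import time

import rmm


def allocation_benchmark(n_allocs=100_000, max_bytes=2 * 1024**2,
                         max_live_bytes=int(0.87 * 16 * 1024**3),
                         free_probability=0.5, seed=0):
    """Allocate random-sized buffers, freeing some with a given probability
    or whenever the live total would exceed ``max_live_bytes``."""
    rng = random.Random(seed)
    live = []          # list of (DeviceBuffer, size)
    live_bytes = 0
    max_live_seen = 0

    start = time.perf_counter()
    for _ in range(n_allocs):
        size = rng.randint(1, max_bytes)
        # Free buffers until the new allocation fits under the cap.
        while live and live_bytes + size > max_live_bytes:
            _, freed = live.pop(rng.randrange(len(live)))
            live_bytes -= freed
        live.append((rmm.DeviceBuffer(size=size), size))
        live_bytes += size
        max_live_seen = max(max_live_seen, len(live))
        # Randomly free one live buffer with some probability.
        if live and rng.random() < free_probability:
            _, freed = live.pop(rng.randrange(len(live)))
            live_bytes -= freed
    elapsed = time.perf_counter() - start
    return elapsed, max_live_seen


if __name__ == "__main__":
    rmm.reinitialize(pool_allocator=True)
    elapsed, max_live = allocation_benchmark()
    print(f"{elapsed:.1f}s, max live allocations: {max_live}")
```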
Ah, I now see the reason to bulk allocate in the Dask UCX comm.
Yes, we're always using the RMM pool. Without the RMM pool there are basically two scenarios: one with the CUDA IPC cache enabled (`UCX_CUDA_IPC_CACHE=y`); or one with it disabled (`UCX_CUDA_IPC_CACHE=n`).

What about the time to execute the merge operation? I would be interested in seeing that before anything else.
I'm not sure yet @jakirkham. I'm still doing some profiling to verify that everything works correctly, plus I'll need to check whether this affects other transports, so it may take a while until we're confident this is the right solution. Performance is still somewhat lower than TCP, and I'm currently investigating that.
@quasiben and I found that manually setting `UCX_RNDV_THRESH` to a really low value (e.g., 1) works around the issue. IOW, all worker-worker transfers seem to be going over PtoP in that situation. It also solves those cases we mentioned above where, depending on configuration, a size of 10000 would not go over PtoP.

The transports enabled have an effect on the rendezvous threshold when it's set to `auto` (the default). For example, on a DGX-1, having `UCX_TLS=tcp,sockcm,cuda_copy,cuda_ipc` will show `ucp_ep.c:1222 UCX TRACE rndv threshold is 474127 (send_nbr: 262144)`, but if we add `mm` to `UCX_TLS` as well, then we'll see `ucp_ep.c:1222 UCX TRACE rndv threshold is 8256 (send_nbr: 262144)`.

I checked that setting the threshold indeed decreases the runtime to 18 seconds (from about 22 before), but it's still slower than Python sockets for this particular example. I wasn't yet able to take a closer look at the trace and profile it, but will do that tomorrow.
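One way to reproduce that experiment (my own sketch, not a command from the thread) is to set the threshold and UCX's trace logging in the environment the workers inherit, then look for the reported threshold in the worker output:

```python
import os

# Assumed reproduction: set these before the UCX context exists
# (i.e., before the dask-cuda workers start) so they take effect.
os.environ["UCX_RNDV_THRESH"] = "1"    # force the rendezvous protocol for essentially all sizes
os.environ["UCX_LOG_LEVEL"] = "trace"  # very verbose; surfaces the "rndv threshold is ..." line
```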
Yes that sounds plausible. Thanks for that insight Akshay!
So we should change the current context back to the one used for allocating before sending, is that right? Or is there a way for us to inform UCX of the context it should use?
For reference, the data we send typically looks like the following (a mix of host and GPU memory):
[Ordering in `UCX_TLS` does not matter, correct?]
https://github.com/openucx/ucx/blob/b507c4fc5931057017d5acf71a13a8f78b77c28a/src/ucp/core/ucp_context.c#L75
@quasiben also mentioned to me that we had ~81k calls to `rmm.DeviceBuffer` in that flame chart. I believe the pool allocator of RMM (cnmem) is known to have performance issues with freeing memory once there's a certain number of allocations being managed. As a quick test it may be worth checking what happens when you create 10k/100k allocations, store them somewhere, and then see how long it takes to delete the references.
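A quick sketch of that test (hypothetical code, not from the thread; it just times creating and then dropping a large number of `rmm.DeviceBuffer` objects):

```python
import time

import rmm


def time_bulk_alloc_free(n=100_000, nbytes=8_192):
    # Create n small allocations and hold on to them...
    start = time.perf_counter()
    buffers = [rmm.DeviceBuffer(size=nbytes) for _ in range(n)]
    alloc_time = time.perf_counter() - start

    # ...then drop all references at once and time the frees.
    start = time.perf_counter()
    del buffers
    free_time = time.perf_counter() - start
    return alloc_time, free_time


rmm.reinitialize(pool_allocator=True)
alloc_time, free_time = time_bulk_alloc_free()
print(f"alloc: {alloc_time:.2f}s, free: {free_time:.2f}s")
```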
As for `rmm.DeviceBuffer`, I was able yesterday to confirm the same numbers from Ben's flame graph. Apart from that, I can confirm with 110% certainty that we do have the RMM pool enabled; there are two ways I can confirm:

I set up two dask-cuda-workers manually and ran with `nvprof` to better understand what was happening in CUDA land:

We are spending a lot of time in `cuDevicePrimaryCtxRetain`; this is happening within Numba, and we can see it in the worker profile as well:

I believe this is known to both @jakirkham and @kkraus14.
Another interesting thing to note is that deletion of dependencies takes quite a bit of time in UCX compared with TCP. This can be seen in the Worker Profile (administrative) page:

UCX:

TCP:
Yes, sorry for being unclear 😄
From the worker profile in the linked performance report, we spend 130 total seconds doing "compute" with UCX across the 16 GPUs, versus only 40 total seconds with TCP. In particular:
- `_concat` with UCX, vs 13 seconds with TCP
- `shuffle_group`