ucx-py: Dask-cudf multi partition merge slows down with `ucx`

Dask-cudf multi-partition merge seems to slow down with UCX.

Wall time: 15.4 s on TCP vs 37.8 s on UCX (exp-01).

In the attached example we see a slowdown with UCX vs just using TCP.

Wall Times on exp-01

UCX times

CPU times: user 19.3 s, sys: 1.97 s, total: 21.2 s
Wall time: 38.4 s
2945293


CPU times: user 16.7 s, sys: 1.71 s, total: 18.4 s
Wall time: 37.8 s
2943379

TCP times

CPU times: user 10.8 s, sys: 815 ms, total: 11.6 s
Wall time: 15.7 s
2944022

CPU times: user 10.9 s, sys: 807 ms, total: 11.7 s
Wall time: 15.4 s
2943697 

Repro Code:

Helper function to create a distributed dask-cudf dataframe:


import dask_cudf
import cudf
import os
import time
import dask.dataframe as dd
import dask.array as da

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes


def create_random_data(n_rows=1_000, n_parts=10, n_keys_index_1=100_000,
                       n_keys_index_2=100, n_keys_index_3=100, col_prefix='a'):
    chunks = n_rows // n_parts

    df = dd.concat([
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_1'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_2'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_3'),
        da.random.randint(0, n_keys_index_1, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_0'),
        da.random.randint(0, n_keys_index_2, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_1'),
        da.random.randint(0, n_keys_index_3, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_2'),
    ], axis=1).persist()

    gdf = df.map_partitions(cudf.from_pandas)
    gdf = gdf.persist()
    _ = wait(gdf)
    return gdf
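
The repro calls setup_rmm_pool(client) below without showing where client comes from. Here is a minimal sketch of creating it with the LocalCUDACluster already imported above; the specific flags are assumptions that mirror the dask-cuda-worker options used later in this thread (--enable-tcp-over-ucx --enable-nvlink):

# Assumed cluster/client setup -- not part of the original repro.
cluster = LocalCUDACluster(
    protocol="ucx",            # switch to "tcp" for the TCP baseline runs
    enable_tcp_over_ucx=True,
    enable_nvlink=True,
)
client = Client(cluster)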

RMM Setup:

def setup_rmm_pool(client):
    client.run(
        cudf.set_allocator,
        pool=True,
        initial_pool_size=parse_bytes("26GB"),
        allocator="default",
    )
    return None

setup_rmm_pool(client)

Merge Code:

The slowdown happens on the merge step.

rows_1, parts_1 = 140_176_770, 245
rows_2, parts_2 = 21_004_393, 171

df_1 = create_random_data(n_rows=rows_1, n_parts=parts_1, col_prefix='a')
df_2 = create_random_data(n_rows=rows_2, n_parts=parts_2, col_prefix='b')

merged_df = df_1.merge(df_2, left_on=['a_0', 'a_1', 'a_2'], right_on=['b_0', 'b_1', 'b_2'])
%time len(merged_df)

Additional Context:

There has been discussion about this on our internal Slack channel; see there for more context.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 123 (108 by maintainers)

Most upvoted comments

After a lot more profiling I was able to pinpoint the primary issue with workflows such as this one, see image below:

[Screenshot 2020-02-02 at 23:23:28]

What happens here is that UCX is not always transferring data over NVLink but over TCP (which incurs DtoH and HtoD copies); even worse, it breaks each transfer down into segments of 8KB, forcing a copy plus stream synchronization many times over, which makes the process extremely slow. The memory copy stats look as follows:

CUDA Memory Operation Statistics (nanoseconds)

Time(%)      Total Time  Operations         Average         Minimum         Maximum  Name                              
-------  --------------  ----------  --------------  --------------  --------------  --------------------------------------------------------------------------------
   55.6      8407106705     2520239          3335.8            1760         4781363  [CUDA memcpy HtoD]                
   42.8      6466457263     2578038          2508.3            1120          204227  [CUDA memcpy DtoH]                
    1.4       207633644       94986          2185.9            1280           24224  [CUDA memcpy DtoD]                
    0.3        38804550        1279         30339.8            7104          441828  [CUDA memcpy PtoP]                
    0.0         1577193         719          2193.6            1824            3456  [CUDA memset]                     


CUDA Memory Operation Statistics (KiB)

              Total      Operations              Average            Minimum              Maximum  Name                 
-------------------  --------------  -------------------  -----------------  -------------------  --------------------------------------------------------------------------------
            933.324             719                1.298              0.004                1.500  [CUDA memset]        
        4699152.953            1279             3674.084            662.445             4469.922  [CUDA memcpy PtoP]   
       28620716.464         2520239               11.356              0.001             4469.922  [CUDA memcpy HtoD]   
       28080943.471           94986              295.632              0.004             4469.922  [CUDA memcpy DtoD]   
       19852018.817         2578038                7.700              0.001              294.039  [CUDA memcpy DtoH] 

As can be noticed, only 0.3% of all time spent transferring memory happens over NVLink (seen as PtoP). There’s also 1.4% of transfer time spent in DtoD (AFAIK, that means transfers going from one device to another via host, i.e., DtoH on the source device + HtoD on the target device), and it is not clear to me why that happens given that this is running on a DGX-2 and all devices should have an NVLink connection thanks to the NVSwitch.

The TCP segment size is configured via UCX_TCP_TX_SEG_SIZE (whose default is 8KB, as mentioned previously) and UCX_TCP_RX_SEG_SIZE, and I confirmed that increasing them reduces the number of copies and synchronizations: an 8MB segment size brings merge compute time down from ~22 seconds to ~16 seconds (still marginally slower than regular Python sockets). With the increased segment sizes I noticed that no PtoP copies occur at all (meaning 100% of transfers go over TCP), as seen below:

CUDA Memory Operation Statistics (nanoseconds)

Time(%)      Total Time  Operations         Average         Minimum         Maximum  Name                              
-------  --------------  ----------  --------------  --------------  --------------  --------------------------------------------------------------------------------
   60.4      3695125675       63249         58421.9            1760         3222347  [CUDA memcpy HtoD]                
   37.4      2291190454       92884         24667.2            1376         2480377  [CUDA memcpy DtoH]                
    2.2       132241437       54952          2406.5            1280           32032  [CUDA memcpy DtoD]                
    0.0          364322         160          2277.0            1792            9728  [CUDA memset]                     


CUDA Memory Operation Statistics (KiB)

              Total      Operations              Average            Minimum              Maximum  Name                 
-------------------  --------------  -------------------  -----------------  -------------------  --------------------------------------------------------------------------------
       25365207.218           63249              401.037              0.001             4469.922  [CUDA memcpy HtoD]   
       18847178.518           54952              342.975              0.004             4469.922  [CUDA memcpy DtoD]   
       16605241.842           92884              178.774              0.001             4469.922  [CUDA memcpy DtoH]   
            162.906             160                1.018              0.004                1.500  [CUDA memset] 

I have not yet been able to find how UCX determines whether a transfer should go over TCP or over NVLink, but I expected that for the example discussed in this thread 100% of them would go over NVLink, given that we’re not explicitly reading from or writing to host.

I will continue to look for answers here, and I was wondering if @Akshay-Venkatesh perhaps has an idea if we’re doing something wrong or how we could better configure things in case this is a misconfiguration issue.
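
For reference, a sketch of how the segment sizes above could be raised to the 8MB mentioned earlier; setting them via os.environ (and the exact value string UCX accepts) is an assumption on my part, they could equally be exported in the shell before launching the workers:

# Assumed configuration sketch: enlarge the UCX TCP segment sizes
# (default 8KB, per the discussion above) to 8MB. These must be in the
# environment before UCX/UCX-Py is initialized.
import os

os.environ["UCX_TCP_TX_SEG_SIZE"] = "8MB"
os.environ["UCX_TCP_RX_SEG_SIZE"] = "8MB"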

I think there is some confusion; let me attempt to clarify:

  1. The overall slowdown in this issue refers to UCX being generally slower (independent of transport) than TCP via Python sockets – this remains an issue that is possibly on the Python side but still unconfirmed, and thus we have people working on many fronts;
  2. The patch Akshay is referring to improves InfiniBand specifically, which is important for us, but it is not the main topic of this issue.

Finally, there has been a general trend of assuming that a large part of the slowdown comes from memory allocation. While that may be true, I think we need to be extra careful when interpreting our profile reports, as all of them are limited in not including all the information we need (i.e., we don’t have any profile that includes C++, CUDA and Python information all at once). For instance, I think one such assumption comes from the fact that rmm.DeviceBuffer appears as one of the most time-consuming operations; that call comes from distributed.protocol.ucx.read, which hints that it’s not only the allocation that takes place but the copy as well, and the copy may include additional overhead, such as (but not limited to) cuIpcOpenMemHandle if that’s the first time a worker is reading another worker’s memory pool data.

While I’m not saying the above is actually happening, we need to be careful to identify exactly what’s going on; since we don’t have the full picture yet, we may be looking at the wrong piece of the puzzle.

Adding the dask reports for ucx/tcp:

The notable issue in the task graph is that for TCP there is more communication but the workers are fully engaged (no white space):

[Screenshot: TCP task graph, 2020-01-29 at 1:48:16 PM]

For UCX, there is less communication but workers are not actively working on a particular task (white space):

[Screenshot: UCX task graph, 2020-01-29 at 1:47:36 PM]

@jakirkham you asked for reports from Azure, here they are:

  • TCP
  • UCX + NVLink
  • UCX + NVLink + InfiniBand (without nv_peer_mem)

Wrote a quick test:

import rmm
import time

rmm.reinitialize(pool_allocator=True, initial_pool_size=int(2**34))

num_allocations = 10000
allocations = [None] * num_allocations

start = time.time()
for i in range(num_allocations):
    allocations[i] = rmm.DeviceBuffer(size=100)
end = time.time()
time_result = (end - start) * 1000

print(f"Time taken for allocating {num_allocations} buffers: {time_result}ms")


start = time.time()
for i in range(num_allocations):
    allocations[i] = None
end = time.time()
time_result = (end - start) * 1000

print(f"Time taken for freeing {num_allocations} buffers: {time_result}ms")

Results:

Time taken for allocating 100000 buffers: 44.32559013366699ms
Time taken for freeing 100000 buffers: 22098.9887714386ms

Time taken for allocating 50000 buffers: 23.27561378479004ms
Time taken for freeing 50000 buffers: 5765.538692474365ms

Time taken for allocating 25000 buffers: 11.168956756591797ms
Time taken for freeing 25000 buffers: 1489.1653060913086ms

Time taken for allocating 10000 buffers: 5.175113677978516ms
Time taken for freeing 10000 buffers: 360.734224319458ms

I checked how things are looking currently, and below are the results I got with RAPIDS 0.20 and UCX 1.9 on a DGX-2:

TCP:

%time len(merged_df)
CPU times: user 8.22 s, sys: 576 ms, total: 8.8 s
Wall time: 35.2 s

%time len(merged_df)
CPU times: user 8.26 s, sys: 924 ms, total: 9.18 s
Wall time: 35.3 s

UCX + NVLink:

%time len(merged_df)
CPU times: user 8.04 s, sys: 975 ms, total: 9.01 s
Wall time: 21 s

%time len(merged_df)
CPU times: user 7.86 s, sys: 857 ms, total: 8.72 s
Wall time: 20.4 s

It seems there has been a performance regression with TCP, and UCX now outperforms TCP (as we would expect). From the UCX-Py side, I think the original issue here has been resolved over the past year; do you think we can close this, @VibhuJawa? It may be worth investigating the potential regression in this merge workflow, but at this point I don’t think there’s anything to be done on the UCX front.

Thanks for following up on this, Peter. Agreed that we should close this on the UCX-Py side as there is no action item left on that front.

It might be worth exploring the TCP slowdown though, as the 2x slowdown seems concerning (CC: @randerzander / @beckernick).

It looks like UCX + NVLink + InfiniBand is a bit faster, but it experiences the memory allocation problem much more severely than UCX + NVLink. It seems memory allocation takes roughly twice as much time in the former as in the latter, though I would expect both to perform the same number of allocations. Why should one take so much longer? 🤔

At this point I think we should start getting some Nsight profiles and looking at them to see where we’re spending the time. If we need to add additional NVTX ranges to cuDF / RMM, we can.

Not time. Maximum number of active allocations.

At least for the MRE given above we are looking at 298948 allocations and deallocations.

Do you happen to have any info about what the largest number of allocations we had alive at any given time is? I think that’s important as well because if we’re just doing 1000 allocations, 1000 deallocations repeatedly we wouldn’t have performance problems.

RMM logging will give you the sizes, device IDs, pointers, timestamps, and the source (file and line) of the call (probably all would be DeviceBuffer since it only goes one level up the call stack), but your own logging can give you more context if it helps, I guess.
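
For illustration, a sketch of turning that logging on, assuming a newer RMM where rmm.reinitialize accepts logging/log_file_name (the API available at the time of this issue may have differed):

# Assumed API: enable RMM allocation logging to a per-device CSV file
# recording each allocate/free with pointer, size and timestamp.
import rmm

rmm.reinitialize(
    pool_allocator=True,
    logging=True,
    log_file_name="rmm_log.csv",
)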

I have a simple benchmark that allocates N random blocks from 1 to k bytes, freeing with a certain probability at each iteration, or when the maximum memory size is reached. I can use this to profile and optimize, I think. With 100,000 allocations of at most 2MB and a max allocated size of 87% of 16GB, it takes 30s with cnmem. If you think this is a sufficiently close comparison, I’ll just use it. Or you can give me different parameters.
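
A hedged sketch of what such a benchmark could look like with rmm.DeviceBuffer, using the numbers quoted above (100,000 allocations of at most 2MB, a cap of 87% of 16GB); the freeing probability and the exact freeing policy are my interpretation of the description, not the author’s actual script:

# Sketch: allocate N random blocks of 1..k bytes, free a random live block
# with probability `free_prob` each iteration, and also free blocks whenever
# the total allocated size exceeds `max_bytes`.
import random
import rmm

rmm.reinitialize(pool_allocator=True, initial_pool_size=2**34)  # 16GB pool

N = 100_000                            # number of allocations
k = 2 * 1024**2                        # max block size: 2MB
max_bytes = int(0.87 * 16 * 1024**3)   # 87% of 16GB
free_prob = 0.5                        # assumed; not given in the description

live = []                              # (size, DeviceBuffer) pairs currently alive
total = 0
for _ in range(N):
    size = random.randint(1, k)
    live.append((size, rmm.DeviceBuffer(size=size)))
    total += size
    if live and random.random() < free_prob:
        freed, buf = live.pop(random.randrange(len(live)))
        del buf                        # dropping the reference returns it to the pool
        total -= freed
    while total > max_bytes and live:
        freed, buf = live.pop(random.randrange(len(live)))
        del buf
        total -= freed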

More likely (to parrot @kkraus14, wink) this is showing us that RMM’s pool allocator (CNMeM) is experiencing degrading performance due to lots of allocations/deallocations. Here are the numbers to back that up.

Ah, I now see the reason to bulk allocate in the Dask UCX comm

Yes, we’re always using the RMM pool. Without the RMM pool there are basically two scenarios:

  1. End up OOMing (implies UCX_CUDA_IPC_CACHE=y); or
  2. It’s incredibly slow (implies UCX_CUDA_IPC_CACHE=n).

What about the time to execute the merge operation? I would be interested in seeing that before anything else.

I’m not sure yet @jakirkham . I’m doing some profiling still to verify that everything works correctly, plus I’ll need to check whether this affects other transports, so it may take a while until we’re confident this is the right solution. There’s still some lower performance when compared to TCP and I’m currently investigating that.

@quasiben and I found that manually setting UCX_RNDV_THRESH to a really low value (e.g., 1) works around the issue. IOW, all worker-worker transfers seem to go over PtoP in that situation. It also solves the cases we mentioned above where, depending on configuration, a size of 10000 would not go over PtoP.

The transports enabled have an effect on the rendezvous threshold when it’s set to auto (default). For example, on a DGX-1, having UCX_TLS=tcp,sockcm,cuda_copy,cuda_ipc will show ucp_ep.c:1222 UCX TRACE rndv threshold is 474127 (send_nbr: 262144), but if we add mm in UCX_TLS as well, then we’ll see ucp_ep.c:1222 UCX TRACE rndv threshold is 8256 (send_nbr: 262144).

I checked that setting the threshold indeed decreases the runtime to 18 seconds (from about 22 before), but it’s still slower than Python sockets for this particular example. I haven’t yet been able to take a closer look at the trace and profile it, but will do that tomorrow.
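
For reference, a minimal sketch of applying that workaround; setting it as an environment variable before UCX initializes (i.e., before the scheduler/workers start) is an assumption about how it would be wired in:

# Assumed configuration sketch: force the rendezvous protocol for nearly all
# message sizes, as described above. Must be set before UCX is initialized.
import os

os.environ["UCX_RNDV_THRESH"] = "1"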

Yes that sounds plausible. Thanks for that insight Akshay!

So we should change the current context back to the one used for allocating before sending, is that right? Or is there a way for us to inform UCX of the context it should use?

For reference, the data we send typically looks like the following (a mix of host and GPU memory):

In [17]: frames
Out[17]:
[b'',
 b'\x80',
 b'\x83\xa7headers\x81\x91\xa4data\x8a\xa4type\xc4$\x80\x03ccudf.core.dataframe\nDataFrame\nq\x00.\xa5index\x85\xacindex_column\x83\xa4type\xc41\x80\x03ccudf.core.column.numerical\nNumericalColumn\nq\x00.\xa5dtype\xa3<i8\xabframe_count\x01\xa4name\xc4\x04\x80\x03N.\xa5dtype\xc4C\x80\x03cnumpy\ndtype\nq\x00X\x02\x00\x00\x00i8q\x01K\x00K\x01\x87q\x02Rq\x03(K\x03X\x01\x00\x00\x00<q\x04NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x05b.\xa4type\xc4#\x80\x03ccudf.core.index\nGenericIndex\nq\x00.\xabframe_count\x01\xb1index_frame_count\x01\xaccolumn_names\xc4\x16\x80\x03X\x01\x00\x00\x00aq\x00X\x01\x00\x00\x00bq\x01\x86q\x02.\xa7columns\x92\x83\xa4type\xc41\x80\x03ccudf.core.column.numerical\nNumericalColumn\nq\x00.\xa5dtype\xa3<f8\xabframe_count\x01\x83\xa4type\xc41\x80\x03ccudf.core.column.numerical\nNumericalColumn\nq\x00.\xa5dtype\xa3<i8\xabframe_count\x01\xaftype-serialized\xc40\x80\x04\x95%\x00\x00\x00\x00\x00\x00\x00\x8c\x13cudf.core.dataframe\x94\x8c\tDataFrame\x94\x93\x94.\xaaserializer\xa4cuda\xabcompression\x93\xc0\xc0\xc0\xa7lengths\x93\xce\x00z\x12\x00\xce\x00z\x12\x00\xce\x00z\x12\x00\xa5count\x03\xa4keys\x91\x91\xa4data\xabbytestrings\x90',
 <numba.cuda.cudadrv.devicearray.DeviceNDArray at 0x7f24fd997c50>,
 <numba.cuda.cudadrv.devicearray.DeviceNDArray at 0x7f24fd99b110>,
 <numba.cuda.cudadrv.devicearray.DeviceNDArray at 0x7f24fd99b290>]

@quasiben also mentioned to me that we had ~81k calls to rmm.DeviceBuffer in that flame chart. I believe RMM’s pool allocator (cnmem) is known to have performance issues with freeing memory once a certain number of allocations is being managed. As a quick test it may be worth creating 10k/100k allocations, storing them somewhere, and then seeing how long it takes to delete the references.

As for rmm.DeviceBuffer, I was able yesterday to confirm the same numbers from Ben’s flame graph. Apart from that, I can confirm with 110% certainty that we do have the RMM pool enabled; there are two ways I can confirm this:

  1. If you watch the memory usage at runtime, you can see the initial pool size gets allocated for each GPU at startup and never grows beyond that;
  2. When you disable the pool, the flame graph reports the following for rmm.DeviceBuffer:
     • TCP: 332s
     • UCX: 287s

I set up two dask-cuda-workers manually and ran them with nvprof to better understand what was happening in CUDA land:

==11522== Profiling application: /datasets/bzaitlen/miniconda3/envs/rapidsai-latest/bin/python /datasets/bzaitlen/miniconda3/envs/rapidsai-latest/bin/dask-cuda-worker ucx://10.33.227.163:8786 --enable-nvlink --enable-tcp-over-ucx
==11522== Profiling result:
No kernels were profiled.
            Type  Time(%)      Time     Calls       Avg       Min       Max  Name
      API calls:   98.50%  324.77ms         1  324.77ms  324.77ms  324.77ms  cuDevicePrimaryCtxRetain
                    0.90%  2.9561ms        97  30.474us     138ns  1.2527ms  cuDeviceGetAttribute
                    0.47%  1.5458ms         1  1.5458ms  1.5458ms  1.5458ms  cuDeviceTotalMem
                    0.08%  274.87us         2  137.43us  128.12us  146.75us  cuDeviceGetName
                    0.05%  162.31us         1  162.31us  162.31us  162.31us  cuMemGetInfo
                    0.00%  4.4480us         6     741ns     182ns  1.7090us  cuDeviceGetCount
                    0.00%  3.9780us         1  3.9780us  3.9780us  3.9780us  cuDeviceGetPCIBusId
                    0.00%  2.3980us         1  2.3980us  2.3980us  2.3980us  cuCtxPushCurrent
                    0.00%  1.7690us         1  1.7690us  1.7690us  1.7690us  cuInit
                    0.00%  1.7600us         4     440ns     150ns     799ns  cudaGetDeviceCount
                    0.00%  1.6120us         3     537ns     480ns     611ns  cuDeviceGet
                    0.00%  1.0650us         1  1.0650us  1.0650us  1.0650us  cuDriverGetVersion
                    0.00%     951ns         1     951ns     951ns     951ns  cuCtxGetCurrent
                    0.00%     742ns         1     742ns     742ns     742ns  cuDeviceComputeCapability
                    0.00%     251ns         1     251ns     251ns     251ns  cuDeviceGetUuid

We are spending a lot of time in cuDevicePrimaryCtxRetain; this is happening within Numba and we can see it in the worker profile as well:

[Screenshot: worker profile, 2020-01-30 at 8:00:56 PM]

I believe this is known to both @jakirkham and @kkraus14

Another interesting thing to note is that deletion of dependencies takes quite a bit more time with UCX than with TCP. This can be seen in the Worker Profile (administrative) page:

UCX: [Screenshot 2020-01-29 at 3:17:07 PM]

TCP: [Screenshot 2020-01-29 at 3:17:25 PM]

Yes, sorry for being unclear 😄

From the worker profile in the linked performance report, we spend 130 total seconds doing “compute” with UCX across the 16 GPUs. We only spend 40 total seconds with TCP across the 16 GPUs.

  • 62 seconds doing _concat with UCX, vs 13 seconds with TCP
  • 19 vs 9 for shuffle_group