distributed: Compression slows down network comms

State of the art

Before dask either sends data over the network or writes it to disk, it tries compressing a small bit of it (10 kB) with lz4 and, if it compresses to 90% of its original size or better on the sample, it compresses the whole thing (implementation: maybe_compress). For spill/unspill, compression/decompression blocks the event loop. For network comms, compression/decompression runs on a thread pool with a single worker (offload) while the GIL is released (this has been fixed very recently for outbound comms; before it blocked the event loop: #7593).
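
The sample-then-commit heuristic described above can be sketched roughly as follows. This is a minimal illustration, not the actual maybe_compress implementation: it uses zlib from the standard library as a stand-in for lz4, samples only the first 10 kB, and the name maybe_compress_sketch and its return convention are hypothetical.

```python
import os
import zlib

SAMPLE_SIZE = 10_000  # bytes to trial-compress, per the description above
MAX_RATIO = 0.9       # commit only if the sample shrinks to <= 90% of its size


def maybe_compress_sketch(payload: bytes, compress=zlib.compress):
    """Return (label, data); label is None when the payload is left as-is.

    Assumption: payloads no larger than the sample are never compressed.
    """
    if len(payload) <= SAMPLE_SIZE:
        return None, payload
    sample = payload[:SAMPLE_SIZE]
    if len(compress(sample)) > MAX_RATIO * len(sample):
        # The sample barely shrinks: give up without touching the rest.
        return None, payload
    return "zlib", compress(payload)


# Random bytes are effectively incompressible; zeros compress very well.
label_random, _ = maybe_compress_sketch(os.urandom(100_000))
label_zeros, data_zeros = maybe_compress_sketch(bytes(100_000))
```

With this shape, incompressible data costs only one trial compression of the sample, while compressible data pays for the sample and then for the full payload.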

Performance

Until now we did not have hard data on how beneficial compression is, just an academic expectation that “disk slow, network slow, cpu fast”. In coiled-runtime, I’ve just added tests (coiled/coiled-runtime#714) that demonstrate different performance when data is compressible vs. when it is uncompressible. coiled-runtime benchmarks run on Coiled, which in turn means that all workers run on AWS EC2.

Test results are very bad.

I’m seeing that:

  • network-intensive tests that don’t spill (test_array.py::*) are up to 70% slower when the workers spend time compressing and decompressing data compared to when maybe_compress gives up after it fails to compress 10 kB
  • spill-intensive tests that perform no network comms (test_spill.py::test_spilling) get a substantial speedup from compression
  • tests that are both spill-intensive and network-intensive (test_spill.py::test_dot_product_spill) don’t show benefits from compression, probably because the two effects cancel each other out.

Whoops. Disk slow, cpu fast, network faster.

[image: chart of benchmark results]

Tests were performed downstream of #7593.

In the above chart, each test ran in 3 different configurations, which differ in how the original data is generated:

uncompressible

a = da.random.random(...)

compressible

This data compresses to 42% of its original size, at a speed of 570 MiB/s (measured on lz4 4.0).

def compressible(x):
    y = x.reshape(-1)
    y[::2] = 0
    return y.reshape(x.shape)

a = da.random.random(...).map_blocks(compressible)
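
A ratio and throughput like the ones quoted above could be measured along these lines. This is only a sketch under assumptions: it uses the standard library (zlib, array, random) instead of lz4 and NumPy, so the numbers it produces will not match the lz4 figures, and the helper name measure is hypothetical.

```python
import random
import time
import zlib
from array import array


def measure(data: bytes, compress=zlib.compress):
    """Return (compressed size / original size, compression speed in MiB/s)."""
    start = time.perf_counter()
    compressed = compress(data)
    elapsed = time.perf_counter() - start
    return len(compressed) / len(data), len(data) / 2**20 / elapsed


n = 2**17  # 131072 float64 values = 1 MiB
rng = random.Random(0)
values = array("d", (rng.random() for _ in range(n)))
ratio_random, speed_random = measure(values.tobytes())

# Same transformation as compressible(): zero out every other element.
for i in range(0, n, 2):
    values[i] = 0.0
ratio_compressible, speed_compressible = measure(values.tobytes())

print(f"random:       ratio={ratio_random:.2f}  {speed_random:.0f} MiB/s")
print(f"compressible: ratio={ratio_compressible:.2f}  {speed_compressible:.0f} MiB/s")
```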

dummy

This was to rule out that the slowdown between compressible and uncompressible was caused by the extra layer in the graph or by the introduction of cloudpickle in the serialization.

def dummy(x):
    return x.reshape(x.shape)

a = da.random.random(...).map_blocks(dummy)

Possible solutions

First of all, we need to verify that the difference is actually caused by pure waiting time for decompression and not something else (e.g. GIL).

Reinstate blosc + fix #7433

blosc was removed in #5269 over maintainability concerns. However, it is 5x faster than lz4 and can work around #7433 (with additional work on top of just reverting #5269). #7433 is a known major cause of slowness, in addition to the raw throughput of the C algorithm. This is my preferred choice.

Increase number of offload threads

At the moment all compression/decompression is pipelined onto a single offload thread. Increasing this number could be beneficial - if user functions alone are insufficient to saturate the CPU. Thread safety of the various C libraries used for compression/decompression would need to be thoroughly verified.
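
To illustrate the idea, here is a minimal sketch of offloading compression from an asyncio event loop to a thread pool of configurable size. It is not distributed's offload implementation: zlib stands in for lz4, and compress_all and n_threads are hypothetical names. Any speedup from more threads depends on the compression library releasing the GIL while it works and on spare CPU being available.

```python
import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor


async def compress_all(frames, n_threads=1):
    """Compress every frame on a dedicated pool, keeping the event loop free."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(
        max_workers=n_threads, thread_name_prefix="offload-sketch"
    ) as pool:
        futures = [loop.run_in_executor(pool, zlib.compress, f) for f in frames]
        # gather() preserves the order of the input frames.
        return await asyncio.gather(*futures)


frames = [bytes(1_000_000) for _ in range(8)]
single = asyncio.run(compress_all(frames, n_threads=1))  # current behaviour
multi = asyncio.run(compress_all(frames, n_threads=4))   # proposed change
```

The output is identical in both cases; only the degree of parallelism changes, which is why thread safety of the underlying C libraries is the main thing to verify.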

Completely disable compression by default

I don’t think this is a good idea because (1) it would harm spilling and (2) it would severely harm client<->worker comms in all configurations where the client-worker bandwidth is much more limited than the worker-worker one - such as in Coiled. From customer feedback, we know that many dask users can’t read/write on cloud storage and are forced to upload/download everything from/to their laptop with scatter / gather.

Disable compression in network, keep it in spilling

Small code change needed. Same concerns as above.

Disable compression in worker-worker and scheduler-worker comms, keep it in client-worker comms and spilling

A larger, more cumbersome code change is needed. Very ugly IMHO.
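
Conceptually, this option amounts to a per-link policy like the one sketched below. Everything here (the Link enum, the COMPRESS table, should_compress) is hypothetical and only illustrates the decision that each comm and the spill layer would have to make; it is not an existing dask API.

```python
from enum import Enum


class Link(Enum):
    CLIENT_WORKER = "client-worker"
    WORKER_WORKER = "worker-worker"
    SCHEDULER_WORKER = "scheduler-worker"
    SPILL = "spill"


# Hypothetical policy table for this option: compress only where
# bandwidth is expected to be the bottleneck.
COMPRESS = {
    Link.CLIENT_WORKER: True,
    Link.SPILL: True,
    Link.WORKER_WORKER: False,
    Link.SCHEDULER_WORKER: False,
}


def should_compress(link: Link) -> bool:
    """Return whether payloads travelling over this link get compressed."""
    return COMPRESS[link]
```

The cumbersome part is that every comm would need to know which kind of peer sits at the other end before it can consult such a table.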

Leave everything as it is in core dask; disable compression in coiled

This would make sense if we thought non-Coiled users had much lower inter-worker bandwidth than on Coiled, on average. I don’t think this is a good idea, as I don’t have any evidence supporting this statement.

CC @fjetter @mrocklin @gjoseph92 @hendrikmakait

About this issue

  • State: closed
  • Created a year ago
  • Comments: 16 (10 by maintainers)

Most upvoted comments

cramjam.snappy.decompress_into doesn’t seem to work…

That’s because the standard output of cramjam is a file-like buffer. When it is passed into another de/compression call, that call reads it to the end; subsequent calls then behave like any exhausted file-like buffer (e.g. io.BytesIO) and read zero bytes, leaving zero bytes to de/compress. Here is a slightly modified version of the cramjam code, which just takes the buffer contents via bytes, for example.

Also note that python-snappy’s .compress uses the raw format, which cramjam denotes with .compress_raw / .decompress_raw; cramjam.snappy.compress uses the snappy framed/streaming format.

import numpy
import lz4.block
import snappy
import cramjam
import blosc
import blosc2

x = numpy.random.random(64 * 2**20 // 8)
x[::2] = 0
b = x.tobytes()

print("=== lz4 ===")
c = lz4.block.compress(b)
print(len(c) / len(b))
assert lz4.block.decompress(c) == b
%timeit lz4.block.compress(b)
%timeit lz4.block.decompress(c)

print("=== cramjam.lz4 ===")
c = bytes(cramjam.lz4.compress_block(b))
print(len(c) / len(b))
assert bytes(cramjam.lz4.decompress_block(c)) == b
d = bytearray(len(b))
cramjam.lz4.decompress_block_into(c, output=d)
assert d == b
%timeit cramjam.lz4.compress_block(b)
%timeit cramjam.lz4.decompress_block(c)
%timeit cramjam.lz4.decompress_block_into(c, output=d)

print("=== snappy ===")
c = snappy.compress(b)
print(len(c) / len(b))
assert snappy.decompress(c) == b
%timeit snappy.compress(b)
%timeit snappy.decompress(c)

print("=== cramjam.snappy raw ===")
c = bytes(cramjam.snappy.compress_raw(b))
print(len(c) / len(b))
assert bytes(cramjam.snappy.decompress_raw(c)) == b
d = bytearray(len(b))
cramjam.snappy.decompress_raw_into(c, output=d)
assert d == b
%timeit cramjam.snappy.compress_raw(b)
%timeit cramjam.snappy.decompress_raw(c)
%timeit cramjam.snappy.decompress_raw_into(c, output=d)

print("=== cramjam.snappy ===")
c = bytes(cramjam.snappy.compress(b))
print(len(c) / len(b))
assert bytes(cramjam.snappy.decompress(c)) == b
d = bytearray(len(b))
cramjam.snappy.decompress_into(c, output=d)
assert d == b
%timeit cramjam.snappy.compress(b)
%timeit cramjam.snappy.decompress(c)
%timeit cramjam.snappy.decompress_into(c, output=d)

print("=== blosc ===")
c = blosc.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc.decompress(c) == b
%timeit blosc.compress(b, typesize=8)
%timeit blosc.decompress(c)
#%timeit blosc.decompress_ptr(c, id(d) + ???)

print("=== blosc2 ===")
c = blosc2.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc2.decompress(c) == b
d = bytearray(len(b))
blosc2.decompress(c, dst=d)
assert d == b
%timeit blosc2.compress(b, typesize=8)
%timeit blosc2.decompress(c)
%timeit blosc2.decompress(c, dst=d)

=== lz4 ===
0.567649319767952
89.4 ms ± 2.67 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
55.4 ms ± 641 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.lz4 ===
0.567649319767952
86.3 ms ± 573 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
36.5 ms ± 243 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
12 ms ± 22 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== snappy ===
0.5744812339544296
73.9 ms ± 277 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
60.9 ms ± 2.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.snappy raw ===
0.5744812339544296
87.6 ms ± 1.63 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
47.2 ms ± 528 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
23.2 ms ± 255 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.snappy ===
0.5746491700410843
81.7 ms ± 791 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
61 ms ± 178 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
34 ms ± 138 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== blosc ===
0.8442404121160507
15.7 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
10.6 ms ± 207 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== blosc2 ===
0.8434544950723648
39.8 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
11.9 ms ± 229 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
6.67 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
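
For easier comparison, the mean timings above can be converted into throughput. The sketch below only does arithmetic on numbers copied from this output, assuming the 64 MiB buffer built by the script (64 * 2**20 // 8 float64 values); the helper name mib_per_s is hypothetical.

```python
BUFFER_MIB = 64  # 64 * 2**20 // 8 float64 values of 8 bytes each

# (compress ms, decompress ms): mean timings copied from the output above,
# using the plain (non-"into") decompress call for each library.
timings_ms = {
    "lz4": (89.4, 55.4),
    "cramjam.lz4": (86.3, 36.5),
    "snappy": (73.9, 60.9),
    "blosc": (15.7, 10.6),
    "blosc2": (39.8, 11.9),
}


def mib_per_s(ms: float) -> float:
    """Throughput in MiB/s for processing the whole buffer in `ms` milliseconds."""
    return BUFFER_MIB / (ms / 1000)


throughput = {
    name: (mib_per_s(c), mib_per_s(d)) for name, (c, d) in timings_ms.items()
}
for name, (c, d) in throughput.items():
    print(f"{name:12s} compress {c:6.0f} MiB/s   decompress {d:6.0f} MiB/s")
```

Note that these throughputs are not directly comparable on their own, since blosc and blosc2 reach a compression ratio of about 0.84 here versus about 0.57 for lz4 and snappy.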

Also, I don’t think it’d be absurd to add C-Blosc2 bindings to cramjam either.