azure-sdk-for-python: Truly stream chunks so the entire blob doesn't need to be kept in memory

Is your feature request related to a problem? Please describe. I have large gz files I need to stream (10–50 GB). I don’t want to, and don’t have the memory to, download the whole blob into memory first. gzip is a streaming format, so I only need a chunk at a time.

Describe the solution you’d like Something like the following. Note the AzureBlobStream implementation, which only keeps one chunk in memory at a time. It would be nice if StorageStreamDownloader simply acted like a stream and behaved this way.

import gzip
import io

from azure.storage.blob import StorageStreamDownloader

linecount = 0
stream = source_blob.download_blob()  # source_blob is an existing BlobClient
istream = AzureBlobStream(stream)  # As chunks are downloaded they are discarded from memory
with io.BufferedReader(gzip.GzipFile(mode="rb", fileobj=istream), buffer_size=istream.block_size) as f:
    next(f)  # skip the first line (e.g. a header)
    for line in f:
        linecount += 1
print(f"read: {linecount} lines")

# Treat a StorageStreamDownloader as a forward, read-only stream
class AzureBlobStream:
    def __init__(self, ssd: StorageStreamDownloader):
        self.ssd = ssd
        self.chunk_iterator = self.ssd.chunks()
        self.current_buffer = b""
        self.current_position_in_buffer = 0
        self.blob_length = ssd.size
        self.bytes_read = 0
        self.block_size = 100 * 1024 * 1024  # read-ahead size hint for the BufferedReader above

    def __get_next_buffer(self):
        next_buffer = next(self.chunk_iterator)  # use the built-in next(), not .next()
        self.current_position_in_buffer = 0
        return next_buffer


    def __read_intern(self, size):
        # Return (data, bytes_still_needed); a non-zero second value means the
        # current chunk is exhausted and the caller must fetch the next one.
        buffer_length = len(self.current_buffer)
        if buffer_length - self.current_position_in_buffer >= size:
            ret_val = self.current_buffer[self.current_position_in_buffer: self.current_position_in_buffer + size]
            self.current_position_in_buffer = self.current_position_in_buffer + size
            return ret_val, 0
        else:
            ret_val = self.current_buffer[self.current_position_in_buffer:]
            bytes_needed = size - (buffer_length - self.current_position_in_buffer)
            return ret_val, bytes_needed

    def read(self, size):
        # Forward-only read of `size` bytes (or fewer at the end of the blob)
        if self.bytes_read >= self.blob_length:
            return b''
        if size > self.blob_length - self.bytes_read:
            size = self.blob_length - self.bytes_read
        ret_val = bytearray()
        left_to_read = size
        while left_to_read != 0:
            additional_bytes, left_to_read = self.__read_intern(left_to_read)
            ret_val.extend(additional_bytes)
            if left_to_read != 0:
                self.current_buffer = self.__get_next_buffer()
        self.bytes_read = self.bytes_read + len(ret_val)
        return bytes(ret_val)  # hand back immutable bytes, as file-like readers expect

Describe alternatives you’ve considered Downloading tens of GB into memory.

Additional context None.

Edit: updated code to work…

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 3
  • Comments: 25 (9 by maintainers)

Most upvoted comments

I’m shocked this is still open. Native python stream functionality should be core to this library.

How can I increase the BUFFER_SIZE for the chunks? Are there any docs?

I missed that this was closed. @ollytheninja reiterated my point. This SDK strives to be as Pythonic as possible, and the chunks API is about as far from Pythonic as you can get. StorageStreamDownloader, despite its name, is not a Python stream.

Bump. I’ve been trying to find a way for the Python API to stream and haven’t found one yet, or maybe I missed it. The chunks() method in the API documentation doesn’t have any description, so I’m thankful someone figured this out.

Hi @ericthomas1, #24275 was recently merged, which added a standard read method to the StorageStreamDownloader class. This allows you to read an arbitrarily sized chunk of data from the downloader, so the data can be streamed in a more Pythonic way. This is currently released in our latest beta version, 12.14.0b1. The plan is for this to be in our next full release, which is tentatively scheduled for early this month.
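
For anyone landing here later, a minimal sketch of what that read-based streaming could look like. The connection string, container, and blob names below are placeholders, and it assumes a version of azure-storage-blob where StorageStreamDownloader.read is available:

import gzip

from azure.storage.blob import BlobClient

# Placeholders: supply your own connection string, container and blob names
blob = BlobClient.from_connection_string(
    conn_str="<connection-string>",
    container_name="<container>",
    blob_name="large-file.gz",
)

downloader = blob.download_blob()  # StorageStreamDownloader
linecount = 0
# gzip only needs a file-like object exposing read(), which the downloader now provides
with gzip.GzipFile(fileobj=downloader, mode="rb") as gz:
    for line in gz:  # decompressed bytes lines, pulled down a chunk at a time
        linecount += 1
print(f"read: {linecount} lines")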

In the meantime, or as an alternative, the chunks API, which exists today, can be used to stream data back. See this sample for how that can be done. Thanks.

I resolved the issue from the previous comment by adding the “max_chunk_get_size” argument to BlobServiceClient and then using the download_blob and chunks methods (see the sketch after the list):

  • blob_service_client = BlobServiceClient(account_url=acc_url, credential=storage_account_key, max_chunk_get_size=1024)
  • blob_client.download_blob().chunks()
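
As a rough end-to-end sketch of that approach (the account URL, key, container, and blob names are placeholders, and the 1 MiB chunk size is just an example):

from azure.storage.blob import BlobServiceClient

# Placeholders: account URL, key, container and blob names are illustrative only
blob_service_client = BlobServiceClient(
    account_url="https://<account>.blob.core.windows.net",
    credential="<storage-account-key>",
    max_chunk_get_size=1024 * 1024,  # download in 1 MiB chunks instead of the default
)
blob_client = blob_service_client.get_blob_client("<container>", "<blob-name>")

total = 0
for chunk in blob_client.download_blob().chunks():  # each chunk is a bytes object
    total += len(chunk)  # process the chunk here; it can then be garbage collected
print(f"downloaded {total} bytes")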

Could we get some more samples in the docs for iterating over chunks()?

The section is empty at the moment.

If I get some time I’ll see about making a pull request and a new issue for this. @tasherif-msft, what are your views on reopening this? Or should we raise a new issue?