google-cloud-python: BigQuery Storage: Disappointing performance when parsing Avro blocks

The performance of google-cloud-bigquery-storage when parsing the Avro-encoded blocks of rows is disappointing, especially when compared to the Go implementation.

  • Download and parse the protos, but not the contained Avro bytes: 12.3s
  • Download and parse the Avro into Python dictionaries (with fastavro): 42.1s
  • Download and parse the Avro into a pandas DataFrame (needs to convert to dictionaries first): 73.4s

All three of the following benchmarks read data from the bigquery-public-data.usa_names.usa_1910_current table. They were run on an n1-standard-8 instance (though only a single stream is used).

# coding: utf-8
import concurrent.futures

from google.cloud import bigquery_storage_v1beta1

# Shared setup for all three benchmarks: create a read session with a
# single stream over the public usa_names table, then open a row stream
# at the beginning of that stream.
client = bigquery_storage_v1beta1.BigQueryStorageClient()
project_id = 'swast-scratch'
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = 'bigquery-public-data'
table_ref.dataset_id = 'usa_names'
table_ref.table_id = 'usa_1910_current'
session = client.create_read_session(
    table_ref,
    'projects/{}'.format(project_id),
    requested_streams=1,
)
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(
    stream=stream,
)
rowstream = client.read_rows(position)

Where they differ is in what they do with the blocks.

Parse the proto, but not the Avro bytes:

print(sum([page.num_items for page in rowstream.rows(session).pages]))

swast@pandas-gbq-test:~/benchmark$ time python3 parse_proto_no_avro.py 
5933561

real    0m12.278s
user    0m3.496s
sys     0m2.376s

Parse the Avro into rows:

print(len(list(rowstream.rows(session))))

swast@pandas-gbq-test:~/benchmark$ time python3 parse_avro.py 
5933561

real    0m42.055s
user    0m37.784s
sys     0m3.504s

Parse the Avro bytes into a pandas DataFrame:

df = rowstream.rows(session).to_dataframe()
print(len(df.index))

swast@pandas-gbq-test:~/benchmark$ time python3 parse_avro_to_dataframe.py 
5933561

real    1m13.449s
user    1m8.180s
sys     0m2.396s

CC @jadekler, since I’d like to track these metrics over time with the benchmarks project you’re working on.

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 2
  • Comments: 21 (11 by maintainers)

Most upvoted comments

It will take some refactoring (just-in-time codegen!) and testing before it’s production-ready, but I want to give an update. I’ve built an implementation at https://github.com/tswast/google-cloud-python/blob/issue7805-block_reader/bigquery_storage/google/cloud/bigquery_storage_v1beta1/_avro_to_arrow.py which can parse a specific 10,000,000-row, 162.12 MB table containing INT64, FLOAT64, and BOOL columns (the three easiest scalar types to handle).

With this custom parser, powered by Numba and PyArrow, I can download and parse the whole table in 16 seconds:

$ time python benchmark/parse_avro_to_arrow.py
10000000
python benchmark/parse_avro_to_arrow.py  7.65s user 1.68s system 59% cpu 15.560 total

versus 1 minute, 10 seconds with fastavro:

$ time python benchmark/parse_df.py
10000000
python benchmark/parse_df.py  57.36s user 2.35s system 85% cpu 1:10.20 total

I want to handle strings next, but string support will require solving https://issues.apache.org/jira/browse/ARROW-5531 in pyarrow. Instead, I plan to get a preliminary implementation ready that supports tables with only INT64, FLOAT64, and BOOL columns. From there, we can improve the parser to add support for more types and maybe add a hidden flag to use the fast path when available.
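
To make the shape of that approach concrete, here is a minimal sketch of the idea, not the code in _avro_to_arrow.py: a Numba-jitted decoder fills preallocated NumPy column arrays for a block of REQUIRED fields, and those arrays are then handed to PyArrow. The schema walk, field names, and the block_to_arrow helper are all illustrative (the real implementation generates the decoder from the Avro schema, which is the just-in-time codegen mentioned above); FLOAT64 would read 8 little-endian IEEE-754 bytes per value and is omitted to keep the sketch short.

import numba
import numpy
import pyarrow

@numba.jit(nopython=True)
def _read_long(position, block):
    # Avro longs are zigzag-encoded, little-endian base-128 varints.
    result = numpy.int64(0)
    shift = 0
    while True:
        byte = numpy.int64(block[position])
        position += 1
        result |= (byte & 0x7F) << shift
        if (byte & 0x80) == 0:
            break
        shift += 7
    return position, (result >> 1) ^ -(result & 1)

@numba.jit(nopython=True)
def _decode_block(block, row_count, int_col, bool_col):
    # Fill preallocated column arrays for records with one REQUIRED INT64
    # field followed by one REQUIRED BOOL field (illustrative schema only).
    position = 0
    for i in range(row_count):
        position, value = _read_long(position, block)
        int_col[i] = value
        bool_col[i] = block[position] != 0  # Avro booleans are one byte.
        position += 1
    return position

def block_to_arrow(avro_bytes, row_count):
    """Decode one Avro block's bytes into a pyarrow.Table (sketch only)."""
    block = numpy.frombuffer(avro_bytes, dtype=numpy.uint8)
    int_col = numpy.empty(row_count, dtype=numpy.int64)
    bool_col = numpy.empty(row_count, dtype=numpy.bool_)
    _decode_block(block, row_count, int_col, bool_col)
    return pyarrow.Table.from_arrays(
        [pyarrow.array(int_col), pyarrow.array(bool_col)],
        names=['int_field', 'bool_field'],
    )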

Also, while not trivial, a 5-15% speedup is nothing compared to the experiments with avoiding Python object creation and going straight from Avro to a DataFrame (maybe with Arrow as an intermediary?), which can double the speed, so my efforts have been focused there first.

I know the Arrow project is working on Avro compatibility for 2019; I’m not sure how far along they’ve gotten. I’ve achieved order-of-magnitude speedups reading data over ODBC into Python using turbodbc’s fetchallarrow() method.
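
For context, the turbodbc path I mean looks roughly like this (the DSN and query are placeholders, and it assumes turbodbc was built with its Arrow support); fetchallarrow() hands back a pyarrow.Table without materializing Python row objects, which is the same property we want here:

import turbodbc

# Placeholder DSN and query; assumes turbodbc was built with Arrow support.
connection = turbodbc.connect(dsn='my_dsn')
cursor = connection.cursor()
cursor.execute('SELECT name, number FROM some_table')
arrow_table = cursor.fetchallarrow()  # pyarrow.Table, no Python row objects
df = arrow_table.to_pandas()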

I actually had some thoughts about how to make a parallel Avro parser. We’d have to do it in two passes, since there are a lot of integers that need to be parsed to identify where the record boundaries are. From my notes:

Avro could be parsed in parallel if we do a first pass to identify the offset of each record. We can even keep track of the record order so that, when the records are written to Arrow, the actual execution order doesn’t matter.

Also, having a first pass solves the array-sizing problem for strings. I need to parse how long each string is in order to find record boundaries anyway, so I can track the total bytes in the first pass. We could precalculate the Arrow offsets for strings in the first pass, too.
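
Reusing the _read_long varint helper (and the numba/numpy imports) from the sketch above, and assuming a hypothetical schema of one REQUIRED INT64 field followed by one REQUIRED STRING field per record, that first pass might look like the following; a real implementation would generate the field walk from the Avro schema:

@numba.jit(nopython=True)
def _first_pass(block, row_count):
    # Pass 1: record each record's starting byte offset plus the total
    # string bytes, so pass 2 can decode records in parallel and size the
    # Arrow string buffers exactly.
    offsets = numpy.empty(row_count, dtype=numpy.int64)
    total_string_bytes = 0
    position = 0
    for i in range(row_count):
        offsets[i] = position
        position, _ = _read_long(position, block)       # skip the INT64 field
        position, strlen = _read_long(position, block)  # STRING length prefix
        total_string_bytes += strlen
        position += strlen                              # skip the string bytes
    return offsets, total_string_bytes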

I have tested processing blocks/pages in parallel within a stream, too, and got a 5-15% speedup. I haven’t contributed that change yet, though, because I want to retain the block/page order, especially in the single-stream case, so I’d need to do something to make sure the blocks are actually concatenated in the right order once we get DataFrames from them.
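
One simple way to keep that ordering is to let concurrent.futures preserve it: executor.map yields results in submission order regardless of which page finishes first. A minimal sketch (assuming a per-page to_dataframe() helper on the reader's pages; this is not the contributed change itself):

import concurrent.futures

import pandas

def rows_to_dataframe_parallel(reader, session, max_workers=8):
    # Parse pages concurrently but concatenate them in stream order:
    # executor.map yields results in the order the pages were submitted,
    # even when the parsing itself finishes out of order.
    pages = reader.rows(session).pages
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        frames = list(executor.map(lambda page: page.to_dataframe(), pages))
    return pandas.concat(frames, ignore_index=True)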

Is numba returning a 0d numpy array with a string inside?

It’s a byte array with utf-8 bytes. I haven’t figured out how to efficiently convert that to a string, which is why I was considering a Row wrapper class to convert to string only when necessary.

import numba
import numpy

@numba.jit(nopython=True)
def _read_bytes(position, block):
    # _read_long (defined elsewhere in the module) decodes Avro's
    # zigzag-encoded varint, here used as the byte-string length prefix.
    position, strlen = _read_long(position, block)
    value = numpy.empty(strlen, dtype=numpy.uint8)
    for i in range(strlen):
        value[i] = block[position + i]  # Copy the bytes, ugh.
    return (position + strlen, value)
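
For reference, outside the jitted code the uint8 array can be turned into a str with tobytes(), but that costs a copy plus a decode per value, which is exactly what a lazy Row wrapper would defer:

def _bytes_to_str(value):
    # value is the numpy.uint8 array returned by _read_bytes above.
    return value.tobytes().decode('utf-8')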

Is the Python object constructor in the numba routine?

Outside of it. I put it in the next() function. Potentially I could do it in a Python jit function, but since I still have to iterate over the row block in Python code, I didn’t measure any benefits from doing that.

def next(self):
    """Get the next row in the page."""
    self._parse_block()  # Exits immediately if the block is already parsed.
    if self._remaining > 0:
        self._remaining -= 1
    return Row(self._schema, six.next(self._iter_rows))
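
A wrapper along those lines might look like the sketch below; the schema layout, type tag, and access by field name are illustrative only, not the library’s actual Row API. String fields stay as uint8 arrays until they’re actually read.

class Row(object):
    """Sketch of a lazy row: decode UTF-8 bytes only on access."""

    def __init__(self, schema, values):
        # schema maps field name -> (position, avro_type); values is the
        # tuple produced by the jitted block parser. Both are illustrative.
        self._schema = schema
        self._values = values

    def __getitem__(self, name):
        position, avro_type = self._schema[name]
        value = self._values[position]
        if avro_type == 'string':
            # Pay the copy + decode cost only for fields that get read.
            return value.tobytes().decode('utf-8')
        return value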

To what extent could numba receive an avro blob and return an array?

Actually, lists and list comprehensions are in the list of supported nopython features, so that’s what I’m returning: a list of tuples. Based on the slowdown I’m seeing, I suspect they aren’t true lists/tuples and get converted into real ones if you try to use them in an unsupported way, like passing them into an object constructor, but I haven’t traced through to verify that.
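
A tiny illustration of that boundary (purely a sketch; newer Numba versions may warn about reflected lists here): the list of tuples built in nopython mode is converted into ordinary Python objects when it crosses back into the interpreter, and that per-row conversion is where I suspect the time goes.

import numba

@numba.jit(nopython=True)
def decode_rows(n):
    # Built as a jit-native list of tuples inside nopython mode.
    return [(i, i * 0.5) for i in range(n)]

# Crossing the jit boundary converts every element into a real Python
# tuple; that per-row conversion is the suspected cost.
rows = decode_rows(1000000)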