beam: [Bug]: Fix or rewrite a broken Avro Example.

What happened?

The input for the Avro bitcoin example defined here is not accessible in GCS: https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/examples/avro_bitcoin.py#L116

Listing the files in that bucket fails with a 404 BucketNotFoundException:

gsutil -m ls -lh "gs://beam-avro-test/bitcoin/txns/*"
# BucketNotFoundException: 404 gs://beam-avro-test bucket does not exist.
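
The same check can be reproduced with Beam's own filesystem API (a minimal sketch; assumes apache-beam[gcp] is installed):

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems

try:
    # The example's input pattern; the bucket behind it no longer exists.
    matches = FileSystems.match(['gs://beam-avro-test/bitcoin/txns/*'])
    print([m.path for m in matches[0].metadata_list])
except BeamIOError as e:
    print('match failed:', e)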

We should fix the example or rewrite it to a working one.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 20 (10 by maintainers)

Most upvoted comments

No problem, let us know if you need any help. Have fun!

For reference, here is the code I finally used to convert one Parquet file to Avro:

import argparse
import logging

import apache_beam as beam

from apache_beam.io import ReadFromParquet, WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


SCHEMA = {
    'fields': [
        {'name': 'hvfhs_license_num', 'type': ['null', 'string']},
        {'name': 'dispatching_base_num', 'type': ['null', 'string']},
        {'name': 'originating_base_num', 'type': ['null', 'string']},
        {'name': 'request_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'on_scene_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'pickup_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'dropoff_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'PULocationID', 'type': ['null', 'long']},
        {'name': 'DOLocationID', 'type': ['null', 'long']},
        {'name': 'trip_miles', 'type': ['null', 'double']},
        {'name': 'trip_time', 'type': ['null', 'long']},
        {'name': 'base_passenger_fare', 'type': ['null', 'double']},
        {'name': 'tolls', 'type': ['null', 'double']},
        {'name': 'bcf', 'type': ['null', 'double']},
        {'name': 'sales_tax', 'type': ['null', 'double']},
        {'name': 'congestion_surcharge', 'type': ['null', 'double']},
        {'name': 'airport_fee', 'type': ['null', 'double']},
        {'name': 'tips', 'type': ['null', 'double']},
        {'name': 'driver_pay', 'type': ['null', 'double']},
        {'name': 'shared_request_flag', 'type': ['null', 'string']},
        {'name': 'shared_match_flag', 'type': ['null', 'string']},
        {'name': 'access_a_ride_flag', 'type': ['null', 'string']},
        {'name': 'wav_request_flag', 'type': ['null', 'string']},
        {'name': 'wav_match_flag', 'type': ['null', 'string']},
    ],
    'name': 'nyc_fhv_trips',
    'type': 'record',
}


class ConvertTimestampsDoFn(beam.DoFn):
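    """Convert datetime columns to epoch milliseconds (to match the
    timestamp-millis fields in SCHEMA) and cast airport_fee to float."""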
    def process(self, record: dict):
        for timestamp_key in ('request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime'):
            if record[timestamp_key]:
                # convert datetime object to milliseconds
                record[timestamp_key] = int(record[timestamp_key].timestamp() * 1000)

        if record["airport_fee"]:
            record["airport_fee"] = float(record["airport_fee"])
        yield record


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        help='Input file to process.',
        required=True,
    )
    parser.add_argument(
        '--output',
        dest='output',
        help='Output file to write results to.',
        required=True,
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | ReadFromParquet(known_args.input)
            | beam.ParDo(ConvertTimestampsDoFn())
            | WriteToAvro(known_args.output, SCHEMA, file_name_suffix='.avro')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
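
To run it locally, an invocation along these lines works (a sketch; the script and file names are placeholders, and the DirectRunner is used by default):

python convert_parquet_to_avro.py \
    --input fhvhv_tripdata_2022-01.parquet \
    --output nyc_fhv_trips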

Done.

Can you give it another try? Hopefully, the files are correct this time.

Let me fix this. I will update these files later. Thanks for catching this issue.

Done with uploading. gs://apache-beam-samples/nyc_trip/avro contains the converted Avro files. Please let me know if these files work for you. Thanks!
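
For anyone who wants to sanity-check the new files with Beam itself, a minimal sketch (the *.avro glob assumes the suffix used by the conversion script above):

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro

# Read the re-uploaded Avro files and print the total record count.
with beam.Pipeline() as p:
    (
        p
        | ReadFromAvro('gs://apache-beam-samples/nyc_trip/avro/*.avro')
        | beam.combiners.Count.Globally()
        | beam.Map(print)
    )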

I am uploading both Parquet and Avro formats under gs://apache-beam-samples/nyc_trip.

I changed the schema a little bit (airport_fee now accepts long as well as double):

SCHEMA = {
    'fields': [
        {'name': 'hvfhs_license_num', 'type': ['null', 'string']},
        {'name': 'dispatching_base_num', 'type': ['null', 'string']},
        {'name': 'originating_base_num', 'type': ['null', 'string']},
        {'name': 'request_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'on_scene_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'pickup_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'dropoff_datetime', 'logicalType': 'timestamp-millis', 'type': ['null', 'long']},
        {'name': 'PULocationID', 'type': ['null', 'long']},
        {'name': 'DOLocationID', 'type': ['null', 'long']},
        {'name': 'trip_miles', 'type': ['null', 'double']},
        {'name': 'trip_time', 'type': ['null', 'long']},
        {'name': 'base_passenger_fare', 'type': ['null', 'double']},
        {'name': 'tolls', 'type': ['null', 'double']},
        {'name': 'bcf', 'type': ['null', 'double']},
        {'name': 'sales_tax', 'type': ['null', 'double']},
        {'name': 'congestion_surcharge', 'type': ['null', 'double']},
        {'name': 'airport_fee', 'type': ['null', 'long', 'double']},
        {'name': 'tips', 'type': ['null', 'double']},
        {'name': 'driver_pay', 'type': ['null', 'double']},
        {'name': 'shared_request_flag', 'type': ['null', 'string']},
        {'name': 'shared_match_flag', 'type': ['null', 'string']},
        {'name': 'access_a_ride_flag', 'type': ['null', 'string']},
        {'name': 'wav_request_flag', 'type': ['null', 'string']},
        {'name': 'wav_match_flag', 'type': ['null', 'string']},
    ],
    'name': 'nyc_fhv_trips',
    'type': 'record',
}

+1 to @liferoad’s comment.

You can pick any dataset you like. We can help you put it under gs://apache-beam-samples.

Thanks for trying out the example. Looks like it is broken and doesn’t have an integration test exercising it. I asked for details on https://github.com/apache/beam/pull/5496. In the meantime, I think we can repurpose this bug to fix this example or provide a better one.