beam: [Bug]: Python WriteToBigtable get stuck for large jobs due to client dead lock

What happened?

Hello,

we are running Dataflow Python jobs using Beam 2.49.0. We are starting those jobs from a notebook using the functionality described here. Btw, this example crashes on beam 2.50.0 notebook kernel, I reported this problem to our Google support, let me know if this is something of interest and I will report a separate issue here.

Problem description:

We have a very simple pipeline that reads data using ReadFromBigQuery, and does two beam.Map operations to clean and transform the data to google.cloud.bigtable.row.DirectRow and then WriteToBigTable is used to write the data.

We are testing the performance of BigTable HDD vs SDD-based instances, so we wanted to run jobs that insert 10kk and 100kk rows.

Unfortunately, the 10kk job that was writing to the HDD instance got stuck after writing 9,999,567 rows. image

image As you can see in the screenshot, the job scaled to about 500 workers, wrote most of the records in ~20min and then it scaled down to 2 workers, and no progress was made for ~18h. I canceled the job manually at that point.

After rerunning, the job has run to completion in 20 minutes.

image

Today, I’ve started two more jobs, each meant to write 100kk rows to BigTable (one to HDD and the other to SSD-based instance). Both got stuck at near completion. Here are some details about one of those jobs: image image

One thing I noticed in all of those jobs is that “stragglers” are detected. image

However, a reason why they are straggling is undermined:

image

Repro code:

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners import DataflowRunner

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable

from google.cloud.bigtable import row

import datetime

from typing import Dict, Any, Tuple, List


def to_bt_row(beam_row: Tuple[str, Dict[str, Any]]) -> row.DirectRow:
    import datetime
    """
    Creates BigTable row from standard dataflow row with key mapping to a dict.
    The key is used as a BigTable row key and the dict keys are used as BigTable column names.
    The dict values are used as the column values.
    
    To keep it simple:
    - all columns are assigned to a column family called default
    - the cell timestamp is set to current time
    """
    from google.cloud.bigtable import row as row_
    (key, values) = beam_row
    bt_row = row_.DirectRow(row_key=key)
    for k, v in values.items():
        bt_row.set_cell(
            "default",
            k.encode(),
            str(v).encode(),
            datetime.datetime.now()
        )
    return bt_row

def set_device_id_as_key(row: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """
    Given dict, convert it to two-element tuple. 
    The first element in the tuple is the original dicts value under "device_id" key.
    The second tuple element is the original dict without the "device_id" key. 
    """
    k = row.pop("device_id")
    return k, row

def insert_data(n: int, source_bq_table: str, instance: str, destination_table:str, jobname="test_job"):
    options = pipeline_options.PipelineOptions(
        flags={},
        job_name=jobname
    )
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    options.view_as(GoogleCloudOptions).region = 'us-east1'
    dataflow_gcs_location = 'gs://redacted-gcs-bucket/dataflow'
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

    p = beam.Pipeline(InteractiveRunner())

    res = (
        p | 'QueryTable' >> beam.io.ReadFromBigQuery(
            query=f"""
            SELECT* FROM `redacted.redacted.{source_bq_table}` 
            limit {n}
            """,
            use_standard_sql=True,
            project="redacted",
            use_json_exports=True,
            gcs_location="gs://redactedbucket/bq_reads"
        )
        | "set device id" >> beam.Map(set_device_id_as_key)
        | "create bt rows" >> beam.Map(to_bt_row)
        | "write out" >> WriteToBigTable(
            project_id="another-project",
            instance_id=instance,
            table_id=destination_table
        )
    )

    DataflowRunner().run_pipeline(p, options=options)

insert_data(100_000_000, "bq_table_with_100kk_rows", "xyz-ssd", "some_table", "test_100kk_ssd")

Let me know if you need any further details, I’d be very glad to help!

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

About this issue

  • Original URL
  • State: open
  • Created 9 months ago
  • Comments: 17 (2 by maintainers)

Most upvoted comments

yes, the issue is actually in the bigtable client library. For now, please use Beam 2.45.

I have a similar issue posted here

In my case, even a small WriteToBigTable job could get stuck (but at a very low chance). Not sure if my logs helps with the diagnosis

Unable to perform SDK-split for work-id: 5193980908353266575 due to error: INTERNAL: Empty split returned. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/dax/workflow/worker/fnapi_operators.cc" line: 2738 } } }']
=== Source Location Trace: ===
dist_proc/dax/internal/status_utils.cc:236
 And could not Checkpoint reader due to error: OUT_OF_RANGE: Cannot checkpoint when range tracker is finished. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/dax/workflow/worker/operator.cc" line: 340 } } }']
=== Source Location Trace: ===
dist_proc/dax/io/dax_reader_driver.cc:253
dist_proc/dax/workflow/worker/operator.cc:340

Also, the issue still occurs in 2.51.0 version

Just saw the ticket. I will ask our engineers to take a closer look. Thanks.