google-ads-python: AttributeError: '_SingleThreadedRendezvous' object has no attribute 'add_done_callback'

Hi,

I’m trying to run a program on Dataflow with the Google Ads API. I’ve successfully run my code locally, but when I deploy to Dataflow I receive the following error:

An exception was raised when trying to execute the workitem 3026106782202680634 : Traceback (most recent call last): File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 554, in apache_beam.runners.common.SimpleInvoker.invoke_process File "pubsub_main.py", line 190, in process File "/usr/local/lib/python3.8/site-packages/platform_data_pull/list_accounts_beta.py", line 49, in harvest_data campaignSettings(self.api_client,self.client,self.cid,changed_entities['campaigns']) File "/usr/local/lib/python3.8/site-packages/platform_data_pull/campaign_pull/campaign_settings_beta.py", line 30, in campaignSettings response = ga_service.search_stream(account,query=query) File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/v4/services/google_ads_service_client.py", line 314, in search_stream return self._inner_api_calls['search_stream'](request, retry=retry, timeout=timeout, metadata=metadata) File "/usr/local/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__ return wrapped_func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 281, in retry_wrapped_func return retry_target( File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 184, in retry_target return target() File "/usr/local/lib/python3.8/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout return func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 150, in error_remapped_callable return _StreamingResponseIterator(result, prefetch_first_result=prefetch_first) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 73, in __init__ self._stored_first_result = six.next(self._wrapped) File "/usr/local/lib/python3.8/site-packages/grpc/_interceptor.py", line 144, in __next__ raise self._exception File 
"/usr/local/lib/python3.8/site-packages/grpc/_interceptor.py", line 335, in __call__ return self._interceptor.intercept_unary_stream( File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/interceptors/logging_interceptor.py", line 354, in intercept_unary_stream response.add_done_callback(on_rpc_complete) File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/interceptors/exception_interceptor.py", line 73, in add_done_callback return self._underlay_call.add_done_callback(fn) AttributeError: '_SingleThreadedRendezvous' object has no attribute 'add_done_callback' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work work_executor.execute() File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process File 
"apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 554, in apache_beam.runners.common.SimpleInvoker.invoke_process File "pubsub_main.py", line 190, in process File "/usr/local/lib/python3.8/site-packages/platform_data_pull/list_accounts_beta.py", line 49, in harvest_data campaignSettings(self.api_client,self.client,self.cid,changed_entities['campaigns']) File "/usr/local/lib/python3.8/site-packages/platform_data_pull/campaign_pull/campaign_settings_beta.py", line 30, in campaignSettings response = ga_service.search_stream(account,query=query) File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/v4/services/google_ads_service_client.py", line 314, in search_stream return self._inner_api_calls['search_stream'](request, retry=retry, timeout=timeout, metadata=metadata) File "/usr/local/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__ return wrapped_func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 281, in retry_wrapped_func return retry_target( File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 184, in retry_target return target() File "/usr/local/lib/python3.8/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout return func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 150, in error_remapped_callable return _StreamingResponseIterator(result, prefetch_first_result=prefetch_first) File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 73, in __init__ self._stored_first_result = six.next(self._wrapped) File 
"/usr/local/lib/python3.8/site-packages/grpc/_interceptor.py", line 144, in __next__ raise self._exception File "/usr/local/lib/python3.8/site-packages/grpc/_interceptor.py", line 335, in __call__ return self._interceptor.intercept_unary_stream( File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/interceptors/logging_interceptor.py", line 354, in intercept_unary_stream response.add_done_callback(on_rpc_complete) File "/usr/local/lib/python3.8/site-packages/google/ads/google_ads/interceptors/exception_interceptor.py", line 73, in add_done_callback return self._underlay_call.add_done_callback(fn) AttributeError: '_SingleThreadedRendezvous' object has no attribute 'add_done_callback' [while running 'Pull GoogleAds Data']

This happens during the execution of the request from the API.

```python
ga_service = apiClient.get_service('GoogleAdsService',version='v4')

query = ('SELECT campaign.id, campaign.name, campaign.status, campaign.serving_status, campaign.start_date, campaign.end_date, campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.frequency_caps, campaign.ad_serving_optimization_status,'
         ' campaign.bidding_strategy, campaign.bidding_strategy_type, campaign.campaign_budget,'
         ' campaign.network_settings.target_content_network, campaign.network_settings.target_google_search, campaign.network_settings.target_partner_search_network, campaign.network_settings.target_search_network,'
         ' campaign.geo_target_type_setting.negative_geo_target_type, campaign.geo_target_type_setting.positive_geo_target_type'
         ' FROM campaign'
         ' WHERE campaign.serving_status != SUSPENDED AND campaign.status != PAUSED'
         ' ORDER BY campaign.id')


response = ga_service.search_stream(account,query=query)
```

Any help would be greatly appreciated as this is a serious roadblock for the project.

Thanks!

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 20 (7 by maintainers)

Most upvoted comments

GoogleAdsService.SearchStream is impacted, but GoogleAdsService.Search is not.

This worked for me: response = ga_service.search(account,query=query)

https://developers.google.com/google-ads/api/docs/reporting/streaming

Hi @DBCerigo (and All) - there’s a new version of grpcio that should resolve this problem when using grpc.experimental

Can you try installing grpcio==1.33.2 to see if the error goes away?

Brief update on this:

First, I’ve been unable to repro this because my environment was not properly configured to use the single-threaded method for streaming requests; I’ve instead been using the multithreaded one. This is because this library is not properly importing the config option here. We will instead need to explicitly import grpc.experimental, which we’ll update soon. My guess is that everyone in this thread is hitting this error because another part of their application or a dependency is importing grpc.experimental.

There is also an issue in the grpcio library that’s being worked on now; once there’s an Issue filed for it, I’ll link to it.

In the meantime, there unfortunately isn’t a great workaround. You could temporarily change the lines in client.py linked above in your local installation so that unary_stream_single_threading_option evaluates to None. That would at least unblock you to work on your implementations until we get a permanent fix in.

Again, apologies for the long-standing thread here. It was a tricky issue to identify but we’ll get it fixed as soon as possible.

@DBCerigo thanks for the info and apologies for not responding sooner. I’m pretty sure this is an issue with the version of grpcio since that’s where _SingleThreadedRendezvous is defined. Do you get any variance in behavior if you change that dependency version? grpcio is a dependency of google-api-core so it may be installing a version that’s incompatible with this library. I’m going to connect with the team that works on that library to see if we can reconcile the different versions.

you will find significantly better performance

Sure! But it needs a working search_stream method first 🙂

If you are pulling bulk records, you will find significantly better performance with streamed responses over paged responses. The APIs are very similar on purpose to ease transition.

Yep, hence why this currently blocking bug (AttributeError: '_SingleThreadedRendezvous' object has no attribute 'add_done_callback') is a priority to be resolved for us.

I’m also encountering this exception. The code was running without error previously (a few days ago). I get the same error on both google-ads==6.0.0 and google-ads==7.0.0.

I’m running the code locally from within a jupyter notebook in case that is of relevance. It seems like the issue only occurs when running from within a notebook kernel.

Query I’m running is:

       SELECT
            ad_group.id,
            ad_group.name,
            ad_group.status,
            ad_group.labels,
            campaign.name
        FROM
            ad_group
        WHERE
            campaign.status != 'REMOVED'
            AND ad_group.status != 'REMOVED'

and the full traceback (excluding the frames before search_stream) is:

~/.../.venv/lib/python3.7/site-packages/google/ads/google_ads/v4/services/google_ads_service_client.py in search_stream(self, customer_id, query, summary_row_setting, retry, timeout, metadata)
    312             metadata.append(routing_metadata)
    313 
--> 314         return self._inner_api_calls['search_stream'](request, retry=retry, timeout=timeout, metadata=metadata)
    315 
    316     def mutate(

~/.../.venv/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
    143             kwargs["metadata"] = metadata
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 
    147 

~/.../.venv/lib/python3.7/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
    284                 sleep_generator,
    285                 self._deadline,
--> 286                 on_error=on_error,
    287             )
    288 

~/.../.venv/lib/python3.7/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
    182     for sleep in sleep_generator:
    183         try:
--> 184             return target()
    185 
    186         # pylint: disable=broad-except

~/.../.venv/lib/python3.7/site-packages/google/api_core/timeout.py in func_with_timeout(*args, **kwargs)
    212             """Wrapped function that adds timeout."""
    213             kwargs["timeout"] = next(timeouts)
--> 214             return func(*args, **kwargs)
    215 
    216         return func_with_timeout

~/.../.venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
    148             # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
    149             prefetch_first = getattr(callable_, "_prefetch_first_result_", True)
--> 150             return _StreamingResponseIterator(result, prefetch_first_result=prefetch_first)
    151         except grpc.RpcError as exc:
    152             six.raise_from(exceptions.from_grpc_error(exc), exc)

~/.../.venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py in __init__(self, wrapped, prefetch_first_result)
     71         try:
     72             if prefetch_first_result:
---> 73                 self._stored_first_result = six.next(self._wrapped)
     74         except TypeError:
     75             # It is possible the wrapped method isn't an iterable (a grpc.Call

~/.../.venv/lib/python3.7/site-packages/grpc/_interceptor.py in __next__(self)
    142 
    143     def __next__(self):
--> 144         raise self._exception
    145 
    146     def next(self):

~/.../.venv/lib/python3.7/site-packages/grpc/_interceptor.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    334         try:
    335             return self._interceptor.intercept_unary_stream(
--> 336                 continuation, client_call_details, request)
    337         except Exception as exception:  # pylint:disable=broad-except
    338             return _FailureOutcome(exception, sys.exc_info()[2])

~/.../.venv/lib/python3.7/site-packages/google/ads/google_ads/interceptors/logging_interceptor.py in intercept_unary_stream(self, continuation, client_call_details, request)
    352         response = continuation(client_call_details, request)
    353 
--> 354         response.add_done_callback(on_rpc_complete)
    355 
    356         return response

~/.../.venv/lib/python3.7/site-packages/google/ads/google_ads/interceptors/exception_interceptor.py in add_done_callback(self, fn)
     71 
     72     def add_done_callback(self, fn):
---> 73         return self._underlay_call.add_done_callback(fn)
     74 
     75     def add_callback(self, callback):

AttributeError: '_SingleThreadedRendezvous' object has no attribute 'add_done_callback'