aws-xray-sdk-python: async operations with patched aiobotocore gives error "Already ended segment and subsegment cannot be modified."

I’m running into some problems with XRay in combination with async functions and aiobotocore.

Packages:

aioboto3==6.4.1
aiobotocore==0.10.2
aws-xray-sdk==2.4.2
boto3==1.9.176
botocore==1.12.176

I’ve created a very small sample script to demonstrate my issue:

import aioboto3
import asyncio
import logging
import sys

from datetime import datetime

from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

logger = logging.getLogger()
logger.setLevel(logging.INFO)

xray_logger = logging.getLogger('aws_xray_sdk')
xray_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)

patch([
    'aiobotocore',
], raise_errors=False)


async def perform_test(gather=False, loop_count=10):
    async_client = aioboto3.client('clouddirectory', region_name='eu-west-1')

    try:
        with xray_recorder.in_segment_async('ASYNC'):
            @xray_recorder.capture_async('execute')
            async def execute(client):
                retval = await client.list_directories(state='ENABLED')
                return retval['Directories']

            results = []
            if gather:
                with xray_recorder.capture_async('gather'):
                    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
                        results += result

            else:
                with xray_recorder.capture_async('loop'):
                    for i in range(loop_count):
                        results += await execute(async_client)

            return results
    finally:
        await async_client.close()


def test_gather(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
        logger.info('GATHER: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('GATHER: FAIL')


def test_loop(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=False, loop_count=loop_count))
        logger.info('LOOP: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('LOOP: FAIL')


logger.info('==========================================')
test_gather()
logger.info('==========================================')
logger.info('==========================================')
test_loop()
logger.info('==========================================')

The expected outcome should be something like:

==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: PASS 0:00:00.251635
==========================================
==========================================
LOOP: PASS 0:00:01.386801
==========================================

The timings are irrelevant; what matters is that both ways of using aiobotocore with XRay work. However, the actual outcome is:

successfully patched module aiobotocore
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: FAIL
Traceback (most recent call last):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
    return_value = await wrapped(*args, **kwargs)
  File "/home/idoorn/xray_test.py", line 36, in execute
    retval = await client.list_directories(state='ENABLED')
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/idoorn/xray_test.py", line 58, in test_gather
    asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/idoorn/xray_test.py", line 42, in perform_test
    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in __call__
    meta_processor=None,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 105, in record_subsegment_async
    subsegment.add_exception(exception, stack)
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 220, in add_exception
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
==========================================
==========================================
LOOP: PASS 0:00:01.989784
==========================================

Using asyncio.gather() doesn’t seem to play nicely with XRay: by the time the clouddirectory call completes, the subsegment that was used to trace it has already been closed. XRay only works when the async calls are made sequentially, but that would kind of defeat the benefit of using async. 😃

NOTE: if I remove the lines:

patch([
    'aiobotocore',
], raise_errors=False)

from the above test script, everything works, but obviously the clouddirectory calls are no longer traced.

About this issue

  • State: open
  • Created 5 years ago
  • Reactions: 2
  • Comments: 46 (18 by maintainers)

Most upvoted comments

Any update on this issue? I’m experiencing a similar problem both with and without patching aiobotocore.

Traceback (most recent call last):
  File '/path/to/app/tasks.py', line 276, in work
    await self._fire_medium_messages(medium, client)
  File '/path/to/app/tasks.py', line 243, in _fire_medium_messages
    await self._execute_fire_tasks(tasks, message_ids, is_bulk_test=bulk_test, template=msg_state_template)
  File '/path/to/app/tasks.py', line 200, in _execute_fire_tasks
    results = await asyncio.gather(*tasks)
  File '/path/to/app/mediums/message.py', line 51, in fire
    await client.send_message(**args)
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py', line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py', line 101, in record_subsegment_async
    stack=stack,
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py', line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py', line 102, in put_http_meta
    self._check_ended()
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py', line 283, in _check_ended
    raise AlreadyEndedException('Already ended segment and subsegment cannot be modified.')
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

This is how I use aws_xray_sdk:

import aiobotocore
from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
xray_recorder.configure(service='sqs', context=AsyncContext())
patch(modules_to_patch=['aiobotocore'])

These are the modules I’m using:

aiobotocore==0.10.3
aiohttp==3.3.2
aioredlock==0.2.0
aiomysql==0.0.19
aioelasticsearch
asyncpgsa==0.22.0
aws-xray-sdk==2.4.2
boto3==1.9.99

Hey,

Thank you for reporting the issue and giving a detailed explanation of it. This will definitely help us determine the root cause and find a solution. It sounds like the asynchronous tasks aren’t holding the appropriate references to their parents, so when the tasks finish and their subsegments are closed, only the top-most entries in the context’s entity list are being referenced. We’ll have to investigate this and see what changes are needed to provide a fix. Please stay tuned.

Thanks again for reporting this issue!

Hi all, Apologies for the delay on diving into this issue. I’ll try to get this prioritized on our backlog. Thank you for being patient.

@NathanielRN I don’t think this PR should be closed

I have updated to a version that includes #340 and am still seeing the original issue

aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

My package versions are:

aioboto3==11.2.0
aiobotocore==2.5.0
aws-xray-sdk==2.12.0
aws-lambda-powertools==2.18.0
boto3==1.26.76
botocore==1.29.76

I’m planning to try @garyd203’s library next to see if it resolves the problem.

Hi @garyd203,

Thank you for developing this library & proposing your solution. We will add it to our backlog to investigate using your AsyncContext solution in the SDK here. Alternatively, if you were able to make a pull request we would be happy to test it with the initial problematic sample app to verify it resolves the bug.

I encountered OP’s original error, and solved it by patching AsyncContext. It’s in a library of other X-Ray patches and extensions called xraysink. See that library’s documentation for how to patch asyncio tasks.

Detailed Explanation

I agree with @IvDoorn’s analysis that this problem is to do with asyncio usage rather than any specific library. Additionally, it shows up as endlessly nested subsegments (rather than parallel subsegments). Specifically, every asyncio Task uses a shared segment stack (see task_factory), which means subsegments are added and removed by parallel processing, whereas the segment stack is designed to be used in sequential processing only.

My solution was to propagate a copy of the segment stack to child asyncio Tasks. This means that any subsegments created by a Task will have the correct parent. It was over a year ago, so I don’t remember exactly what happens to the unused top part of the segment stack in the child Task, but IIRC it gets ignored and just works out in the wash.
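The "copy the stack into each child Task" idea can be illustrated with a plain-asyncio sketch that uses no X-Ray code at all. It is not xraysink’s or aws-xray-sdk’s implementation: `STACK_ATTR`, `stack_copying_task_factory`, `current_stack` and `child` are hypothetical names, and string entries stand in for (sub)segments. The factory is installed with the standard `loop.set_task_factory()` and gives every new Task its own list copied from the creating Task, so siblings started by `asyncio.gather()` never push onto or pop from each other’s stack.

```python
import asyncio

# Hypothetical attribute name; real libraries store their state differently.
STACK_ATTR = "entity_stack"


def stack_copying_task_factory(loop, coro, **kwargs):
    """Create a Task whose entity stack is a *copy* of its parent's stack."""
    parent = asyncio.current_task(loop=loop)  # the task calling create_task()
    task = asyncio.Task(coro, loop=loop, **kwargs)
    parent_stack = getattr(parent, STACK_ATTR, []) if parent is not None else []
    # Copying instead of sharing one list means this child's pushes and pops
    # are invisible to its parent and to its siblings.
    setattr(task, STACK_ATTR, list(parent_stack))
    return task


def current_stack():
    return getattr(asyncio.current_task(), STACK_ATTR)


async def child(name, delay):
    stack = current_stack()
    parent = stack[-1]      # parent as seen when this task was created
    stack.append(name)      # "begin subsegment"
    await asyncio.sleep(delay)
    closed = stack.pop()    # "end subsegment": always this task's own entry
    return parent, closed


async def main():
    asyncio.get_running_loop().set_task_factory(stack_copying_task_factory)
    # main() was created before the factory was installed, so seed it by hand.
    setattr(asyncio.current_task(), STACK_ATTR, ["segment"])
    # sub-1 starts first but also finishes first, i.e. not in LIFO order.
    results = await asyncio.gather(child("sub-1", 0.01), child("sub-2", 0.03))
    return results, current_stack()


results, main_stack = asyncio.run(main())
print(results)     # [('segment', 'sub-1'), ('segment', 'sub-2')]
print(main_stack)  # ['segment']
```

Both children report `segment` as their parent and close their own entry, and the parent’s stack is untouched afterwards.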

aws-xray-sdk maintainers (@willarmiros @srprash): The xraysink library is Apache licensed, so feel free to migrate any of that code into your main library.

Hi, I am sorry for not responding in a timely fashion. I am currently very busy at work and have had no time to dig into it yet. So do not expect progress on this from my side in the near future.

Thank you for sharing your findings.

I agree that contextvars is the way to go. I think we have to check backward compatibility. IMHO the problem is caused by the add/remove order mentioned by IvDoorn in comment https://github.com/aws/aws-xray-sdk-python/issues/164#issuecomment-509981165. I hope I can find some time on the weekend to investigate a rewrite of AsyncContext.

I’ll come back to you when I have something to share.

Hi @cbuschka,

Sorry we haven’t been able to figure this one out yet. I spent quite a bit of time trying to fix this issue myself, to no avail. I think the root cause might lie in our use of a stack, stored on each task (when using AsyncContext), to record all in-flight (sub)segments. There might be a race condition when retrieving entities from the stack that causes this issue.

Unfortunately I spent a bit too much time trying to fix our current setup instead of trying to replace it altogether. It might be worth looking into using Python’s contextvars module to store the stack instead of storing it as a field on the task. According to the Python docs, contextvars is the right tool for async context use cases.
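As a rough illustration of why contextvars fits here, the sketch below keeps the in-flight stack as an immutable tuple in a `contextvars.ContextVar`. It is an assumption-laden toy, not a proposed AsyncContext: `entity_stack`, `Span` and `handler` are hypothetical names, and strings stand in for segments. It relies on documented asyncio behaviour: each Task runs in a copy of the context that was current when it was created, so a child inherits its parent’s stack, while `ContextVar.set()` inside the child only changes the child’s copy.

```python
import asyncio
import contextvars

# Hypothetical: the in-flight entity stack as an immutable tuple in a ContextVar.
entity_stack = contextvars.ContextVar("entity_stack", default=())


class Span:
    """Context manager that pushes `name` onto this task's view of the stack."""

    def __init__(self, name):
        self.name = name
        self.parent = None
        self._token = None

    def __enter__(self):
        stack = entity_stack.get()
        self.parent = stack[-1] if stack else None
        # Build a new tuple instead of mutating a shared list.
        self._token = entity_stack.set(stack + (self.name,))
        return self

    def __exit__(self, *exc_info):
        entity_stack.reset(self._token)  # restore exactly what we replaced
        return False


async def handler(name, delay):
    with Span(name) as span:
        await asyncio.sleep(delay)
        top = entity_stack.get()[-1]  # still our own entry after the await
    return span.parent, top


async def main():
    with Span("segment"):
        # sub-1 starts first but also finishes first, i.e. not in LIFO order.
        results = await asyncio.gather(handler("sub-1", 0.01), handler("sub-2", 0.03))
        inside = entity_stack.get()
    return results, inside, entity_stack.get()


results, inside, after = asyncio.run(main())
print(results)  # [('segment', 'sub-1'), ('segment', 'sub-2')]
print(inside)   # ('segment',)
print(after)    # ()
```

Because nothing is ever popped from a shared structure, out-of-order completion cannot close someone else’s entry; the trade-off is that backward compatibility with code that reads the stack as a task attribute would still need checking.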

The problem seems to be that aws_xray_sdk keeps a single list of open traces, and it has problems when segments are closed out of order (which is a normal pattern when asyncio.gather() is used, and I think also common in async code in general).

With some debugging I found that async_recorder.py::record_subsegment_async() keeps a reference to the subsegment it creates. As soon as it reaches the finally block, it adds the required annotations to the subsegment it holds a reference to. But when it calls end_subsegment(), it doesn’t close that subsegment; it closes whichever subsegment is on top of the entities stack, which might be a completely different one.
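The difference between "close the top of the stack" and "close the subsegment I hold a reference to" can be reproduced without X-Ray. In this plain-asyncio sketch, `Entity`, `begin`, `end_top`, `end_own` and `handler` are hypothetical stand-ins, not SDK functions; `end_top` mimics the behaviour described above, and `end_own` removes exactly the entity the caller created. Note that `end_own` only fixes *which* entry gets closed: sub-2 is still created while sub-1 is on top, so parent assignment would remain wrong.

```python
import asyncio


class Entity:
    """Hypothetical stand-in for an X-Ray (sub)segment."""

    def __init__(self, name):
        self.name = name


# One stack shared by every coroutine on the loop (hypothetical, simplified).
stack = []


def begin(name):
    entity = Entity(name)
    stack.append(entity)
    return entity


def end_top():
    """Problematic pattern: close whatever is currently on top of the stack."""
    return stack.pop()


def end_own(entity):
    """Alternative: close the entity the caller holds, wherever it sits."""
    stack.remove(entity)  # Entity has no __eq__, so this matches by identity
    return entity


async def handler(name, delay, by_reference):
    mine = begin(name)
    await asyncio.sleep(delay)
    closed = end_own(mine) if by_reference else end_top()
    return mine.name, closed.name


async def run(by_reference):
    stack[:] = [Entity("segment")]
    # sub-1 starts first but also finishes first, i.e. not in LIFO order.
    return await asyncio.gather(
        handler("sub-1", 0.01, by_reference),
        handler("sub-2", 0.03, by_reference),
    )


top_results = asyncio.run(run(by_reference=False))
own_results = asyncio.run(run(by_reference=True))
print(top_results)  # [('sub-1', 'sub-2'), ('sub-2', 'sub-1')]
print(own_results)  # [('sub-1', 'sub-1'), ('sub-2', 'sub-2')]
```

With `end_top`, each handler closes the other’s entry; in the real SDK, that is the point at which writing metadata to one’s own, already-ended subsegment raises `AlreadyEndedException`.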

The async function and the sync function are in the same class, so it’s something like this:

import asyncio

# call_some_service and get_db_connection are defined elsewhere in the real code.
class MyClass:
    async def do_some_async_work(self, params: list[str]):
        value_ids = await asyncio.gather(*[self.get_a_value(param) for param in params])
        return self.get_some_results(value_ids)

    async def get_a_value(self, param: str) -> str:
        response = await call_some_service(param)
        return response["id"]

    def get_some_results(self, value_ids: list[str]) -> list:
        with self.get_db_connection() as conn:
            # The whole tuple is a single query parameter bound to the one %s.
            results = conn.execute("select something from somewhere where value_id in %s", (tuple(value_ids),)).fetchall()

        return [result[0] for result in results]

The conn.execute is synchronous (as is the get_db_connection).

No thread-pools for this right now.

Hi @Padwicker, we still have this task in our backlog and will try to prioritize it as soon as possible. We would appreciate it if you’d like to contribute based on the investigation in this issue. Thanks!

I’m seeing this with gevent as well, using the regular, non-aio versions of boto3 and botocore. Since gevent doesn’t use asyncio (for anyone unfamiliar with it, it’s a concurrency alternative to asyncio that uses greenlets), I would guess it’s running into the issue @IvDoorn mentioned, where segments are assumed to be managed as a stack, but I can’t say for sure.

Hey,

I’m afraid you are focusing on aiobotocore, but that is not where the problem is. As mentioned in my first post, the problem is in asyncio.gather(). Please also see my comment here: https://github.com/aws/aws-xray-sdk-python/issues/164#issuecomment-507253449

Here is my updated script, which still demonstrates the issue, but does not use aiobotocore.

import asyncio
import logging
import sys

from datetime import datetime

from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

logger = logging.getLogger()
logger.setLevel(logging.INFO)

xray_logger = logging.getLogger('aws_xray_sdk')
xray_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)

async def perform_test(gather=False, loop_count=10):

    with xray_recorder.in_segment_async('ASYNC'):
        @xray_recorder.capture_async('execute')
        async def execute(i, maximum):
            segment = xray_recorder.current_subsegment()
            segment.put_annotation('before', True)
            await asyncio.sleep(maximum - 1)
            segment.put_annotation('after', True)

        if gather:
            with xray_recorder.capture_async('gather'):
                await asyncio.gather(*[execute(i, loop_count) for i in range(loop_count)])
        else:
            with xray_recorder.capture_async('loop'):
                for i in range(loop_count):
                    await execute(i, loop_count)


def test_gather(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
        logger.info('GATHER: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('GATHER: FAIL')


def test_loop(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=False, loop_count=loop_count))
        logger.info('LOOP: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('LOOP: FAIL')


logger.info('==========================================')
test_gather(5)
logger.info('==========================================')
logger.info('==========================================')
test_loop(5)
logger.info('==========================================')

The problem is this:

1) -> start Segment
2) -> Run asyncio.gather([range(5)])
3) -> Handler 1 starts
4) -> Create Subsegment 1, grab current segment as parent (which is Segment)
5) -> Perform an IO operation (go to sleep)
6) -> Handler 2 starts
7) -> Create Subsegment 2, grab current segment as parent (which is Subsegment 1)
8) -> Perform an IO operation (go to sleep)
9) -> Handler 1 IO operation completes
10) -> Handler 1 closes the current segment (which is Subsegment 2)
11) -> Handler 2 IO operation completes
12) -> Handler 2 closes the current segment (which is Subsegment 1)

As you can see, steps 7, 10 and 12 all perform the wrong operation. If the handlers tried to put an annotation on their own subsegments, Handler 2 would find that its subsegment had already been closed.

The XRay code assumes that the handlers passed to asyncio.gather() complete in exactly the reverse of the order in which they were passed to the asyncio.gather() call. There is no such guarantee: the work is async, so one handler might be much faster than another.
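The walkthrough above can be replayed deterministically against a single shared LIFO stack. This is a hypothetical model (`replay` is not an SDK function, and strings stand in for subsegments); as in steps 3 to 8, every handler starts and picks its parent before any handler finishes.

```python
def replay(start_order, finish_order):
    """Replay begin/end events against one shared LIFO stack.

    Returns {handler: (parent_it_got, entity_it_closed)}.
    """
    stack = ["segment"]
    parents = {}
    for handler in start_order:
        parents[handler] = stack[-1]  # "grab current segment as parent"
        stack.append(handler)  # "create subsegment"
    closed = {}
    for handler in finish_order:
        closed[handler] = stack.pop()  # "close the current segment"
    return {h: (parents[h], closed[h]) for h in start_order}


# Handler 1 finishes first (steps 9-12): each handler closes the other's subsegment.
wrong_order = replay(["sub-1", "sub-2"], ["sub-1", "sub-2"])
print(wrong_order)
# {'sub-1': ('segment', 'sub-2'), 'sub-2': ('sub-1', 'sub-1')}

# Exact reverse order: the closes line up, but sub-2 is still parented to sub-1 (step 7).
reverse_order = replay(["sub-1", "sub-2"], ["sub-2", "sub-1"])
print(reverse_order)
# {'sub-1': ('segment', 'sub-1'), 'sub-2': ('sub-1', 'sub-2')}
```

Even in the lucky reverse-order case, the second handler is attached under the first rather than under the segment, which is the nesting that a shared stack produces whenever handlers overlap.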

I guess what needs to happen is:

  • Patch asyncio.gather() to ensure that each handler passed to it gets an environment where the “current segment” points at the correct segment (the segment that was current at the time asyncio.gather() was called).
  • Anywhere in the code where the stack of segments is treated as a LIFO queue should be updated to ensure it always interacts with the correct segment.

I’m experiencing this as well. This happens if I have an async function that uses await asyncio.gather(*[foo(x) for x in list]).

Once it’s switched to for x in list: await foo(x), it works fine.

Not sure if it is relevant, but I’ve noticed that the XRay segments created when using asyncio.gather() are built up in a deep tree. For example, asyncio.gather(*[task() for i in range(5)])

yields segments:

segment = {
  id = 1
  subsegment = [{
    id = 2
    subsegment = [{
      id = 3
      subsegment = [{
        id = 4
        subsegment = [{
          id = 5
          subsegment = [{
            id = 6
          }]
        }]
      }]
    }]
  }]
}

which doesn’t really feel correct, as I would expect a similar outcome as when I called them sequentially:

segment = {
  id = 1
  subsegment = [{
    id = 2
  },{
    id = 3
  },{
    id = 4
  },{
    id = 5
  },{
    id = 6
  }]
}

I’m just raising this as an observation, as maybe this construction is the cause of the issue. If not, my comment can be discarded.