aws-sdk-js-v3: EMFILE error when using SQS and DynamoDB for large amounts of data

Checkboxes for prior research

Describe the bug

Related to this issue. We still have errors, and after a more thorough investigation we identified that the errors occur not only when using the SQS client, but also when using the DynamoDB client.

Case: our Lambda reads Kafka topics (it may simultaneously read up to 6 topics and process their messages), and during high load (from 1000 messages in the topic(s)) we see EMFILE errors during processing. The errors do not occur at the beginning of processing, but usually after some number of messages has already been processed. Our log timeline typically looks like the attached screenshot, where blue entries are successful messages and red / dark red entries are errors.

So, when using AWS SDK v3 we get EMFILE errors, which suggests the process runs out of file descriptors. The same operations worked fine with v2; the problems surfaced only after moving to v3. Please see the details below.

SDK version number

@aws-sdk/client-sqs@3.363.0, @aws-sdk/client-dynamodb@3.363.0, @aws-sdk/node-http-handler@3.374.0, @aws-sdk/util-dynamodb@3.365.0

Which JavaScript Runtime is this issue in?

Node.js

Details of the browser/Node.js/ReactNative version

node v16.17.1

Reproduction Steps

A simplified version of our code is below. I tried to include all the steps that may help to reproduce and solve the issue.

import { SendMessageRequest, SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs'
import { NodeHttpHandler } from '@aws-sdk/node-http-handler';
import https from 'https';
import { MSKEvent } from 'aws-lambda';
import {
  DynamoDBClient, PutItemCommand
} from '@aws-sdk/client-dynamodb';
import { marshall } from '@aws-sdk/util-dynamodb';

let isInitialized = false;
let failHandler: FailHandler;
let sqs: SQSClient;
let sqsQueueUrl: SendMessageRequest['QueueUrl'] | undefined;

const region = 'aws-region';
const sqsName = 'sqs-name';
const dlqName = 'dlq-name';
const tableName = 'table-name';

type LambdaFailure = {
  reason: string;
  stack: string;
  message: string;
  event: Record<string, unknown>
};

type Message = {
  id: string;
  dates: string[]; // '2023-07-25'
  toBeSendNow: boolean;
  [otherFields: string]: unknown;
}

async function initSqs() {
  const agent = new https.Agent({
    maxSockets: 25
  });
  sqs = new SQSClient({
    region,
    requestHandler: new NodeHttpHandler({
      httpsAgent: agent
    })
  });
  // getSqsQueueUrl is an internal helper (omitted here) that resolves the queue URL from its name
  sqsQueueUrl = getSqsQueueUrl(sqsName);
}

class FailHandler {
  private sqs: SQSClient;

  private readonly queueUrl: string;

  constructor(private sqsOptions: { queueName: string }) {
    this.sqs = new SQSClient({
      region
    });
    this.queueUrl = getSqsQueueUrl(sqsOptions.queueName);
  }

  public async handle(msg: LambdaFailure): Promise<void> {
    try {
      const command = new SendMessageCommand({
        MessageBody: JSON.stringify(msg),
        QueueUrl: this.queueUrl
      });
      await this.sqs.send(command);
    } catch (err) {
      console.error('failed to send event', err); // here we get EMFIL printed to log
    }
  }
}

class Repository {
  private readonly connection: DynamoDBClient;

  private readonly table: string = tableName;

  constructor() {
    this.connection = new DynamoDBClient({});
  }

  public async addData(entity: Message): Promise<void> {
    const {
      id, dates
    } = entity;
    
    const key = `${id}${dates[0]}`;
    const item = {
      key,
      event: JSON.stringify(entity)
    };
    const command = new PutItemCommand({
      TableName: this.table,
      Item: marshall(item)
    });

    await this.connection.send(command); // at this point errors occur
  }
}

class SQSService {
  constructor(
    private readonly sqs: SQSClient,
    private readonly queueUrl: SendMessageRequest['QueueUrl'] | undefined
  ) {
    this.sqs = sqs;
    this.queueUrl = queueUrl;
  }

  public async sendMessageToSQS(message: Message): Promise<void> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify({
        message
      })
    });
    await this.sqs.send(command); // at this point errors occur
  }
}

class MessagesService {
  constructor(
    private readonly dynamoRepo: Repository,
    private readonly sqsService: SQSService,
    private readonly failHandler: FailHandler
  ) {
    this.failHandler = failHandler;
  }
  private async sendNowMessages(message: Message | null) {
    if (!message) {
      return;
    }
    await this.sqsService.sendMessageToSQS(message);
  }

  private async handleOtherMessages(messages: Message[]) {
    if (!messages.length) {
      return;
    }
    const promises = messages.map((message) => this.dynamoRepo.addData(message));
    await Promise.all(promises);
  }

  public handleMessage = async (kafkaMessage: Message): Promise<void> => {
    let nowMessage: Message | null = null;
    const otherMessages: Message[] = [];

    if (kafkaMessage.toBeSendNow) {
      nowMessage = kafkaMessage;
    }
    for (const date of kafkaMessage.dates) {
      const msg = {
        ...kafkaMessage,
        dates: [date]
      };
      otherMessages.push(msg);
    }

    try {
      await this.sendNowMessages(nowMessage);
      await this.handleOtherMessages(otherMessages);
    } catch (err) {
      await this.failHandler.handle({
        event: kafkaMessage as any,
        message: (err as Error).message,
        reason: 'reason',
        stack: (err as Error).stack ?? ''
      });
    }
    
  }
}

async function handler(event: MSKEvent): Promise<void> {
  if (!isInitialized) {
    isInitialized = true;
    failHandler = new FailHandler({ queueName: dlqName });
    await initSqs();
  }
  try {
    // here messages are decoded
    // let's just take messages as an array
    const messages: Message[] = [
      {
        id: '1',
        dates: ['2024-01-01', '2025-01-03', '2025-03-01'],
        toBeSendNow: true
      }
    ]; // from 25 to 75 items of Message type (see above)

    const sqsService = new SQSService(sqs, sqsQueueUrl);
    const dynamoDbRepo = new Repository();
    const messageService = new MessagesService(dynamoDbRepo, sqsService, failHandler);

    const messagePromises = messages.map((message) => messageService.handleMessage(message));
    await Promise.all(messagePromises);
  } catch (err) {
    console.error('Lambda failed', err);
    throw err;
  }
}

If needed, I can generate sample messages that conform to the simplified interface above.

Observed Behavior

Errors when using SQS:

"err": {
            "type": "Error",
            "message": "connect EMFILE 3.250.244.21:443 - Local (undefined:undefined)",
            "stack": "Error: connect EMFILE 3.250.244.21:443 - Local (undefined:undefined)\n    at internalConnect (node:net:1041:16)\n    at defaultTriggerAsyncIdScope (node:internal/async_hooks:464:18)\n    at GetAddrInfoReqWrap.emitLookup [as callback] (node:net:1187:9)\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:111:8)",
            "errno": -24,
            "code": "EMFILE",
            "syscall": "connect",
            "address": "3.250.244.21",
            "port": 443,
            "$metadata": {
                "attempts": 1,
                "totalRetryDelay": 0
            }
        }
"err": {
            "type": "Error",
            "message": "getaddrinfo EMFILE sqs.eu-west-1.amazonaws.com",
            "stack": "Error: getaddrinfo EMFILE sqs.eu-west-1.amazonaws.com\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:109:26)",
            "errno": -24,
            "code": "EMFILE",
            "syscall": "getaddrinfo",
            "hostname": "sqs.eu-west-1.amazonaws.com",
            "$metadata": {
                "attempts": 1,
                "totalRetryDelay": 0
            }
        }

Errors when using DynamoDB:

"err": {
            "type": "Error",
            "message": "connect EMFILE 52.119.242.12:443 - Local (undefined:undefined)",
            "stack": "Error: connect EMFILE 52.119.242.12:443 - Local (undefined:undefined)\n    at internalConnect (node:net:1041:16)\n    at defaultTriggerAsyncIdScope (node:internal/async_hooks:464:18)\n    at GetAddrInfoReqWrap.emitLookup [as callback] (node:net:1187:9)\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:111:8)",
            "errno": -24,
            "code": "EMFILE",
            "syscall": "connect",
            "address": "52.119.242.12",
            "port": 443,
            "$metadata": {
                "attempts": 1,
                "totalRetryDelay": 0
            }
        }
"err": {
            "type": "Error",
            "message": "getaddrinfo EMFILE dynamodb.eu-west-1.amazonaws.com",
            "stack": "Error: getaddrinfo EMFILE dynamodb.eu-west-1.amazonaws.com\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:109:26)",
            "errno": -24,
            "code": "EMFILE",
            "syscall": "getaddrinfo",
            "hostname": "dynamodb.eu-west-1.amazonaws.com",
            "$metadata": {
                "attempts": 1,
                "totalRetryDelay": 0
            }
        }

Expected Behavior

No EMFILE errors.

Possible Solution

If we knew, we'd open a PR, not an issue 😃

Additional Information/Context

No response

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Reactions: 2
  • Comments: 16 (5 by maintainers)

Most upvoted comments

Try initializing the DynamoDBClient that is used in new Repository() outside of the handler code path.
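
For clarity, a minimal sketch of that suggestion against the reproduction above (it reuses the Repository class exactly as defined there; only the construction site moves):

// Construct the repository (and therefore its DynamoDBClient) once, at module scope,
// so a warm execution environment reuses the same client and its sockets across invocations.
const dynamoDbRepo = new Repository();

async function handler(event: MSKEvent): Promise<void> {
  // ...build MessagesService from the module-scope dynamoDbRepo
  //    instead of calling new Repository() on every invocation
}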

Hello, I’ve faced the same problem. In general terms, the Lambda receives a 100-message batch and should send the same number of messages to SQS. I measured the number of descriptors during Lambda runs on the second and third versions of aws-sdk. Sending the messages is expected to create a number of descriptors equal to the batch size, and that happens on both versions of the SDK. With only a single Lambda run, i.e. a situation where open descriptors will not be reused, I saw that after the messages have been sent, the second version closes ALL of the descriptors used for sending, while the third version sometimes leaves some of them open. If more messages are then sent to this hot Lambda, the initial number of descriptors is larger than on the first run. As a result, the number of descriptors keeps growing across subsequent Lambda runs until we get an EMFILE error.
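
For anyone who wants to repeat that measurement: on the Lambda Linux runtime the open descriptors of the current process can be counted by listing /proc/self/fd. A small sketch using only Node.js built-ins (the helper name is illustrative, not from the original code):

import { readdirSync } from 'fs';

// Counts the file descriptors currently open in this process (Linux / Lambda runtime only).
// The count includes the descriptor briefly opened by the readdir call itself.
function countOpenFileDescriptors(): number {
  return readdirSync('/proc/self/fd').length;
}

// Example: log the count before and after sending the batch to compare SDK v2 vs v3.
console.log('open fds:', countOpenFileDescriptors());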

Admittedly, if I destroy the SQS client at the end of each Lambda execution, the number of descriptors returns to the initial value and that gets rid of the problem, but I’m not sure it should have to work like that.
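
For reference, that workaround looks roughly like this (a sketch only, built from the reproduction code above; whether a per-invocation destroy should be necessary is exactly the open question):

async function handler(event: MSKEvent): Promise<void> {
  const sqs = new SQSClient({ region });
  try {
    // ...send the batch with sqs.send(new SendMessageCommand({ ... }))
  } finally {
    // Destroys the client's request handler and closes its open sockets,
    // bringing the descriptor count back to its initial value.
    sqs.destroy();
  }
}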

Hello, after several attempts on my part, it turns out that my problem is similar to what @RanVaknin said. I was calling a Lambda through the LambdaClient from @aws-sdk/client-lambda, and for each call I was re-creating a new instance of this client. There was no problem with AWS SDK v2, but with v3 it seems that a leftover connection stays alive and uses a file descriptor even after garbage collection. This connection seems to close itself after a little while, but in a batch process you can quickly reach the file descriptor limit (1024 for a Lambda).

My fix is simple: always reuse the same client when you have to call the same resource.

NB: AWS_NODEJS_CONNECTION_REUSE_ENABLED did not seem to help while I was creating multiple instances of LambdaClient.
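
A sketch of that fix with a hypothetical caller (invokeDownstream, functionName and payload are illustrative names, not from the original code; the region is assumed from the error logs above):

import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';

// One client per execution environment, reused for every call to the same resource,
// instead of constructing a new LambdaClient for each call.
const lambdaClient = new LambdaClient({ region: 'eu-west-1' });

async function invokeDownstream(functionName: string, payload: unknown): Promise<void> {
  await lambdaClient.send(new InvokeCommand({
    FunctionName: functionName,
    Payload: Buffer.from(JSON.stringify(payload))
  }));
}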