nest: Kafka Microservice Does not validate DTO

Bug Report

The Kafka microservice does not validate DTOs. The Kafka payload passed to the handler does not match what the docs describe.

Current behavior

@MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any

The handler does not receive a KillDragonMessage; it receives an object matching the IncomingMessage interface instead.

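import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller('account')
export class AccountController {
  @MessagePattern('account_me')
  getMe(@Payload() payload: SomeDto) {
    // Logs the full Kafka message envelope, not just SomeDto
    console.log('Received payload', payload);
    return {};
  }
}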

The AccountController code above fails with the following error: [ERROR] 15:45:00 TypeError: The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object. Received an instance of Object

The value of payload turns out to be an IncomingMessage, which looks like this:

payload {
  magicByte: 2,
  attributes: 0,
  timestamp: '1608237989883',
  offset: '33',
  key: null,
  value: { id: 999, something: 1 },
  headers: {
    kafka_correlationId: '58c52bab-3fad-4b7a-9f96-4d46bb5ceb78',
    kafka_replyTopic: 'account_me.reply',
    kafka_replyPartition: '0'
  },
  isControlRecord: false,
  batchContext: {
    firstOffset: '33',
    firstTimestamp: '1608237989883',
    partitionLeaderEpoch: 0,
    inTransaction: false,
    isControlBatch: false,
    lastOffsetDelta: 0,
    producerId: '-1',
    producerEpoch: 0,
    firstSequence: 0,
    maxTimestamp: '1608237989883',
    timestampType: 0,
    magicByte: 2
  },
  topic: 'account_me',
  partition: 0
}

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 5
  • Comments: 37 (16 by maintainers)

Most upvoted comments

I got this interface from someone on another thread on here and am using that for the @Payload type.

export interface IncomingKafkaMessage<V = unknown, K = unknown, H = Record<string, unknown>> {
  magicByte: number;
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: K;
  value: V;
  headers: H;
  isControlRecord: boolean;
  batchContext: {
    firstOffset: string;
    firstTimestamp: string;
    partitionLeaderEpoch: number;
    inTransaction: boolean;
    isControlBatch: boolean;
    lastOffsetDelta: number;
    producerId: string;
    producerEpoch: number;
    firstSequence: number;
    maxTimestamp: string;
    magicByte: number;
  };
}

So for the example above, the parameter would be:

@Payload() message: IncomingKafkaMessage<KillDragonMessage>

And to access:

message.value.dragonId

Where value will be correctly typed as KillDragonMessage
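
As a rough illustration of the typing described above, here is a minimal, framework-free sketch. The envelope interface is trimmed to a few fields, and the KillDragonMessage shape (a single dragonId field), the describeKill helper, and the sample values are assumptions made up for this example; they are not taken from Nest or from the original poster's code.

```typescript
// Trimmed version of the envelope interface above; only the fields used here.
interface IncomingKafkaMessage<V = unknown, K = unknown, H = Record<string, unknown>> {
  topic: string;
  partition: number;
  offset: string;
  key: K;
  value: V;
  headers: H;
}

// Hypothetical payload shape; only `dragonId` is mentioned in the thread.
interface KillDragonMessage {
  dragonId: number;
}

// Hypothetical handler body: the business data lives under `value`,
// while topic, offset, headers, etc. sit on the envelope itself.
function describeKill(message: IncomingKafkaMessage<KillDragonMessage>): string {
  return `dragon ${message.value.dragonId} from topic ${message.topic}`;
}

// Sample envelope with made-up values.
const sample: IncomingKafkaMessage<KillDragonMessage> = {
  topic: 'hero.kill.dragon',
  partition: 0,
  offset: '33',
  key: null,
  value: { dragonId: 7 },
  headers: {},
};

console.log(describeKill(sample));
```

Note that this only fixes the compile-time type. It does not validate anything at runtime.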

I think the documentation is misleading because it says:

the ValidationPipe works the same for WebSockets and microservices, regardless of the transport method that is used

So I assumed setting app.useGlobalPipes will apply auto-validation for microservice controllers and websocket gateways, too.

I tried this with a WebSocket gateway, but messages are not auto-validated; I need to add @UsePipes(new ValidationPipe()) to each @SubscribeMessage handler. Or is this a bug?

@enesyalcin I forgot which method I was using…

You can try this first:

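import { Expose, Type } from 'class-transformer';
import { ValidateNested } from 'class-validator';

export class PayloadDto {
  @Expose()
  @ValidateNested() // needed so class-validator descends into the nested DTO
  @Type(() => SomeInputDto)
  value: SomeInputDto;
}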

In the controller, use it like @Payload() payload: PayloadDto and validation will start working. Make sure to add the validation pipe globally or at the class level.

If that doesn't work, you can use the pipe below. The problem is that the DTO we need sits in the Kafka payload.value, so the validator doesn't work unless we define @Type for it. If you modify the pipe to use payload.value, you will achieve the same result.

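import { ArgumentMetadata, Injectable, ValidationPipe } from '@nestjs/common';

@Injectable()
export class KafkaValidationPipe extends ValidationPipe {
  public async transform(value: any, metadata: ArgumentMetadata) {
    // change data etc. here, e.g. work on the Kafka message's value
    return await super.transform(value, metadata);
  }
}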
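
To make the "use payload.value" idea more concrete without depending on Nest, here is a framework-free sketch of the kind of logic such a pipe's transform could run before delegating to validation. KafkaEnvelope, isKafkaEnvelope, validateEnvelopeValue, and the SomeInput check are hypothetical names invented for illustration. A real pipe would presumably hand the unwrapped value to super.transform instead of the hand-written validator used here.

```typescript
// Minimal envelope shape: only what this sketch needs.
interface KafkaEnvelope {
  topic: string;
  value: unknown;
}

// Heuristic check that an incoming object looks like a Kafka message envelope.
function isKafkaEnvelope(input: unknown): input is KafkaEnvelope {
  return (
    typeof input === 'object' &&
    input !== null &&
    typeof (input as { topic?: unknown }).topic === 'string' &&
    'value' in input
  );
}

// A validator returns the typed value or throws.
type Validator<T> = (raw: unknown) => T;

// Validate the nested `value` when the input is an envelope, else the input itself.
function validateEnvelopeValue<T>(input: unknown, validate: Validator<T>): T {
  return validate(isKafkaEnvelope(input) ? input.value : input);
}

// Hypothetical DTO check standing in for class-validator rules.
interface SomeInput {
  id: number;
}

const validateSomeInput: Validator<SomeInput> = (raw) => {
  const id = (raw as { id?: unknown } | null)?.id;
  if (typeof id !== 'number') {
    throw new Error('id must be a number');
  }
  // Return only the known field, dropping anything else.
  return { id };
};

const validated = validateEnvelopeValue(
  { topic: 'account_me', value: { id: 999, something: 1 } },
  validateSomeInput,
);
console.log(validated);
```

The design choice here is to unwrap once, in a single place, so that handlers and DTOs can describe only the message body rather than the whole envelope.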

And in your main.ts:

    app.useGlobalPipes(
      new KafkaValidationPipe({
        transform: true,
        whitelist: true,
        forbidUnknownValues: true,
        forbidNonWhitelisted: true,
        skipMissingProperties: false,
      }),
    );

@underfisk @jmcdo29

This is my minimal reproduction. I want to validate the create-organization request entity sent from the API gateway.

https://github.com/tanyudii/nestjs-kafka

@underfisk already said that we need a ValidationPipe for microservices (be it Kafka or RabbitMQ). Check this out:

import { Controller, Logger, UseFilters, UsePipes, ValidationPipe } from '@nestjs/common';
import { Ctx, EventPattern, KafkaContext, Payload, Transport } from '@nestjs/microservices';

@Controller('event-broker')
export class EventBrokerController {
  // ExceptionFilter is presumably the poster's own filter class; its import is not shown
  @UseFilters(new ExceptionFilter())
  @UsePipes(new ValidationPipe())
  @EventPattern('jsontest', Transport.KAFKA)
  async processStream(
    @Payload() message: CommunicationEventDto,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    const { value, headers, topic, key } = message;

    // here your logic
  }
}

My validation object:

import { IsNumber, IsObject, IsString } from 'class-validator';

interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  magicByte: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

export interface CommunicationEventHeader {
  mvno_id?: number;
  endpoint?: string;
  msisdn?: string;
  email?: string;
  priority?: 'HIGH' | 'MID' | 'LOW';
  allowedChannels?: any;
}

export class CommunicationEventDto implements IncomingMessage {
  @IsString()
  readonly topic: string;

  @IsNumber()
  readonly partition: number;

  @IsString()
  readonly timestamp: string;

  @IsNumber()
  readonly magicByte: number;

  @IsNumber()
  readonly attributes: number;

  @IsString()
  readonly offset: string;

  @IsString()
  readonly key: string;

  @IsObject()
  readonly value: { payload: any };

  @IsObject()
  readonly headers: CommunicationEventHeader | Record<string, any>;
}
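
For comparison, the same envelope-level checks can be sketched in plain TypeScript as a type guard. This is only an illustration under assumptions: EnvelopeShape, isRecord, and isEnvelopeShape are hypothetical names, and only a subset of the fields is checked. Unlike the DTO above, this sketch accepts a null key, because the payload dump earlier in this thread shows key: null, which a strict string check on key would presumably reject.

```typescript
// Envelope fields checked by this sketch; a subset of the DTO above.
interface EnvelopeShape {
  topic: string;
  partition: number;
  offset: string;
  key: string | null;
  value: object;
}

// True for any non-null object, so its properties can be inspected safely.
function isRecord(input: unknown): input is Record<string, unknown> {
  return typeof input === 'object' && input !== null;
}

// Runtime check that an unknown input has the expected envelope fields.
function isEnvelopeShape(input: unknown): input is EnvelopeShape {
  if (!isRecord(input)) {
    return false;
  }
  const { topic, partition, offset, key, value } = input;
  return (
    typeof topic === 'string' &&
    typeof partition === 'number' &&
    typeof offset === 'string' &&
    (key === null || typeof key === 'string') &&
    isRecord(value)
  );
}

// Values copied from the payload dump earlier in the thread.
const fromDump: unknown = {
  topic: 'account_me',
  partition: 0,
  offset: '33',
  key: null,
  value: { id: 999, something: 1 },
};

console.log(isEnvelopeShape(fromDump));
```

As with @IsObject() on value in the DTO, this only confirms that value is an object. The contents of value still need their own validation.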

@kodeine I had the same issue with RabbitMQ. You need a ValidationPipe: since Kafka wraps the payload in a custom object, you need to pass that payload to your pipe to validate it. I ended up creating my own pipe, but class-validator will do just fine. Try registering the validation pipe globally (if you have a hybrid app), or just use the decorator on your method to test.