confluent-schema-registry: protobuf schema doesn't parse protobuf wire protocol correctly.

There are a whole slew of problems. First and foremost, it isn’t honoring the wire protocol definition, presumably because the docs are structured ridiculously. They document the wire format for Avro in a table, making it appear that the table is the whole wire format. Only in a paragraph after the table do they describe how protobuf payloads add an extra array of message-descriptor indexes to the header, before the protobuf data. Look after the green box on this page: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

The protobuf schema implementation completely ignores the array of message-descriptor indexes, instead just searching the buffer for the first non-zero byte. But even the first message in a protobuf file can be encoded with the array [1, 0] instead of just a 0: an array of length 1 containing message index 0, rather than an array of length 0 with an assumed message index of 0. Note also that the integers in the array, including the length, are variable-length zig-zag-encoded varints, not fixed-width network-byte-order integers, so you have to parse them as varints and cannot just assume 4 bytes per integer.
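
For illustration, here is a minimal sketch (mine, not the library's) of reading that indexes array with protobufjs's Reader; the function name is illustrative, and it assumes the 1-byte magic byte and 4-byte schema ID have already been consumed:

    import { Reader } from 'protobufjs'

    // Sketch: parse the message-indexes array that precedes the protobuf
    // payload. `buffer` is assumed to start at the indexes array.
    function readMessageIndexes(buffer: Buffer): { indexes: number[]; bytesRead: number } {
        const reader = Reader.create(buffer)
        const arrayLen = reader.sint32() // zig-zag varint, not a fixed-width int
        const indexes: number[] = []
        for (let i = 0; i < arrayLen; i++) {
            indexes.push(reader.sint32()) // each index is also a zig-zag varint
        }
        return { indexes, bytesRead: reader.pos }
    }

    // [0x00] decodes to [] (an empty array, implying message index 0);
    // [0x02, 0x00] decodes to [0] (length 1 zig-zag-encodes as 0x02).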

Then there’s the problem that things other than messages (Types) can be declared within a .proto file. So even if the indexes were being honored in the recursive loop implemented by getNestedTypeName(), the exit condition only checks for Type and Namespace, yet it is entirely possible to encounter an Enum, a Service, or a Method. It is therefore necessary to iterate over the keys in parent until you find either a Namespace or a Type, and then continue traversing the descriptor hierarchy from there, rather than always assuming the first key is the one to use.

This is relatively simple to implement: just iterate over the keys until you find one which is an instanceof Type or Namespace (a combined sketch follows the snippet below).

Finally, the code assumes that there is a parsedMessage.package field, which may be present when parsing a .proto string, but is definitely NOT there when parsing a JSON protobuf descriptor. When the root is created via a JSON descriptor, you have to derive the package name by iterating through the Namespace declarations in the parent hierarchy. It’d be great if the package made it into the JSON descriptor, but until it does, it is probably safer to determine the package name dynamically rather than taking it from the parser, since it could disappear from parsed protos as easily as it could be added to JSON descriptors. Something like:

    // inside getNestedTypeName(), where `reflection = parent[keys[0]]`
    if (reflection instanceof Namespace && !(reflection instanceof Type) && reflection.nested)
        return reflection.name + '.' + this.getNestedTypeName(reflection.nested)
    return keys[0]

This will return the fully qualified name without relying on the package string.
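
Putting both fixes together, here's a sketch of what the whole lookup could look like (my reading of the fix, not the library's current code; note that protobufjs's Type extends Namespace, hence the order of the checks):

    import { Namespace, Type, ReflectionObject } from 'protobufjs'

    // Sketch of the combined fix: iterate over all keys instead of assuming
    // keys[0], skip Enums/Services/Methods, and build the fully qualified
    // name from Namespace segments instead of parsedMessage.package.
    function getNestedTypeName(parent: { [k: string]: ReflectionObject } | undefined): string {
        if (!parent) throw new Error('no nested fields')
        for (const key of Object.keys(parent)) {
            const reflection = parent[key]
            if (reflection instanceof Type) return key // found a message
            if (reflection instanceof Namespace && reflection.nested) {
                // a package segment: prefix it and keep descending
                return reflection.name + '.' + getNestedTypeName(reflection.nested)
            }
        }
        throw new Error('no Type found in descriptor hierarchy')
    }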

I’m going to take a stab at fixing the code in ProtoSchema.ts and submitting a PR to fix all of that (and the corresponding changes in the serializer which will generate the correct array by walking the descriptor hierarchy).
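
For the serializer side, here is a rough, untested sketch of what walking the descriptor hierarchy to produce the indexes array could look like; computeMsgIndexes is a hypothetical helper of mine, not part of the library:

    import { Type, ReflectionObject } from 'protobufjs'

    // Hypothetical helper: compute the message-indexes array for a Type by
    // walking up its parents, recording its position among the message Types
    // at each nesting level (outermost index first).
    function computeMsgIndexes(type: Type): number[] {
        const indexes: number[] = []
        let child: ReflectionObject = type
        while (child.parent) {
            const parent = child.parent
            // only message Types count toward the index at each level
            const siblings = parent.nestedArray.filter(n => n instanceof Type)
            indexes.unshift(siblings.indexOf(child))
            // package namespaces above the outermost message don't count
            if (!(parent instanceof Type)) break
            child = parent
        }
        return indexes
    }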

About this issue

  • State: open
  • Created 3 years ago
  • Reactions: 9
  • Comments: 18

Most upvoted comments

On the decode side, it looks like this:

    private decodeHeader(topic: string, buffer: Buffer): ProtoInfo {
        const bufferReader = Reader.create(buffer)
        const magicByte = bufferReader.uint32() // the single 0x00 magic byte
        const schemaId = HostOrder(bufferReader.fixed32()) // 4 bytes, big-endian on the wire
        const arrayLen = bufferReader.sint32() // zig-zag varint array length
        const msgIndexes = new Array<number>(arrayLen)
        for (let i = 0; i < arrayLen; i++) {
            msgIndexes[i] = bufferReader.sint32() // each index is a zig-zag varint
        }
        return {
            magicByte: magicByte,
            schemaId: schemaId,
            msgIndexes: msgIndexes,
            bytesRead: bufferReader.pos, // total header length in bytes
        }
    }

    public async deserialize(topic: string, buffer: Buffer): Promise<Message<{}>> {
        // minimum: 1 magic byte + 4 schema-id bytes + at least 1 byte of indexes array
        if (buffer.length < 6) {
            throw new Error(`buffer with length ${buffer.length} is not long enough to contain a protobuf`)
        }
        const protoInfo = this.decodeHeader(topic, buffer)

        const type = await this.protobufResolver.ResolveProtobuf(topic, protoInfo.schemaId, protoInfo.msgIndexes)
        const bufferReader = Reader.create(buffer)
        bufferReader.skip(protoInfo.bytesRead) // skip the header we already parsed
        return type.decode(bufferReader)
    }

protobufResolver uses the registry client, the topic name, and the info parsed from the wire protocol to resolve a protobuf Type instance, which is then used to decode the protobuf. That lets me inject whatever logic I want into the deserializer for figuring out the type encoded in the payload, since correctly computing the message type from message indexes isn’t really possible with protobufjs as currently implemented; at least not if your .proto files also import references to other protobufs, since the schema fetched from the registry won’t include the references. By delegating to a resolver, I can resort to quick hacks like hardcoding the type name based on indexes and topic name, for example.
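
For illustration, a minimal sketch of that delegation (the ProtobufResolver interface is inferred from the deserialize() call above, and HardcodedResolver is a hypothetical example of such a quick hack):

    import { Root, Type } from 'protobufjs'

    // Interface inferred from the deserialize() call above; not a published API.
    interface ProtobufResolver {
        ResolveProtobuf(topic: string, schemaId: number, msgIndexes: number[]): Promise<Type>
    }

    // Hypothetical quick-hack resolver: hardcode the fully qualified type
    // name per topic and ignore the schema ID and message indexes entirely.
    class HardcodedResolver implements ProtobufResolver {
        constructor(private root: Root, private typeNamesByTopic: Record<string, string>) {}

        async ResolveProtobuf(topic: string, _schemaId: number, _msgIndexes: number[]): Promise<Type> {
            return this.root.lookupType(this.typeNamesByTopic[topic]) // throws if the name isn't in the Root
        }
    }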

I’ll probably open-source the serializer and deserializer I built, but it’s not likely to happen for a week or two.

@ideasculptor have you had a chance to work on a PR to address this issue? I recently started using kafkajs/confluent-schema-registry and I can confirm that the library works for JSON and AVRO types, but fails for PROTOBUF.

> I’ll probably open-source the serializer and deserializer I built, but it’s not likely to happen for a week or two.

+1

It’s missing the “message-indexes” part.

@tulios @Nevon It looks like a lot of people are facing this issue when communicating data across different services, and it has been pending for more than 1.5 years. It would be great if this could be picked up in the next release, considering the impact. Thanks in advance.

@Meet-Modi Unfortunately I have no time for such a contribution now 😦 I’m using the solution provided by @ideasculptor.

Hi @NikitaKemarskiyLeia, did you get a chance to work on the PR?

I have code which encodes it correctly. It isn’t integrated with the schema registry client; I built a separate serializer, so I’m using the schema registry client only to talk to the registry. The code looks like this:

    private encodePayloadHeader(schemaInfo: SchemaInfo, msgIndexes: number[]): ProtoBuffer {
        const writer = new BufferWriter()
            .uint32(0) // don't zig-zag encode; uint32 writes a variable number of bytes, so this is 1 byte
            .fixed32(NetworkOrder(schemaInfo.schemaId)) // make big-endian, then write 4 bytes as int32
            .sint32(msgIndexes.length) // zig-zag encoded array length

        for (const msgIndex of msgIndexes) {
            writer.sint32(msgIndex) // zig-zag encoded index value
        }
        return writer.finish()
    }

I generate the full buffer more or less like this:

        const buf = this.encodePayloadHeader(schemaInfo, msgIndexes)
        const writer = schemaInfo.type.encode(thing) // protobufjs Type#encode returns a Writer
        return Buffer.concat([buf, writer.finish()]) // header bytes, then the serialized message