pinot: Fix the issue with "pinot-pulsar" module (potentially library conflicts)

Apache Pulsar connector has been added from https://github.com/apache/pinot/pull/7026

However, it currently is facing some issues on runtime (potentially dependency conflicts). We need to fix the conflicts to make the connector work correctly.

2021/08/06 10:53:35.451 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: myTable_REALTIME, partition: myTable__0__0__20210806T1753Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1183) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:349) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:168) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:89) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.skipField(ByteBufCodedInputStream.java:192) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.common.api.proto.PulsarApi$MessageIdData$Builder.mergeFrom(PulsarApi.java:1602) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.impl.MessageIdImpl.fromByteArray(MessageIdImpl.java:106) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newMessageIdFromByteArray$3(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        ... 21 more
pinot-server_1      | 2021/08/05 14:25:59.009 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: datasource_610bf4bf19200003007e55b3_REALTIME, partition: datasource_610bf4bf19200003007e55b3__0__0__20210805T1425Z
pinot-server_1      | java.lang.IndexOutOfBoundsException: readerIndex(1) + length(8) exceeds writerIndex(6): UnpooledHeapByteBuf(ridx: 1, widx: 6, cap: 6/6)
pinot-server_1      | 	at shaded.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1478) ~[pinot-azure-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at shaded.io.netty.buffer.AbstractByteBuf.readLongLE(AbstractByteBuf.java:845) ~[pinot-azure-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.readRawLittleEndian64(ByteBufCodedInputStream.java:309) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.skipField(ByteBufCodedInputStream.java:177) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.api.proto.PulsarApi$MessageIdData$Builder.mergeFrom(PulsarApi.java:1602) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.impl.MessageIdImpl.fromByteArray(MessageIdImpl.java:106) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
pinot-server_1      | 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
pinot-server_1      | 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
pinot-server_1      | 	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
pinot-server_1      | 	at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newMessageIdFromByteArray$3(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 25 (13 by maintainers)

Commits related to this issue

Most upvoted comments

Hi The PR was upgraded from draft status to ready for review. Still waiting to be merged