risingwave: bug: query `_rw_kafka_timestamp` returns empty result.

Describe the bug

Querying _rw_kafka_timestamp returns an empty result.

To Reproduce

Run the following SQL statements:

create source s1 (v1 int, v2 varchar) with (
  connector = 'kafka',
  topic = 'kafka_1_partition_topic',
  properties.bootstrap.server = '127.0.0.1:29092',
  scan.startup.mode = 'earliest'
) row format json;

select _rw_kafka_timestamp from s1;

This returns an empty result.

Expected behavior

It should return the _rw_kafka_timestamp data.

Additional context

No.

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Reactions: 1
  • Comments: 16 (16 by maintainers)

Most upvoted comments

Pulsar, Kinesis, and Pub/Sub don’t provide message timestamps along with the message itself; it’s more of a Kafka-specific feature.

Sorry, I missed this PR. I raised a similar question at https://github.com/risingwavelabs/rfcs/pull/20/files#r1019767093. I think we should settle the RFC (i.e., get that PR merged) before implementing this. I’m not sure if anyone has agreed on the final design. @liurenjie1024

IIRC, we had only reached the consensus that we don’t have to implement the extra column in our first version. Users will generally attach a custom timestamp to their records so that they can filter data by time even without an MQ-specific timestamp.

This is a list of internal columns supported by the Presto Kafka connector: https://prestodb.io/docs/current/connector/kafka.html#internal-columns Presto doesn’t have a special timestamp variable for Kafka. It’s true that there are others such as _key or _message_corrupt, which we can learn from, but they don’t seem to have a need for the timestamp.

Personally, I think there’s not much harm in introducing this feature, but I’d still prefer to delay it until we have an actual user requirement.

This will cause inconsistent timestamps; it seems we would have two kinds of timestamp in the future:

  1. one that supports both range search and getting the timestamp of each record
  2. one that supports only range search

For example, for Kafka we can use _rw_kafka_timestamp in the following ways:

  1. range search: select * from source_abc where _rw_kafka_timestamp < '2022-01-02 12:00:00+00:00'
  2. get the timestamp of each record: select _rw_kafka_timestamp from source_abc

For Pulsar, we can only use _rw_pulsar_timestamp in the following way:

  1. range search: select * from source_abc where _rw_pulsar_timestamp < '2022-01-02 12:00:00+00:00', but we can’t get the timestamp of each record via select _rw_pulsar_timestamp from source_abc

So to resolve this inconsistency, maybe we can:

  1. clearly document how the timestamp of each source works, for example:
    • ‘_rw_kafka_timestamp’
      • supports range search
      • supports getting the timestamp of each record
    • ‘_rw_pulsar_timestamp’
      • supports range search
  2. use different names to distinguish these two kinds of timestamp, for example:
    • ‘_rw_**_full_timestamp’ indicates the first kind of timestamp
    • ‘_rw_**_timestamp’ indicates the second kind of timestamp

Sorry, I misunderstood the time 🥵. It seems it’s a Unix timestamp.

This timestamp is a timestamp in the broker’s timezone

Any reference for this statement?

I found this in KIP-32:

The good things about LogAppendTime are:

  1. Broker is more robust.
  2. Monotonically increasing.
  3. Deterministic behavior for log rolling and retention.
  4. If CreateTime is required, it can always be put into the message payload.

In time zones with daylight saving, local time without a time zone is not monotonic. Since KIP-32 lists monotonic increase as a property of LogAppendTime, this suggests the broker’s LogAppendTime is not affected by local time zone settings (i.e., it is a Unix timestamp). We can also run a simple example to confirm (or refute) this.
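To illustrate, here is a minimal Python sketch, assuming (as discussed above) that the broker’s LogAppendTime is milliseconds since the Unix epoch. The specific timestamp value and the America/New_York zone are only hypothetical examples. It shows that an epoch timestamp denotes one absolute instant regardless of display timezone, and that epoch values keep increasing across a DST "fall back" transition even though local wall-clock time repeats:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo  # Python 3.9+; may need the tzdata package on some systems

# A hypothetical LogAppendTime: milliseconds since the Unix epoch (UTC).
ts_ms = 1667730000000

# The same instant rendered in two timezones is still the same instant.
utc = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
ny = utc.astimezone(ZoneInfo("America/New_York"))
assert utc == ny                          # aware datetimes compare by instant
assert utc.timestamp() == ny.timestamp()  # identical epoch value

# Around the US DST transition on 2022-11-06, local wall-clock 01:30
# occurs twice (once in EDT, once in EST), so local time is not monotonic...
before = datetime(2022, 11, 6, 5, 30, tzinfo=timezone.utc)  # 01:30 EDT
after = before + timedelta(hours=1)                         # 01:30 EST
assert before.astimezone(ZoneInfo("America/New_York")).hour == 1
assert after.astimezone(ZoneInfo("America/New_York")).hour == 1

# ...but the underlying epoch timestamps remain strictly increasing.
assert after.timestamp() > before.timestamp()
```

This is consistent with KIP-32’s claim that LogAppendTime is monotonically increasing: a timezone-dependent local time could not be, so the broker must record an absolute (epoch-based) instant.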