risingwave: perf bug: barriers pile up due to temporal filter

Longevity reglngvty-20231204-140246 failed

!!! longevity Result!!!
Today's date:2023-12-05
Result               FAIL                
Pipeline Message     1@Monday nexmark q8 q9 q15 q16 with 200000 throughput
TestBed              kubebench/3264g-medium-all-affinity
RW Version           nightly-20231204    
Test Start time      2023-12-04 14:08:16 
Test End time        2023-12-05 02:10:30 
Namespace            reglngvty-20231204-140246
Queries              nexmark_q8,nexmark_q9,nexmark_q15,nexmark_q16
Grafana Metric       https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=reglngvty-20231204-140246&from=1701698896000&to=1701742230000
Grafana Logs         https://grafana.test.risingwave-cloud.xyz/d/liz0yRCZz1/log-search-dashboard?orgId=1&var-data_source=Logging:%20test-useast1-eks-a&var-namespace=reglngvty-20231204-140246&from=1701698896000&to=1701742230000
Buildkite Job        https://buildkite.com/risingwave-test/longevity-test/builds/840
Crash Container logs https://rw-qa-artifacts-public.s3.us-east-1.amazonaws.com/longevity/840_logs.txt
Report               https://rw-qa-artifacts-public.s3.us-east-1.amazonaws.com/longevity/840_report.txt

The barriers were blocked by something every 30 minutes… Meanwhile, we do a select limit 1 check every 30 minutes. Is it a coincidence?

Running command SELECT * FROM nexmark_q16_1 LIMIT 1

About this issue

  • State: closed
  • Created 7 months ago
  • Comments: 15 (14 by maintainers)

Most upvoted comments

After careful consideration, I think we should add a back-pressure mechanism to NowExecutor, similar to other source executors. Instead of strictly inserting a timestamp row for every epoch, it should generate the timestamp row only for the latest epoch, skipping the barriers accumulated in the barrier_receiver channel.

I say this is “similar to other source executors” because other source executors always prioritize barrier events over data events. The same applies here; the only difference is that the “delayed” data event is not static but changes according to the last passed barrier.
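
For illustration, here is a minimal sketch of that idea, assuming a tokio-style channel; the types and function names below are made up, and this is not the actual NowExecutor code:

```rust
use tokio::sync::mpsc::UnboundedReceiver;

struct Barrier {
    epoch: u64,
}

// Sketch of the proposed behaviour: drain every barrier that has piled up,
// forward them, but generate a now() row only for the newest epoch.
async fn now_loop(mut barrier_receiver: UnboundedReceiver<Barrier>) {
    while let Some(first) = barrier_receiver.recv().await {
        // Collect everything that accumulated while downstream was slow.
        let mut pending = vec![first];
        while let Ok(more) = barrier_receiver.try_recv() {
            pending.push(more);
        }
        let latest = pending.pop().unwrap();

        // Skipped epochs still get their barriers forwarded, but no now() row.
        for barrier in pending {
            forward_barrier(barrier).await;
        }

        // Only the newest epoch gets a timestamp row, so the downstream
        // dynamic filter scans once per batch of barriers instead of once
        // per epoch.
        emit_now_row(latest.epoch).await;
        forward_barrier(latest).await;
    }
}

async fn emit_now_row(epoch: u64) {
    // Placeholder: produce the timestamp row for this epoch.
    println!("emit now() row for epoch {epoch}");
}

async fn forward_barrier(barrier: Barrier) {
    // Placeholder: pass the barrier on to downstream executors.
    println!("forward barrier for epoch {}", barrier.epoch);
}
```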

The downside of this approach, as pointed out by @BugenZhao, is that a batch query will not get consistent results if the epoch it reads was just skipped. However, fixing this seems complicated, and I tend to put it aside.

IIUC, there are two benefits of this approach:

  1. The storage iterator will be created less frequently, and each scan will cover more data
  2. Fewer range tombstones will be generated

Essentially, we reduce the scan frequency, while the total amount of data to be scanned is the same with or without this optimization. Therefore, it will help only if the bottleneck lies in creating the storage iterator, not in the actual row scan. Have we verified that this is the case?
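
To make that trade-off concrete, here is a back-of-the-envelope model; every number in it is an illustrative assumption, not a measurement from this test:

```rust
// Toy cost model: each scan pays a fixed overhead (creating the storage
// iterator, emitting one range tombstone) plus a per-row cost. Batching
// epochs amortises only the fixed part; the per-row work stays the same.
fn total_cost_ms(scans: u64, rows_per_scan: u64, fixed_ms: f64, per_row_us: f64) -> f64 {
    scans as f64 * fixed_ms + (scans * rows_per_scan) as f64 * per_row_us / 1000.0
}

fn main() {
    // 600 epochs' worth of expired rows, 10_000 rows per epoch (made-up numbers).
    let every_epoch = total_cost_ms(600, 10_000, 5.0, 1.0); // scan once per epoch
    let every_tenth = total_cost_ms(60, 100_000, 5.0, 1.0); // scan once per 10 epochs
    println!("scan every epoch:     {every_epoch} ms"); // 9000 ms
    println!("scan every 10 epochs: {every_tenth} ms"); // 6300 ms
    // Both variants scan the same 6_000_000 rows in total; only the fixed
    // overhead of 600 vs. 60 scans differs, which is why the win depends on
    // whether the bottleneck is iterator creation or the row scan itself.
}
```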

After careful consideration, I think we should add a back-pressure mechanism to NowExecutor, similar to other source executors. Instead of strictly inserting a timestamp row for every epoch, it should generate the timestamp row only for the latest epoch, skipping the barriers accumulated in the barrier_receiver channel.

This was implemented in https://github.com/risingwavelabs/risingwave/pull/13271. Shall we consider merging it?

As for batch query consistency: in this PR, consistency between state tables is still preserved, because for any given epoch the state of the now executor is still consistent with the state of the dynamic filter. The only assumption we break is that the value read from the now executor at a given epoch is the same as that epoch, which I think is acceptable.
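
To illustrate that argument with a hypothetical scenario (the epochs and values below are made up, not taken from the PR): suppose epochs 101–104 were skipped, so the last materialized now value is the one produced at epoch 100.

```rust
// The point of this toy example: at every readable epoch, the now executor's
// state and the bound the dynamic filter applied agree with each other, even
// though for skipped epochs neither equals the epoch itself.
fn main() {
    // Value of now() in the now executor's state table, per epoch (hypothetical).
    let now_state = |epoch: u64| -> u64 { if epoch < 105 { 100 } else { 105 } };
    // Bound the dynamic filter actually applied for that epoch (hypothetical).
    let filter_bound = |epoch: u64| -> u64 { if epoch < 105 { 100 } else { 105 } };

    let read_epoch: u64 = 103; // a batch query pinned to a skipped epoch
    assert_eq!(now_state(read_epoch), filter_bound(read_epoch)); // mutually consistent
    assert_ne!(now_state(read_epoch), read_epoch);               // but stale w.r.t. the epoch itself
    println!(
        "epoch {read_epoch}: consistent snapshot, stale now = {}",
        now_state(read_epoch)
    );
}
```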

The downside of this approach, as pointed out by @BugenZhao, is that a batch query will not get consistent results if the epoch it reads was just skipped. However, fixing this seems complicated, and I tend to put it aside.

  1. We can let streaming jobs with a temporal filter reuse the same Now source actor.
  2. Furthermore, we can let now() in batch queries use the value in the state table of the common now source (i.e., convert it to a join).

The first maintains consistency among the streaming queries, and the second keeps the current now() semantics but hurts the performance of batch queries that use now(). This is just a possible solution; there is no urgency to do either.

The root cause of the problem is that DynamicFilterExecutor cannot read the rows to delete fast enough to keep up with now().

Specifically,

  1. NowExecutor emits a timestamp row for every epoch, and this is not affected by the back-pressure mechanism.
  2. When DynamicFilterExecutor receives this timestamp, it needs to scan all rows expired by it. This might take more than 1 second, especially when the data is not cached. Let’s say the scan takes 5 seconds to complete.
  3. A new barrier is generated every 1 second, while a barrier can only be processed every 5 seconds, so more and more barriers accumulate, which shows up as the increasing barrier latency in Grafana (see the toy model after this list).
  4. In reglngvty-20231204-140246, all rows in DynamicFilterExecutor’s state table are eventually deleted. After that, DynamicFilterExecutor scans nothing, which is pretty fast, allowing it to catch up with NowExecutor.
  5. Then new events arrive from the source, which was previously back-pressured, and after a while we are back at step 4 again.
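
As a toy illustration of how step 3 turns into ever-growing barrier latency (the 1-second barrier interval and 5-second scan time are the assumed numbers from the example above, not measured values):

```rust
// Toy model: barriers are injected every `barrier_interval_s`, but draining one
// barrier (the expired-row scan in the dynamic filter) takes `scan_time_s`, so
// the queue, and with it the barrier latency, keeps growing.
fn main() {
    let barrier_interval_s = 1.0_f64; // assumed barrier injection interval
    let scan_time_s = 5.0_f64;        // assumed time to process one barrier

    for minutes in [1u32, 5, 10, 30] {
        let elapsed_s = f64::from(minutes) * 60.0;
        let injected = elapsed_s / barrier_interval_s;
        let processed = elapsed_s / scan_time_s;
        let queued = injected - processed;
        // The newest barrier must wait for everything ahead of it to drain.
        let latency_s = queued * scan_time_s;
        println!("after {minutes:>2} min: ~{queued:.0} barriers queued, latency ~{latency_s:.0} s");
    }
}
```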