risingwave: bug: `barrier_inflight_latency` is too high and keeps growing

I’m running NexMark with a hand-written query that consists of one join and one aggregation. What could be the reason that barrier latency keeps growing? It has grown beyond 8 minutes.


The query is

create materialized view mv_q6s as
  SELECT
    MAX(B.price) AS final,
    A.id,
    A.seller
  FROM
    auction AS A,
    bid AS B
  WHERE
    A.id = B.auction
    and B.date_time between A.date_time and A.expires
  GROUP BY
    A.id,
    A.seller

Is it stuck? – No

/risedev ctl meta pause

After the pause, the barrier number starts to drop, so it’s not stuck. The pause succeeded after 324.06s 😂 and the in-flight barriers were cleared

[cargo-make] INFO - Build Done in 324.06 seconds.


So, what could be the reason?

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 16 (9 by maintainers)

Most upvoted comments

While the barrier in-flight latency is high, the source throughput is not extremely low in this issue. I suspect this is somehow related to the messages buffered when backpressure happens. After some experiments, I can conclude that this is indeed the cause, and the behavior change comes from the permit-based backpressure introduced by #6170.

I deployed one CN using risedev dev full so that the exchange uses local channels. Here are the results:

  1. One commit before #6170 (915ac8)
    • avg barrier in-flight latency = ~20s
  2. #6170 (2ac0c8d) with MAX_CHUNK_PERMITS = 28682 (default):
    • avg barrier in-flight latency = ~2.7min
    • overall source throughput is similar to 1) but it is more balanced across the two sources.
  3. #6170 (2ac0c8d) with MAX_CHUNK_PERMITS = 2048:
    • avg barrier in-flight latency = ~30s
    • source throughput similar to 2)

In summary, the default setting in #6170 makes the number of buffered messages larger than before. This leads to more messages being processed between barriers when backpressure happens, so the barrier in-flight latency increases. It won’t affect source throughput because the buffer size doesn’t affect the actor throughput.

IMO, we can decrease the default buffer size (i.e. lower MAX_CHUNK_PERMITS) as a quick fix, and in the future we can use some heuristics to tune it dynamically based on barrier in-flight latency and actor throughput.
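As a back-of-the-envelope illustration (a hypothetical model, plugging in the buffer sizes from the experiments above and the ~2000 rows/s actor throughput mentioned elsewhere in this thread), a barrier injected behind a full exchange channel must wait for the downstream actor to drain the buffered rows first:

```python
# Hypothetical model: barrier in-flight latency under backpressure.
# A barrier queued behind a full exchange buffer cannot be collected
# until the downstream actor drains the rows buffered ahead of it.

def barrier_drain_delay(buffered_rows: int, actor_rows_per_sec: float) -> float:
    """Seconds a barrier waits behind `buffered_rows` at the given drain rate."""
    return buffered_rows / actor_rows_per_sec

# Default MAX_CHUNK_PERMITS (~28682 rows buffered per channel), slow actor:
slow = barrier_drain_delay(28682, 2000)   # ~14.3 s per full channel
# Smaller buffer of 2048 rows, same actor: ~14x shorter wait per channel.
fast = barrier_drain_delay(2048, 2000)    # ~1.0 s per full channel
```

With multiple backpressured channels stacked along the fragment graph, these per-channel delays compound, which is consistent with the minutes-long in-flight latency observed above.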

@wangrunji0408 points out that the data generator will sleep to meet the 46:3:1 ratio only if min.event.gap.in.ns is set to a non-zero value. Since sleeping for a very short time is not accurate in tokio, I set the value to 1000 (so it sleeps at least 1000ns × 1024 = ~1ms per chunk) and tested it… and it seemed to work well?
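The arithmetic behind the chosen value can be sketched as follows (the chunk size of 1024 events is an assumption taken from the comment above):

```python
# Sketch of the rate limit implied by min.event.gap.in.ns = 1000.
# The generator sleeps at least GAP_NS per event, amortized per chunk,
# so the per-chunk sleep is long enough for tokio timers to be accurate.

GAP_NS = 1000        # min.event.gap.in.ns value set in the comment above
CHUNK_SIZE = 1024    # events batched per chunk (assumed)

sleep_per_chunk_ms = GAP_NS * CHUNK_SIZE / 1e6   # ~1 ms per chunk
max_events_per_sec = int(1e9 / GAP_NS)           # rate cap implied by the gap
```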

The throughput of actors 1-4 in my case is ~70000 rows per second, which is much larger than the result from @hzxa21. 🤔 This is interesting. Did we test under the debug profile before?


I can consistently reproduce this issue with risedev d full. I observed abnormal streaming actor metrics:

  • Actor Execution Time for the last fragment (join -> agg -> mv) is consistently at ~1s. This means the actor is stuck or almost stuck, given that this metric measures the rate of total tokio poll time for the actor (i.e. the max value of this metric is 1s).
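Since the metric is a rate over a cumulative poll-time counter, a value pinned at ~1s/s means the actor task is being polled essentially all of the wall-clock time. A hypothetical sketch of the computation:

```python
# Hypothetical sketch of the Actor Execution Time metric: the rate of a
# cumulative tokio poll-time counter over wall-clock time. A value near
# 1.0 s/s means the actor task is busy (being polled) almost continuously.

def poll_time_rate(poll_ns_prev: int, poll_ns_now: int, window_sec: float) -> float:
    """Seconds of poll time accumulated per wall-clock second."""
    return (poll_ns_now - poll_ns_prev) / 1e9 / window_sec

# 0.98 s of poll time over a 1 s window: the actor is ~98% busy.
# A single task can never exceed ~1.0 on this metric.
rate = poll_time_rate(0, 980_000_000, 1.0)
```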

I originally suspected this was caused by high storage read latency, but the cache hit rate is high and the read tail latency shown in the metrics is not significant. I will continue investigating the issue.


I’m designing a fine-grained backpressure mechanism to control the join actors’ event consumption from both sides. I will write a design document for this first.

I guess the main cause is the “debug build”. 🤔 Maybe we should set a smaller initial permits in debug build.

The original test done by me was run against a release build.

Some ideas:

  • Why is the throughput of actors 1-4 only ~2000 rows per second? This seems too low. What’s the bottleneck? I’ll investigate it later.
  • The current permit count of 32k is not that large. With an RTT of 20ms, performance will be affected by false backpressure once throughput exceeds 1600k rows per second, which is comparable to the actual performance. We may try cutting it in half, but I’m afraid this cannot resolve the problem entirely.
  • Making the permits size-based may also not work as expected 😦. A chunk with a wide schema will be large in size, but that doesn’t mean the throughput will be lower. It really depends on the workload.
  • A large buffer in exchange should only make the first barriers large. After the buffer is filled, the following epochs should be small. However, we have a hard limit of 40 concurrent barriers. If we reach this limit, there’ll be another large epoch, and this situation repeats over and over. Could this lead to the problem?
  • We do have a mechanism that pauses the source if a source executor has not received a new barrier for some time, introduced by @yezizp2012. But it is not that aggressive, and its goal is to handle meta failures.
  • The dimension tables in the Nexmark generator should have fewer records, but since we poll the sources in a fair manner, 15-50x more dimension records are actually generated than bids in the same period. This makes the workload unrealistic. Could this lead to problems?
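The RTT argument in the second bullet is the classic bandwidth-delay product: to keep a channel busy without false backpressure, the in-flight permits must cover throughput × RTT. A sketch with the hypothetical numbers from that bullet:

```python
# Bandwidth-delay product sketch (numbers are the hypothetical ones from
# the bullet above, not measurements).

def saturation_rate(permits: int, rtt_ms: int) -> int:
    """Throughput (rows/s) above which `permits` cause false backpressure."""
    return permits * 1000 // rtt_ms

def permits_needed(rows_per_sec: int, rtt_ms: int) -> int:
    """Permits required to keep the channel busy across one credit round-trip."""
    return rows_per_sec * rtt_ms // 1000

cap = saturation_rate(32_000, 20)    # 1_600_000 rows/s with a 20 ms RTT
half = saturation_rate(16_000, 20)   # halving the permits halves the cap
```

This is why halving MAX_CHUNK_PERMITS may not be enough on its own: it trades buffered-message latency against the throughput ceiling, and the right balance depends on the actual RTT and workload.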

Great work!!! In short, the messages piled up in the exchange channels.

I agree that we may need to decrease MAX_CHUNK_PERMITS. 2048 is too small; perhaps we can start from half of the current MAX_CHUNK_PERMITS?

Some other ideas:

  1. If we implement estimate_size() for DataChunk, byte size looks like a better kind of permit than the number of rows.
  2. Is it possible to report the current permit count to Prometheus and show it in Grafana? The currently used permits divided by the total permits measures the degree of backpressure, which sounds like a pretty useful metric. The reporting can be done on every barrier.
  3. A random idea: if the number of in-flight barriers reaches the upper bound (i.e. 40), we could not only stop injecting barriers but also block the source executors from generating any new events. What do you guys think?
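The backpressure metric from idea 2 could look something like this (a minimal sketch; the gauge name and the way available permits are obtained are made up for illustration):

```python
# Sketch of idea 2: permit usage as a backpressure gauge.
# used / total close to 1.0 means the channel is nearly full, i.e. the
# downstream actor is applying backpressure.

def backpressure_degree(total_permits: int, available_permits: int) -> float:
    """Fraction of permits held by in-flight chunks: 0.0 = idle, 1.0 = fully backpressured."""
    return (total_permits - available_permits) / total_permits

# Hypothetical reporting on every barrier, e.g. to a Prometheus gauge:
# BACKPRESSURE_GAUGE.set(backpressure_degree(MAX_CHUNK_PERMITS, available))
```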

Actor traces:

--- Actor Traces ---
>> Actor 1
[captured 638.914706ms ago]
Actor 1: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.370004022s]
  Epoch 3438920976564224 [!!! 11.070000095s]
    MaterializeExecutor 10000007F (actor 1, executor 127) [!!! 11.070000095s]
      ProjectExecutor 10000007E (actor 1, executor 126) [!!! 11.070000095s]
        HashAggExecutor 10000007C (actor 1, executor 124) [!!! 11.070000095s]
          ProjectExecutor 10000007A (actor 1, executor 122) [!!! 8.480000077s]
            FilterExecutor 100000078 (actor 1, executor 120) [!!! 8.480000077s]
  Subtask [!!! 270.370004022s]
    HashJoinExecutor 100000076 (actor 1, executor 118) [!!! 3.760000039s]
      store_get [0ns]

>> Actor 2
[captured 292.524903ms ago]
Actor 2: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.710004024s]
  Epoch 3438921521037312 [!!! 1.190000012s]
    MaterializeExecutor 20000007F (actor 2, executor 127) [!!! 1.190000012s]
      ProjectExecutor 20000007E (actor 2, executor 126) [!!! 1.190000012s]
        HashAggExecutor 20000007C (actor 2, executor 124) [!!! 1.190000012s]
          ProjectExecutor 20000007A (actor 2, executor 122) [0ns]
            FilterExecutor 200000078 (actor 2, executor 120) [0ns]
  Subtask [!!! 270.710004024s]

>> Actor 3
[captured 535.509605ms ago]
Actor 3: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.470004023s]
  Epoch 3438920976564224 [!!! 11.750000099s]
    MaterializeExecutor 30000007F (actor 3, executor 127) [260.000003ms]
      ProjectExecutor 30000007E (actor 3, executor 126) [260.000003ms]
        HashAggExecutor 30000007C (actor 3, executor 124) [260.000003ms]
          store_get [0ns]
  Subtask [!!! 270.470004023s]

>> Actor 4
[captured 621.001006ms ago]
Actor 4: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.390004022s]
  Epoch 3438920976564224 [!!! 11.270000096s]
    MaterializeExecutor 40000007F (actor 4, executor 127) [490.000005ms]
      ProjectExecutor 40000007E (actor 4, executor 126) [490.000005ms]
        HashAggExecutor 40000007C (actor 4, executor 124) [490.000005ms]
          store_get [0ns]
  Subtask [!!! 270.390004022s]

>> Actor 5
[captured 125.374701ms ago]
Actor 5: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.880004026s]
  Epoch 3438937023578112 [!!! 11.570000099s]
    ProjectExecutor 500000070 (actor 5, executor 112) [!!! 9.000000082s]
      SourceExecutor 50000005C (actor 5, executor 92) [!!! 9.000000082s]
        source_recv_barrier [!!! 11.570000099s]

>> Actor 6
[captured 966.80971ms ago]
Actor 6: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.040004017s]
  Epoch 3438937023578112 [!!! 10.730000091s]
    ProjectExecutor 600000070 (actor 6, executor 112) [!!! 8.680000077s]
      SourceExecutor 60000005C (actor 6, executor 92) [!!! 8.680000077s]
        source_recv_barrier [!!! 10.730000091s]

>> Actor 7
[captured 321.697103ms ago]
Actor 7: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.690004024s]
  Epoch 3438937023578112 [!!! 11.380000097s]
    ProjectExecutor 700000070 (actor 7, executor 112) [!!! 8.66000008s]
      SourceExecutor 70000005C (actor 7, executor 92) [!!! 8.66000008s]
        source_recv_barrier [!!! 11.380000097s]

>> Actor 8
[captured 772.806408ms ago]
Actor 8: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.23000402s]
  Epoch 3438937023578112 [!!! 10.920000093s]
    ProjectExecutor 800000070 (actor 8, executor 112) [!!! 8.530000077s]
      SourceExecutor 80000005C (actor 8, executor 92) [!!! 8.530000077s]
        source_recv_barrier [!!! 10.920000093s]

>> Actor 9
[captured 719.894607ms ago]
Actor 9: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.280004021s]
  Epoch 3438937023578112 [590.000006ms]
    dispatch_chunk [550.000006ms]
      LocalOutput (actor 2) [550.000006ms]
[Detached 5]
  source_recv_barrier [590.000006ms]

>> Actor 10
[captured 111.684301ms ago]
Actor 10: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.890004026s]
  Epoch 3438937023578112 [!!! 1.250000013s]
    dispatch_chunk [!!! 1.180000012s]
      LocalOutput (actor 4) [!!! 1.180000012s]
[Detached 5]
  source_recv_barrier [!!! 1.250000013s]

>> Actor 11
[captured 671.793507ms ago]
Actor 11: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.330004021s]
  Epoch 3438937023578112 [!!! 10.580000091s]
    dispatch_chunk [!!! 9.640000084s]
      LocalOutput (actor 4) [750.000007ms]
[Detached 5]
  source_recv_barrier [!!! 10.580000091s]

>> Actor 12
[captured 121.743801ms ago]
Actor 12: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.880004026s]
  Epoch 3438937023578112 [!!! 11.520000098s]
    ProjectExecutor C00000073 (actor 12, executor 115) [440.000005ms]
      SourceExecutor C00000058 (actor 12, executor 88) [440.000005ms]
        source_recv_barrier [!!! 11.520000098s]

--- RPC Traces ---
>> RPC 127.0.0.1:5688 (101)
[captured 39.3003ms ago]
/stream_service.StreamService/BarrierComplete:101 [214.000003729s]
  collect_barrier (epoch 3438923400871936) [!!! 214.000003729s]

>> RPC 127.0.0.1:5688 (103)
[captured 1.6252ms ago]
/stream_service.StreamService/BarrierComplete:103 [207.010003342s]
  collect_barrier (epoch 3438923762761728) [!!! 207.010003342s]

>> RPC 127.0.0.1:5688 (105)
[captured 538.283605ms ago]
/stream_service.StreamService/BarrierComplete:105 [200.000002976s]
  collect_barrier (epoch 3438924224135168) [!!! 200.000002976s]

>> RPC 127.0.0.1:5688 (107)
[captured 15.6766ms ago]
/stream_service.StreamService/BarrierComplete:107 [189.000001932s]
  collect_barrier (epoch 3438924647497728) [!!! 189.000001932s]

>> RPC 127.0.0.1:5688 (109)
[captured 597.400906ms ago]
/stream_service.StreamService/BarrierComplete:109 [186.000001697s]
  collect_barrier (epoch 3438925402537984) [!!! 186.000001697s]

>> RPC 127.0.0.1:5688 (111)
[captured 661.197406ms ago]
/stream_service.StreamService/BarrierComplete:111 [179.000001083s]
  collect_barrier (epoch 3438925561331712) [!!! 179.000001083s]

>> RPC 127.0.0.1:5688 (113)
[captured 643.721006ms ago]
/stream_service.StreamService/BarrierComplete:113 [171.000000277s]
  collect_barrier (epoch 3438926015758336) [!!! 171.000000277s]

>> RPC 127.0.0.1:5688 (115)
[captured 769.848208ms ago]
/stream_service.StreamService/BarrierComplete:115 [163.999999582s]
  collect_barrier (epoch 3438926541094912) [!!! 163.999999582s]

>> RPC 127.0.0.1:5688 (117)
[captured 704.509207ms ago]
/stream_service.StreamService/BarrierComplete:117 [155.999999444s]
  collect_barrier (epoch 3438926991392768) [!!! 155.999999444s]

>> RPC 127.0.0.1:5688 (119)
[captured 530.020705ms ago]
/stream_service.StreamService/BarrierComplete:119 [149.999999346s]
  collect_barrier (epoch 3438927520202752) [!!! 149.999999346s]

>> RPC 127.0.0.1:5688 (121)
[captured 456.127504ms ago]
/stream_service.StreamService/BarrierComplete:121 [143.999999274s]
  collect_barrier (epoch 3438927924953088) [!!! 143.999999274s]

>> RPC 127.0.0.1:5688 (123)
[captured 950.910509ms ago]
/stream_service.StreamService/BarrierComplete:123 [135.999999189s]
  collect_barrier (epoch 3438928323084288) [!!! 135.999999189s]

>> RPC 127.0.0.1:5688 (125)
[captured 985.98871ms ago]
/stream_service.StreamService/BarrierComplete:125 [129.999999125s]
  collect_barrier (epoch 3438928814866432) [!!! 129.999999125s]

>> RPC 127.0.0.1:5688 (127)
[captured 969.40781ms ago]
/stream_service.StreamService/BarrierComplete:127 [123.999999062s]
  collect_barrier (epoch 3438929205723136) [!!! 123.999999062s]

>> RPC 127.0.0.1:5688 (129)
[captured 427.070004ms ago]
/stream_service.StreamService/BarrierComplete:129 [112.999998946s]
  collect_barrier (epoch 3438929600053248) [!!! 112.999998946s]

>> RPC 127.0.0.1:5688 (131)
[captured 247.966202ms ago]
/stream_service.StreamService/BarrierComplete:131 [111.999998935s]
  collect_barrier (epoch 3438930356535296) [!!! 111.999998935s]

>> RPC 127.0.0.1:5688 (133)
[captured 793.225108ms ago]
/stream_service.StreamService/BarrierComplete:133 [103.999998838s]
  collect_barrier (epoch 3438930433802240) [!!! 103.999998838s]

>> RPC 127.0.0.1:5688 (136)
[captured 896.900209ms ago]
/stream_service.StreamService/BarrierComplete:136 [96.999998749s]
  collect_barrier (epoch 3438930922373120) [!!! 96.999998749s]

>> RPC 127.0.0.1:5688 (138)
[captured 139.489801ms ago]
/stream_service.StreamService/BarrierComplete:138 [88.999998677s]
  collect_barrier (epoch 3438931374309376) [!!! 88.999998677s]

>> RPC 127.0.0.1:5688 (140)
[captured 631.400406ms ago]
/stream_service.StreamService/BarrierComplete:140 [83.009998637s]
  collect_barrier (epoch 3438931948208128) [!!! 83.009998637s]

>> RPC 127.0.0.1:5688 (142)
[captured 951.031909ms ago]
/stream_service.StreamService/BarrierComplete:142 [76.009998606s]
  collect_barrier (epoch 3438932309311488) [!!! 75.999998606s]

>> RPC 127.0.0.1:5688 (144)
[captured 724.331607ms ago]
/stream_service.StreamService/BarrierComplete:144 [69.999998618s]
  collect_barrier (epoch 3438932747026432) [!!! 69.999998618s]

>> RPC 127.0.0.1:5688 (146)
[captured 617.180106ms ago]
/stream_service.StreamService/BarrierComplete:146 [62.999998581s]
  collect_barrier (epoch 3438933155184640) [!!! 62.999998581s]

>> RPC 127.0.0.1:5688 (148)
[captured 437.205304ms ago]
/stream_service.StreamService/BarrierComplete:148 [54.99999914s]
  collect_barrier (epoch 3438933620752384) [!!! 54.99999914s]

>> RPC 127.0.0.1:5688 (150)
[captured 801.369808ms ago]
/stream_service.StreamService/BarrierComplete:150 [47.999999877s]
  collect_barrier (epoch 3438934156967936) [!!! 47.999999877s]

>> RPC 127.0.0.1:5688 (152)
[captured 195.291802ms ago]
/stream_service.StreamService/BarrierComplete:152 [38.000000219s]
  collect_barrier (epoch 3438934591930368) [!!! 38.000000219s]

>> RPC 127.0.0.1:5688 (154)
[captured 589.118506ms ago]
/stream_service.StreamService/BarrierComplete:154 [37.000000211s]
  collect_barrier (epoch 3438935286874112) [!!! 37.000000211s]

>> RPC 127.0.0.1:5688 (156)
[captured 409.803304ms ago]
/stream_service.StreamService/BarrierComplete:156 [35.000000203s]
  collect_barrier (epoch 3438935326720000) [!!! 35.000000203s]

>> RPC 127.0.0.1:5688 (158)
[captured 689.820907ms ago]
/stream_service.StreamService/BarrierComplete:158 [32.000000187s]
  collect_barrier (epoch 3438935469522944) [!!! 32.000000187s]

>> RPC 127.0.0.1:5688 (160)
[captured 507.576805ms ago]
/stream_service.StreamService/BarrierComplete:160 [30.000000178s]
  collect_barrier (epoch 3438935647715328) [!!! 30.000000178s]

>> RPC 127.0.0.1:5688 (162)
[captured 315.319903ms ago]
/stream_service.StreamService/BarrierComplete:162 [28.000000154s]
  collect_barrier (epoch 3438935790649344) [!!! 28.000000154s]

>> RPC 127.0.0.1:5688 (164)
[captured 681.562507ms ago]
/stream_service.StreamService/BarrierComplete:164 [26.000000132s]
  collect_barrier (epoch 3438935934435328) [!!! 26.000000132s]

>> RPC 127.0.0.1:5688 (166)
[captured 759.699107ms ago]
/stream_service.StreamService/BarrierComplete:166 [23.000000096s]
  collect_barrier (epoch 3438936041586688) [!!! 23.000000096s]

>> RPC 127.0.0.1:5688 (168)
[captured 444.740304ms ago]
/stream_service.StreamService/BarrierComplete:168 [21.000000089s]
  collect_barrier (epoch 3438936232951808) [!!! 21.000000089s]

>> RPC 127.0.0.1:5688 (170)
[captured 696.196507ms ago]
/stream_service.StreamService/BarrierComplete:170 [11.000000094s]
  collect_barrier (epoch 3438936384798720) [!!! 11.000000094s]

>> RPC 127.0.0.1:5688 (171)
[captured 444.7µs ago]
<initial>

>> RPC 127.0.0.1:5688 (91)
[captured 230.813202ms ago]
/stream_service.StreamService/BarrierComplete:91 [248.020004116s]
  collect_barrier (epoch 3438920976564224) [!!! 248.020004116s]

>> RPC 127.0.0.1:5688 (93)
[captured 43.6223ms ago]
/stream_service.StreamService/BarrierComplete:93 [239.990004091s]
  collect_barrier (epoch 3438921521037312) [!!! 239.990004091s]

>> RPC 127.0.0.1:5688 (95)
[captured 184.907402ms ago]
/stream_service.StreamService/BarrierComplete:95 [233.000004065s]
  collect_barrier (epoch 3438922057842688) [!!! 233.000004065s]

>> RPC 127.0.0.1:5688 (97)
[captured 563.726205ms ago]
/stream_service.StreamService/BarrierComplete:97 [226.000004041s]
  collect_barrier (epoch 3438922507812864) [!!! 226.000004041s]

>> RPC 127.0.0.1:5688 (99)
[captured 562.578005ms ago]
/stream_service.StreamService/BarrierComplete:99 [219.000003921s]
  collect_barrier (epoch 3438922942054400) [!!! 219.000003921s]
