druid: "java.lang.NullPointerException" occurs when using APPROX_QUANTILE_DS

Affected Version

  • 0.21.1
  • 2 overlords, 2 coordinators
  • 2 routers, 2 brokers
  • 3 historicals, 11 middle managers

Description

Hello, I am trying to calculate quantiles using “APPROX_QUANTILE_DS()”, but a java.lang.NullPointerException occurs in my query. The exception is thrown in “org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer”.

So I think this is caused by running out of memory (there is not enough memory available for the operation). However, increasing the memory does not solve the problem.

Also, the problem only occurs with some service codes (e.g. ‘top’, ‘cafe’).

What I’m curious about is:

  1. Is it a memory problem? Or do you think there is another cause?
  2. For k=128, how much memory is needed for the quantile operation?
  3. Should I use a different aggregator to compute the quantile?

I don’t have any good ideas to solve the problem 😦

my query:

SELECT COALESCE("mytable".country, '_') AS country,
  (APPROX_QUANTILE_DS("mytable".quantile_duration, 0.9)) AS quantile
FROM "mytable"
WHERE ("mytable".service_code = 'top')
AND __time >= '2021-06-01' AND __time <= '2021-06-01'
GROUP BY COALESCE("mytable".country, '_')

datasource configuration:

  • __time : 2021-06-01/2021-06-13
  • dimension: country (cardinality 239), service_code(cardinality 129) etc.
  • metric: quantile_duration (Apply quantilesDoublesSketch, k=128(default)) etc.
  • 7 segments (total 1.42 GB)

full log:

at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer(DirectUpdateDoublesSketch.java:254)
at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedBuffer(DirectUpdateDoublesSketch.java:238)
at  org.apache.datasketches.quantiles.DoublesMergeImpl.mergeInto(DoublesMergeImpl.java:84)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.updateLogic(DoublesUnionImpl.java:200)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.update(DoublesUnionImpl.java:118)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeAggregator.updateUnion(DoublesSketchMergeAggregator.java:80)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeBufferAggregator.aggregate(DoublesSketchMergeBufferAggregator.java:66)
at  org.apache.druid.query.aggregation.AggregatorAdapters.aggregateBuffered(AggregatorAdapters.java:164)
at  org.apache.druid.query.groupby.epinephelinae.AbstractBufferHashGrouper.aggregate(AbstractBufferHashGrouper.java:161)
at  org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.aggregate(SpillingGrouper.java:172)
at  org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper.aggregate(ConcurrentGrouper.java:269)
at  org.apache.druid.query.groupby.epinephelinae.Grouper.aggregate(Grouper.java:85)
at  org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.lambda$createGrouperAccumulatorPair$2(RowBasedGrouperHelper.java:332)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.ConcatSequence.lambda$accumulate$0(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.FilteringAccumulator.accumulate(FilteringAccumulator.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.FilteredSequence.accumulate(FilteredSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.ConcatSequence.accumulate(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$1.accumulate(SpecificSegmentQueryRunner.java:87)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.doNamed(SpecificSegmentQueryRunner.java:171)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.access$100(SpecificSegmentQueryRunner.java:44)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$2.wrap(SpecificSegmentQueryRunner.java:153)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.CPUTimeMetricQueryRunner$1.wrap(CPUTimeMetricQueryRunner.java:78)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:247)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:234)
at  java.util.concurrent.FutureTask.run(FutureTask.java:266)
at  org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
at  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at  java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated.

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 20 (14 by maintainers)

Most upvoted comments

Yes, I think the problem is too many items per country. Druid uses a fixed-size buffer per row to keep the sketch (DoublesSketch). Since the buffer size is fixed but Druid doesn’t know the number of items in advance, it sizes the buffer to be large enough to hold one billion items in the sketch. So, when you have fewer than one billion items, the sketch fits in the buffer and everything works well. The interesting part is when you have more than one billion items. In that case, Druid lets the sketch allocate extra heap memory to hold the items that don’t fit in the buffer. However, DoublesSketch does not work as we expected and throws an NPE when it tries to allocate more memory. This issue is filed in https://github.com/apache/datasketches-java/issues/358.

As a workaround, you could use other functions to compute approximate quantiles, such as DS_QUANTILES_SKETCH or APPROX_QUANTILE. Note that APPROX_QUANTILE uses the deprecated approximate histogram aggregator, so its accuracy might not be great.

As Lee Rhodes said, it might take quite a while to fix the root cause and go through release cycles for datasketches-memory and datasketches-java. Therefore, I would suggest using the workaround that I mentioned above, namely increasing the MAX_STREAM_LENGTH constant. It affects the size pre-allocated for each sketch in the BufferAggregator. The assumption was that, because data is fragmented across multiple dimensions with a power-law distribution, only a small number of sketches would reach that size and move to on-heap memory. Since this mechanism is broken now, let’s set a much higher limit until it is fixed. And let’s do it quickly, before the 0.22 branch is created. I can do a pull request if we agree on the value.

Here is the size in bytes of one slot in the BufferAggregator for the default sketch parameter K=128, for different values of MAX_STREAM_LENGTH:

  • 1B (current): 24608
  • 10B: 28704
  • 100B: 31776
  • 1T: 34848

I suggest setting it to 1T.
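For anyone who wants to check the slot sizes quoted above, here is a minimal sketch that reproduces them. It assumes the updatable sketch layout is a 32-byte header plus a base buffer of 2K doubles plus K doubles per level, with one level per significant bit of N / (2K). The names slot_bytes, num_levels and HEADER_BYTES are made up for this illustration and are not the DataSketches or Druid API.

```python
HEADER_BYTES = 32   # assumption: four 8-byte header words
DOUBLE_BYTES = 8


def num_levels(k: int, n: int) -> int:
    """Levels needed for n items: one per significant bit of n // (2k)."""
    return (n // (2 * k)).bit_length()


def slot_bytes(k: int, n: int) -> int:
    """Estimated bytes for one sketch slot sized for up to n items (n > k)."""
    if n <= k:
        raise ValueError("this sketch only covers n > k")
    # base buffer of 2k doubles, plus k doubles for every level
    return HEADER_BYTES + DOUBLE_BYTES * k * (2 + num_levels(k, n))


# Candidate MAX_STREAM_LENGTH values from the comment above, with K=128
for label, n in [("1B", 10**9), ("10B", 10**10), ("100B", 10**11), ("1T", 10**12)]:
    print(label, slot_bytes(128, n))
```

Under these assumptions the loop prints the same four sizes as the list above, so going from 1B to 1T costs about 10 KB more per slot.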

Hi @AlexanderSaydakov, thank you for taking a look. It does fail in the Druid master branch. You can easily reproduce it by running DoublesSketchAggregatorTest.buildingSketchesAtQueryTime() after setting DoublesSketchAggregatorFactory.MAX_STREAM_LENGTH to something very low, like 10.

Could someone point to the code that allocates this memory for BufferAggregator please?

Those buffers are allocated in DruidProcessingModule.

☝️ The above comment shows the result from the Druid table, that is, the values after rollup with quantilesDoublesSketch at ingestion time.

The number of rows in the original table is as follows. query:

SELECT
  country,
  SUM("count") AS total_num_rows_original
FROM "mytable"
WHERE __time >= '2021-06-01' AND __time <= '2021-06-01' AND service_code = 'top'
GROUP BY 1
ORDER BY 2 DESC

query result:

{"country":"kr","total_num_rows_original":1082227280}
{"country":"us","total_num_rows_original":10978845}
{"country":"jp","total_num_rows_original":2896190}
{"country":"ca","total_num_rows_original":2767109}
{"country":"au","total_num_rows_original":1862148}
{"country":"vn","total_num_rows_original":1718031}
{"country":"nz","total_num_rows_original":575751}
{"country":"de","total_num_rows_original":556492}
{"country":"sg","total_num_rows_original":536305}
{"country":"id","total_num_rows_original":425479}
{"country":"hk","total_num_rows_original":373920}
{"country":"ph","total_num_rows_original":364786}
{"country":"","total_num_rows_original":361175}
{"country":"th","total_num_rows_original":360037}
{"country":"my","total_num_rows_original":333746}
{"country":"gb","total_num_rows_original":324027}
{"country":"mx","total_num_rows_original":240169}
{"country":"ae","total_num_rows_original":237182}
...
{"country":"ad","total_num_rows_original":3}
{"country":"gw","total_num_rows_original":3}
{"country":"so","total_num_rows_original":3}
{"country":"mq","total_num_rows_original":1}
{"country":"sy","total_num_rows_original":1}
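To relate these counts to the one-billion-item buffer limit described in the comments above, a quick check like the following shows which groups exceed it. This is only an illustration: the dictionary copies the first few rows of the result, and MAX_STREAM_LENGTH here is a local stand-in, assumed to be 1,000,000,000, for the Druid constant of the same name.

```python
# Local stand-in for Druid's limit; assumed to be one billion items.
MAX_STREAM_LENGTH = 1_000_000_000

# First few rows of the query result above (country -> original row count).
rows_per_country = {
    "kr": 1_082_227_280,
    "us": 10_978_845,
    "jp": 2_896_190,
    "ca": 2_767_109,
}

# Groups whose merged sketch would have to hold more items than the buffer was sized for.
over_limit = {c: n for c, n in rows_per_country.items() if n > MAX_STREAM_LENGTH}

for country, n in over_limit.items():
    print(f"{country}: {n} items, {n - MAX_STREAM_LENGTH} over the limit")
```

Only "kr" crosses the limit in this sample, which would fit with the failure showing up only for certain service codes.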

If the aggregation is performed over the whole table, the number of original rows is about 81 billion, which is up to about 20 times the values of N listed in the table (https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch.html).

However, the number of bytes required grows only logarithmically: about 81 billion rows falls between 2^36 and 2^37, and the size increases by roughly 1 KB each time N doubles. Based on this calculation, 30 KB to 32 KB seems to be sufficient.
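The estimate can be checked with a few lines of arithmetic. The sketch below assumes a 32-byte header, a base buffer of 2K doubles, and K doubles per level, with one level per significant bit of N / (2K). The function name estimated_bytes is hypothetical and not part of any library.

```python
K = 128


def estimated_bytes(n: int, k: int = K) -> int:
    """Assumed layout: 32-byte header + 8 bytes * k * (2 + levels)."""
    levels = (n // (2 * k)).bit_length()
    return 32 + 8 * k * (2 + levels)


at_2_36 = estimated_bytes(2**36)
at_81b = estimated_bytes(81_000_000_000)
at_2_37 = estimated_bytes(2**37)

print(at_2_36, at_81b, at_2_37)
print("growth per doubling:", at_2_37 - at_2_36, "bytes")
```

With K=128, each extra level adds 8 * 128 = 1024 bytes, and under these assumptions 81 billion rows needs a little over 31 KB, inside the 30 KB to 32 KB range estimated above.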