velox: ParquetReader passes wrong columnIds array to DuckDB::ParquetReader
The following query fails with an index-out-of-bounds error:
select count(*) from lineitem where partkey <20000000;
Call stack
std::__1::unique_ptr<duckdb::ColumnReader, std::__1::default_delete<duckdb::ColumnReader> >::get() const memory:2318
duckdb::StructColumnReader::GetChildReader(unsigned long long) parquet-amalgamation.cpp:13520
duckdb::ParquetReader::ScanInternal(duckdb::ParquetReaderScanState&, duckdb::DataChunk&) parquet-amalgamation.cpp:16349
duckdb::ParquetReader::Scan(duckdb::ParquetReaderScanState&, duckdb::DataChunk&) parquet-amalgamation.cpp:16283
facebook::velox::parquet::ParquetRowReader::next(unsigned long long, std::__1::shared_ptr<facebook::velox::BaseVector>&) ParquetReader.cpp:150
facebook::velox::connector::hive::HiveDataSource::next(unsigned long long) HiveConnector.cpp:495
facebook::velox::exec::TableScan::getOutput() TableScan.cpp:95
facebook::velox::exec::Driver::runInternal(std::__1::shared_ptr<facebook::velox::exec::Driver>&, std::__1::shared_ptr<facebook::velox::exec::BlockingState>*) Driver.cpp:375
facebook::velox::exec::Driver::run(std::__1::shared_ptr<facebook::velox::exec::Driver>) Driver.cpp:459
facebook::velox::exec::Driver::enqueue(std::__1::shared_ptr<facebook::velox::exec::Driver>)::$_0::operator()() const Driver.cpp:195
void folly::detail::function::FunctionTraits<void ()>::callSmall<facebook::velox::exec::Driver::enqueue(std::__1::shared_ptr<facebook::velox::exec::Driver>)::$_0>(folly::detail::function::Data&) Function.h:371
folly::detail::function::FunctionTraits<void ()>::operator()() Function.h:400
folly::ThreadPoolExecutor::runTask(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&) Exception.h:279
folly::ThreadPoolExecutor::runTask(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&) Executor.h:235
folly::ThreadPoolExecutor::runTask(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&) ThreadPoolExecutor.cpp:116
folly::CPUThreadPoolExecutor::threadRun(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>) CPUThreadPoolExecutor.cpp:302
decltype(*(std::__1::forward<folly::ThreadPoolExecutor*&>(fp0)).*fp(std::__1::forward<std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>&>(fp1))) std::__1::__invoke<void (folly::ThreadPoolExecutor::*&)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>&, void>(void (folly::ThreadPoolExecutor::*&)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>&) type_traits:3688
std::__1::__bind_return<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >, std::__1::tuple<>, __is_valid_bind_return<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >, std::__1::tuple<> >::value>::type std::__1::__apply_functor<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >, 0ul, 1ul, std::__1::tuple<> >(void (folly::ThreadPoolExecutor::*&)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >&, std::__1::__tuple_indices<0ul, 1ul>, std::__1::tuple<>&&) functional:2852
std::__1::__bind_return<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >, std::__1::tuple<>, __is_valid_bind_return<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), std::__1::tuple<folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread> >, std::__1::tuple<> >::value>::type std::__1::__bind<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>&>::operator()<>() functional:2885
void folly::detail::function::FunctionTraits<void ()>::callSmall<std::__1::__bind<void (folly::ThreadPoolExecutor::*)(std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*, std::__1::shared_ptr<folly::ThreadPoolExecutor::Thread>&> >(folly::detail::function::Data&) Function.h:371
folly::detail::function::FunctionTraits<void ()>::operator()() Function.h:400
folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()::operator()() NamedThreadFactory.h:40
decltype(std::__1::forward<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>(fp)()) std::__1::__invoke<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>(folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()&&) type_traits:3747
void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>&, std::__1::__tuple_indices<>) thread:280
void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()> >(void*) thread:291
_pthread_start 0x00007fff205a28fc
thread_start 0x00007fff2059e443
This happens because DuckDB creates readers for all table columns, even those not used in the query (I created issue https://github.com/duckdb/duckdb/issues/2898 for that), while ParquetRowReader only includes the columns that have filters:
ParquetRowReader::ParquetRowReader(
std::shared_ptr<::duckdb::ParquetReader> reader,
const dwio::common::RowReaderOptions& options,
memory::MemoryPool& pool)
: reader_(std::move(reader)), pool_(pool) {
...
std::vector<::duckdb::column_t> columnIds;
columnIds.reserve(rowType_->size());
duckdbRowType_.reserve(rowType_->size());
for (auto& node : filter) {
columnIds.push_back(node.column);
duckdbRowType_.push_back(reader_->return_types[node.column]);
}
In DuckDB's ParquetReader::ScanInternal(), the file column index for each filter is looked up as follows:
auto file_col_idx = state.column_ids[filter_col.first];
But filter_col.first is 1 (partkey has Hive column index 1), while the column_ids array has only one element. Reading index 1 is therefore out of bounds and yields a wrong file_col_idx.
In fact, it is bad practice for Velox to pass the scan state and columnIds into DuckDB's ParquetReader. The state should be internal to DuckDB's ParquetReader and should not be passed in from outside. DuckDB also needs to set column_ids correctly on its own. Note that DuckDB already reads the file metadata during initialization, and the schema contains information about all columns, which can be used to construct the column_ids array.
Additionally, the column_ids array doesn't seem necessary if both column_id and filter_col.first are Hive column indexes. In that case, the line above should simply be:
auto file_col_idx = filter_col.first;
Furthermore, DuckDB should not create so many readers (one per table column). It should only create readers for the columns the query actually needs.
I think DuckDB needs a significant refactoring to manage its state correctly before we can work on any additional optimizations.
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 17 (5 by maintainers)
@yingsu00 Here is a nice blog post about Parquet reader in DuckDB: https://duckdb.org/2021/06/25/querying-parquet.html
Sure, drop me a line at hannes@duckdblabs.com.