presto: Majority of TPC-DS queries failed with "Failed to fetched data from" error at high SF on large clusters

Many queries failed at the higher scale factors:

  • 10 TB on a large cluster with 16 r5.8xlarge workers (https://oss.ahana.dev/benchto-run-details?runs=397)
  • 100 TB on an xlarge cluster with 32 r5.16xlarge workers (https://oss.ahana.dev/benchto-run-details?runs=398)

All of the failed queries report the following error:

Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: Failed to fetched data from 10.78.43.252:8585 /v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)

There was no pattern in the IP addresses of the failing nodes, so we can rule out a single-node failure.
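The per-host check can be reproduced roughly as follows. This is a minimal sketch, assuming the failure messages from all failed queries have been collected into a list of strings. The helper name `failures_by_host` is hypothetical, and the regular expression is written only against the single message quoted in this report, so other versions may need a different pattern.

```python
import re
from collections import Counter

# Pattern written against the one message quoted in this report (assumption:
# other failures use the same wording, including the "fetched" typo).
FETCH_ERROR = re.compile(
    r"Failed to fetched data from (?P<host>[\d.]+):(?P<port>\d+) "
    r"(?P<path>/v1/task/(?P<task>[^/\s]+)/results/\S+)"
    r".*errno = (?P<errno>\d+)"
)


def failures_by_host(messages):
    """Count exchange fetch failures per remote host (hypothetical helper)."""
    counts = Counter()
    for message in messages:
        match = FETCH_ERROR.search(message)
        if match:
            counts[match.group("host")] += 1
    return counts


# Only the message quoted in this report; add the others as they are collected.
messages = [
    "Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: "
    "Failed to fetched data from 10.78.43.252:8585 "
    "/v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: "
    "AsyncSocketException: connect failed, type = Socket not open, "
    "errno = 111 (Connection refused)",
]

for host, count in failures_by_host(messages).most_common():
    print(host, count)
```

A flat distribution across hosts would support the conclusion that no single worker is at fault, while one dominant host would point back to a node-level problem.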

Example query: Q55 (20230630_000636_00000_dikw7)

SELECT
  "i_brand_id" "brand_id"
, "i_brand" "brand"
, "sum"("ss_ext_sales_price") "ext_price"
FROM
  date_dim
, store_sales
, item
WHERE ("d_date_sk" = "ss_sold_date_sk")
   AND ("ss_item_sk" = "i_item_sk")
   AND ("i_manager_id" = 28)
   AND ("d_moy" = 11)
   AND ("d_year" = 1999)
GROUP BY "i_brand", "i_brand_id"
ORDER BY "ext_price" DESC, "i_brand_id" ASC
LIMIT 100

Error message and Stack trace

{"type":"VeloxRuntimeError",
 "message":" Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: Failed to fetched data from 10.78.43.252:8585 /v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)",
 "suppressed":[],"stack":["Unknown.# 0  _ZN8facebook5velox7process10StackTraceC1Ei(Unknown Source)",
 "Unknown.# 1  _ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_(Unknown Source)",
 "Unknown.# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_(Unknown Source)",
 "Unknown.# 3  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE.cold(Unknown Source)",
 "Unknown.# 4  _ZN8facebook5velox4exec6Driver3runESt10shared_ptrIS2_E(Unknown Source)",
 "Unknown.# 5  _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallIZN8facebook5velox4exec6Driver7enqueueESt10shared_ptrIS9_EEUlvE_EEvRNS1_4DataE(Unknown Source)",
 "Unknown.# 6  _ZN5folly6detail8function14FunctionTraitsIFvvEEclEv(Unknown Source)",
 "Unknown.# 7  _ZN5folly18ThreadPoolExecutor7runTaskERKSt10shared_ptrINS0_6ThreadEEONS0_4TaskE(Unknown Source)",
 "Unknown.# 8  _ZN5folly21CPUThreadPoolExecutor9threadRunESt10shared_ptrINS_18ThreadPoolExecutor6ThreadEE(Unknown Source)",
 "Unknown.# 9  _ZSt13__invoke_implIvRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEERPS1_JRS4_EET_St21__invoke_memfun_derefOT0_OT1_DpOT2_(Unknown Source)",
 "Unknown.# 10 _ZSt8__invokeIRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEJRPS1_RS4_EENSt15__invoke_resultIT_JDpT0_EE4typeEOSC_DpOSD_(Unknown Source)",
 "Unknown.# 11 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE(Unknown Source)",
 "Unknown.# 12 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EEclIJEvEET0_DpOT_(Unknown Source)",
 "Unknown.# 13 _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallISt5_BindIFMNS_18ThreadPoolExecutorEFvSt10shared_ptrINS7_6ThreadEEEPS7_SA_EEEEvRNS1_4DataE(Unknown Source)",
 "Unknown.# 14 0x0000000000000000(Unknown Source)",
 "Unknown.# 15 start_thread(Unknown Source)",
 "Unknown.# 16 clone(Unknown Source)"],"errorCode":{"code":65536,"name":"GENERIC_INTERNAL_ERROR",
 "type":"INTERNAL_ERROR",
 "retriable":false},"errorCause":"UNKNOWN"}
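The mangled frames in the stack are hard to read as posted. The sketch below pulls the frame index and symbol out of each entry and, if the `c++filt` tool from GNU binutils happens to be installed, passes the symbols through it. The helper names `parse_frame` and `demangle` are hypothetical, and the frame pattern assumes the `Unknown.# N  symbol(Unknown Source)` layout seen in this report.

```python
import re
import shutil
import subprocess

# Frame layout as it appears in the "stack" array of this report:
#   "Unknown.# 3  <mangled symbol>(Unknown Source)"
FRAME = re.compile(r"#\s*(?P<index>\d+)\s+(?P<symbol>\S+?)(?:\(Unknown Source\))?$")


def parse_frame(frame):
    """Return (index, symbol) for one stack entry, or None if it does not match."""
    match = FRAME.search(frame.strip())
    if match is None:
        return None
    return int(match.group("index")), match.group("symbol")


def demangle(symbols):
    """Demangle with c++filt when it is on PATH; otherwise return symbols as-is."""
    tool = shutil.which("c++filt")
    if tool is None or not symbols:
        return list(symbols)
    result = subprocess.run(
        [tool], input="\n".join(symbols), capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()


frames = [
    "Unknown.# 3  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE.cold(Unknown Source)",
    "Unknown.# 4  _ZN8facebook5velox4exec6Driver3runESt10shared_ptrIS2_E(Unknown Source)",
]
parsed = [p for p in map(parse_frame, frames) if p is not None]
for (index, _), name in zip(parsed, demangle([s for _, s in parsed])):
    print(index, name)
```

Even without demangling, frames 3 and 4 show that the exception surfaces in the Velox `Driver` while it polls the Exchange operator, which matches the `Operator::isBlocked` text in the message.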

The query has 5 stages (0-4); stages 3 and 4 finished successfully.

--------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [i_brand_id, i_brand, sum]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[brand_id, brand, ext_price] => [i_brand_id:integer, i_brand:varchar(50), sum:decimal(38,2)]
             brand_id := i_brand_id (1:35)
             brand := i_brand (1:61)
             ext_price := sum (1:81)
         - TopN[100 by (sum DESC_NULLS_LAST, i_brand_id ASC_NULLS_LAST)] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]
             - LocalExchange[SINGLE] () => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]
                 - RemoteSource[1] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]

 Fragment 1 [HASH]
     Output layout: [i_brand, i_brand_id, sum]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TopNPartial[100 by (sum DESC_NULLS_LAST, i_brand_id ASC_NULLS_LAST)] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]
         - Aggregate(FINAL)[i_brand, i_brand_id] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]
                 sum := "presto.default.sum"((sum_20)) (1:81)
             - LocalExchange[HASH] (i_brand, i_brand_id) => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]
                 - RemoteSource[2] => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]

 Fragment 2 [SOURCE]
     Output layout: [i_brand, i_brand_id, sum_20]
     Output partitioning: HASH [i_brand, i_brand_id]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL)[i_brand, i_brand_id] => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]
             sum_20 := "presto.default.sum"((ss_ext_sales_price)) (1:81)
         - InnerJoin[("ss_item_sk" = "i_item_sk")] => [ss_ext_sales_price:decimal(7,2), i_brand_id:integer, i_brand:varchar(50)]
                 Estimates: {rows: 46351051 (3.50GB), cpu: 15300513743569.34, memory: 5458621.00, network: 5458621.00}
                 Distribution: REPLICATED
             - InnerJoin[("ss_sold_date_sk" = "d_date_sk")] => [ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)]
                     Estimates: {rows: 4576801864 (490.19GB), cpu: 15218156819337.52, memory: 8449.70, network: 8449.70}
                     Distribution: REPLICATED
                 - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.store_sales{}]'}, grouped = false] => [ss_sold_date_sk:bigint, ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)>
                         Estimates: {rows: 288002140051 (30.12TB), cpu: 7568710790041.00, memory: 0.00, network: 0.00}
                         LAYOUT: tpcds_sf100000_parquet_varchar.store_sales{}
                         ss_ext_sales_price := ss_ext_sales_price:decimal(7,2):15:REGULAR (1:137)
                         ss_sold_date_sk := ss_sold_date_sk:bigint:0:REGULAR (1:137)
                         ss_item_sk := ss_item_sk:bigint:2:REGULAR (1:137)
                 - LocalExchange[HASH] (d_date_sk) => [d_date_sk:bigint]
                         Estimates: {rows: 30 (3.40kB), cpu: 545.14, memory: 0.00, network: 8449.70}
                     - RemoteSource[3] => [d_date_sk:bigint]
             - LocalExchange[HASH] (i_item_sk) => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]
                     Estimates: {rows: 5008 (396.65kB), cpu: 351623.96, memory: 0.00, network: 5450171.30}
                 - RemoteSource[4] => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]

 Fragment 3 [SOURCE]
     Output layout: [d_date_sk]
     Output partitioning: BROADCAST []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=date_dim, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.date_dim{domains={d_moy=[ [["11"]] ], d_year=[ [["1999"]] ]}}]'}, grouped = false] => [d_date_sk:bigint]
             Estimates: {rows: 30 (272B), cpu: 272.57, memory: 0.00, network: 0.00}
             LAYOUT: tpcds_sf100000_parquet_varchar.date_dim{domains={d_moy=[ [["11"]] ], d_year=[ [["1999"]] ]}}
             d_date_sk := d_date_sk:bigint:0:REGULAR (1:126)
             d_moy:int:8:REGULAR
                 :: [["11"]]
             d_year:int:6:REGULAR
                 :: [["1999"]]

 Fragment 4 [SOURCE]
     Output layout: [i_item_sk, i_brand_id, i_brand]
     Output partitioning: BROADCAST []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=item, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.item{domains={i_manager_id=[ [["28"]] ]}}]'}, grouped = false] => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]
             Estimates: {rows: 5008 (171.69kB), cpu: 175811.98, memory: 0.00, network: 0.00}
             LAYOUT: tpcds_sf100000_parquet_varchar.item{domains={i_manager_id=[ [["28"]] ]}}
             i_brand_id := i_brand_id:int:7:REGULAR (1:151)
             i_brand := i_brand:varchar(50):8:REGULAR (1:151)
             i_item_sk := i_item_sk:bigint:0:REGULAR (1:151)
             i_manager_id:int:20:REGULAR
                 :: [["28"]]

(1 row)

All remaining tasks in stages 0, 1, and 2 either failed or were aborted.

In stage 2, there are 28 tasks in total: 26 were aborted and 2 failed.

  • Example aborted task in stage 2: 20230630_000636_00000_dikw7.2.0.20.0
  • Example failed task in stage 2: 20230630_000636_00000_dikw7.2.0.20.0
  • Example failed task in stage 1: 20230630_000636_00000_dikw7.1.0.0.0

Attachment: AbortedTasks.zip
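When sorting through many task IDs like the ones above, it can help to split them into fields. The sketch below assumes the ID is the query ID followed by four dot-separated integers. Only the first of those (the stage number) is confirmed by the examples in this report; the names `stage_execution_id`, `task_id`, and `attempt` are assumptions, and `parse_task_id` is a hypothetical helper.

```python
from typing import NamedTuple


class TaskId(NamedTuple):
    query_id: str
    stage_id: int            # matches the stage numbers quoted in this report
    stage_execution_id: int  # assumed field name
    task_id: int             # assumed field name
    attempt: int             # assumed field name


def parse_task_id(text):
    """Split 'queryId.a.b.c.d' into its parts (hypothetical helper)."""
    # The query ID contains underscores but no dots, so split from the right.
    query_id, stage, execution, task, attempt = text.rsplit(".", 4)
    return TaskId(query_id, int(stage), int(execution), int(task), int(attempt))


for raw in (
    "20230630_000636_00000_dikw7.2.0.20.0",
    "20230630_000636_00000_dikw7.1.0.0.0",
):
    parsed = parse_task_id(raw)
    print(parsed.query_id, "stage", parsed.stage_id, "task", parsed.task_id)
```

Grouping the parsed IDs by `stage_id` and task state gives the per-stage aborted and failed counts directly from a task list.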

The aborted task stack trace:

"# 0  _ZN8facebook5velox7process10StackTraceC1Ei",
 "# 1  _ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_",
 "# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorEPKcEEvRKNS1_18VeloxCheckFailArgsET0_",
 "# 3  _ZN8facebook5velox4exec4Task9terminateENS1_9TaskStateE",
 "# 4  _ZN8facebook6presto11TaskManager10deleteTaskERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEb",
 "# 5  _ZNSt17_Function_handlerIFvPN8proxygen11HTTPMessageERSt6vectorISt10unique_ptrIN5folly5IOBufESt14default_deleteIS6_EESaIS9_EEPNS0_15ResponseHandlerEEZN8facebook6presto12TaskResource10deleteTaskES2_RKS3_INSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISO_EEEUlS2_RKSB_SE_E_E9_M_invokeERKSt9_Any_dataOS2_SC_OSE_",
 "# 6  _ZNSt17_Function_handlerIFvPN8proxygen11HTTPMessageERSt6vectorISt10unique_ptrIN5folly5IOBufESt14default_deleteIS6_EESaIS9_EEPNS0_15ResponseHandlerESt10shared_ptrIN8facebook6presto4http27CallbackRequestHandlerStateEEEZNSI_22CallbackRequestHandler4wrapESt8functionIFvS2_SC_SE_EEEUlS2_SC_SE_SK_E_E9_M_invokeERKSt9_Any_dataOS2_SC_OSE_OSK_",
 "# 7  _ZN8facebook6presto4http22CallbackRequestHandler5onEOMEv",
 "# 8  _ZN8proxygen21RequestHandlerAdaptor5onEOMEv",
 "# 9  _ZN8proxygen15HTTPTransaction17processIngressEOMEv",
 "# 10 _ZN8proxygen15HTTPTransaction12onIngressEOMEv",
 "# 11 _ZN8proxygen11HTTPSession17onMessageCompleteEmb",
 "# 12 _ZN8proxygen26PassThroughHTTPCodecFilter17onMessageCompleteEmb",
 "# 13 _ZN8proxygen11HTTP1xCodec17onMessageCompleteEv",
 "# 14 _ZN8proxygen11HTTP1xCodec19onMessageCompleteCBEPNS_11http_parserE",
 "# 15 _ZN8proxygen27http_parser_execute_optionsEPNS_11http_parserEPKNS_20http_parser_settingsEhPKcm",
 "# 16 _ZN8proxygen11HTTP1xCodec13onIngressImplERKN5folly5IOBufE",
 "# 17 _ZN8proxygen11HTTP1xCodec9onIngressERKN5folly5IOBufE",
 "# 18 _ZN8proxygen26PassThroughHTTPCodecFilter9onIngressERKN5folly5IOBufE",
 "# 19 _ZN8proxygen11HTTPSession15processReadDataEv",
 "# 20 _ZN8proxygen11HTTPSession17readDataAvailableEm",
 "# 21 _ZN5folly11AsyncSocket17processNormalReadEv",
 "# 22 _ZN5folly11AsyncSocket10handleReadEv",
 "# 23 _ZN5folly11AsyncSocket7ioReadyEt",
 "# 24 _ZN5folly11AsyncSocket9IoHandler12handlerReadyEt",
 "# 25 _ZN5folly12EventHandler16libeventCallbackEisPv",
 "# 26 0x0000000000000000",
 "# 27 event_base_loop",
 "# 28 _ZN12_GLOBAL__N_116EventBaseBackend18eb_event_base_loopEi",
 "# 29 _ZN5folly9EventBase8loopMainEib",
 "# 30 _ZN5folly9EventBase8loopBodyEib",
 "# 31 _ZN5folly9EventBase4loopEv",
 "# 32 _ZN5folly9EventBase11loopForeverEv",
 "# 33 _ZN5folly20IOThreadPoolExecutor9threadRunESt10shared_ptrINS_18ThreadPoolExecutor6ThreadEE",
 "# 34 _ZSt13__invoke_implIvRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEERPS1_JRS4_EET_St21__invoke_memfun_derefOT0_OT1_DpOT2_",
 "# 35 _ZSt8__invokeIRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEJRPS1_RS4_EENSt15__invoke_resultIT_JDpT0_EE4typeEOSC_DpOSD_",
 "# 36 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE",
 "# 37 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EEclIJEvEET0_DpOT_",
 "# 38 _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallISt5_BindIFMNS_18ThreadPoolExecutorEFvSt10shared_ptrINS7_6ThreadEEEPS7_SA_EEEEvRNS1_4DataE",
 "# 39 0x0000000000000000",
 "# 40 start_thread",
 "# 41 clone" ${database}.${schema}.

About this issue

  • State: closed
  • Created a year ago
  • Comments: 17 (14 by maintainers)

Most upvoted comments

@mbasmanova: We don’t see these errors at lower TPC-DS scale factors, including sf1K. They show up only when the query runs at the 10K scale factor (so 10 TB of data). We can try to determine the number of rows that triggers the HashTable rewrite here, but I thought I would check whether you have seen such a failure before and can share some clues.