nebula: "Unable to activate object" error when there are multiple threads / concurrent tasks in Spark
Settings
Hardware and software overview
NebulaGraph Database deployment
- Deployment: installed in single-node mode on an EC2 machine
- Nebula version: 3.6.0
- Nebula Spark Connector version: nebula-spark-connector_3.0-3.0-SNAPSHOT
- Spark version: 3.3.3
- Installation method: via RPM
- Storage: 600G
- vCPU: 8
- Memory: 64G
- OS: Linux
Computation cluster
- The computation cluster is an AWS EMR cluster
- The EC2 machine and the EMR cluster can connect to each other
- Spark version: 3.1.2
Graph data
- Number of nodes: 289418552
- Number of edges: 303938330
- Partition number: 20
Others
- Already added the public IP of the EC2 machine to the hosts configuration
Issues
When we try to scan the full graph data on the EMR machine, e.g. with the count() call shown in the snippet below, we encounter the "Unable to activate object" error.
Snippet:
import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
val ec2_public_ip = "xx.xx.xx.xx"
// Connect to the metad service on the EC2 machine (9559 is the metad port)
val config = NebulaConnectionConfig.builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
// Scan all USES edges in the space, split into 10 Spark partitions
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig.builder()
  .withSpace("acct2asset_20231130").withLabel("USES")
  .withNoColumn(false).withReturnCols(List())
  .withPartitionNum(10).build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.count()
Error log:
23/12/15 07:59:57 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 103) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
23/12/15 07:59:57 ERROR TaskSetManager: Task 1 in stage 10.0 failed 4 times; aborting job
23/12/15 07:59:57 WARN TaskSetManager: Lost task 2.3 in stage 10.0 (TID 141) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 7.3 in stage 10.0 (TID 136) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 5.3 in stage 10.0 (TID 133) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 3.3 in stage 10.0 (TID 135) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 4.3 in stage 10.0 (TID 139) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 0.3 in stage 10.0 (TID 140) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 9.3 in stage 10.0 (TID 134) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 6.3 in stage 10.0 (TID 137) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 8.3 in stage 10.0 (TID 138) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 132) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:147)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:204)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:203)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:425)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3047)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3046)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3046)
... 49 elided
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
However, we can successfully run the following on the EMR machine and get results:
dataset.show()
We also tested scripts involving different volumes of the graph data. With val n_limit = 1000000, we can successfully run the following (a modified snippet from the nebula-algorithm package):
val ORIGIN_ID_COL = "id"
val fieldNames = dataset.schema.fieldNames
val n_limit = 1000000
val (srcName, dstName) = (fieldNames(0), fieldNames(1))
// Take up to n_limit ids from each endpoint column, then union and deduplicate
val srcIdDF: DataFrame = dataset.select(srcName).withColumnRenamed(srcName, ORIGIN_ID_COL).limit(n_limit)
val dstIdDF: DataFrame = dataset.select(dstName).withColumnRenamed(dstName, ORIGIN_ID_COL).limit(n_limit)
val idDF = srcIdDF.union(dstIdDF).distinct()
idDF.show()
However, when we increase n_limit to 10000000, the job fails with the same "Unable to activate object" error.
What we found so far
With more testing, we found that when the total number of threads / concurrent tasks is 1, the error does not occur, whereas as soon as the number of threads is greater than 1, it does. We suspect there is some constraint on the NebulaGraph Database side and wonder whether proper configuration tuning could help.
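To make the single-task case concrete, here is a minimal sketch (reusing config and the space/label names from the snippet above) that forces both the scan and the downstream aggregation to run with one concurrent task; this mirrors the single-thread setup described above:
val singleTaskReadConfig: ReadNebulaConfig = ReadNebulaConfig.builder()
  .withSpace("acct2asset_20231130").withLabel("USES")
  .withNoColumn(false).withReturnCols(List())
  .withPartitionNum(1) // one Spark partition => one concurrent scan task
  .build()
val singleTaskDataset = spark.read.nebula(config, singleTaskReadConfig).loadEdgesToDF()
singleTaskDataset.coalesce(1).count() // keep the downstream stage single-threaded too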
Could you please help with this issue? Feel free to let me know if I need to provide more information. Thanks a lot!
About this issue
- Original URL
- State: closed
- Created 7 months ago
- Comments: 22 (11 by maintainers)
This question came up quite by accident; it's about the number of available ports. Maybe you can try the approach in this post: https://blog.csdn.net/gltncx11/article/details/122068479 @sparkle-apt
OK, I'll run a test to see if there is any connection leak. In the meantime, maybe you can update your nebula-spark-connector to the latest version.
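For reference, in an sbt build the upgrade would look roughly like the line below; the coordinates follow the connector's usual naming on Maven Central, but the version shown is an assumption, so please check for the actual latest release:
// build.sbt -- version number is an assumption; check Maven Central for the latest release
libraryDependencies += "com.vesoft" % "nebula-spark-connector_3.0" % "3.6.0"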
You can set minloglevel to 0 and v to 3 for more detailed logs: https://docs.nebula-graph.io/3.6.0/5.configurations-and-logs/2.log-management/logs/#parameter_descriptions
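Concretely, that would mean adding the following gflags lines to the service configuration files (e.g. etc/nebula-storaged.conf; the file path is the usual default and may differ in your deployment) and then restarting the service:
--minloglevel=0
--v=3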
We have decided not to be blocked by this issue for the moment, to move forward with other projects, and to test on larger clusters. We will get back to it when bandwidth allows, so we can close the issue. Thanks for the reminder.
@Nicole00 No worries! There are 303938330 USES edges in the space.
So weird! Are the first four tasks located on different machines from the other tasks? Can you make sure that telnet storaged_host_ip 9779 works for all the Spark workers?
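One way to run this check from inside Spark itself is a minimal sketch like the following, which tries to open a TCP connection to storaged from tasks on the workers (the host is a placeholder here; 9779 is the storaged port mentioned above):
import java.net.{InetSocketAddress, Socket}
val storagedHost = "xx.xx.xx.xx" // placeholder: replace with your storaged address
val storagedPort = 9779          // default storaged port
// Launch a few tasks across the executors and try to open a TCP connection
// to storaged from each of them, reporting the worker hostname and the result.
val results = spark.sparkContext.parallelize(1 to 20, 20).mapPartitions { _ =>
  val reachable = try {
    val socket = new Socket()
    try { socket.connect(new InetSocketAddress(storagedHost, storagedPort), 3000); true }
    finally { socket.close() }
  } catch { case _: Exception => false }
  Iterator((java.net.InetAddress.getLocalHost.getHostName, reachable))
}.collect()
results.foreach { case (host, ok) => println(s"$host -> storaged reachable: $ok") }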
In addition, we have observed weird behavior in another test, which is to connect to the database and run the count via spark-shell.
We ran the following snippet again.
The first four tasks raised the "Unable to activate object" error while the subsequent ones did not.
We are concerned about this unstable and unexpected behavior and look forward to your suggestions. Thanks!
@Nicole00 @QingZ11 @wey-gu
Please make sure all the Spark workers can ping the storaged address.