nebula: Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks in Spark

Settings

Hardware and software overview

NebulaGraph Database deployment

  • Deployment: installed in the single-node mode on an EC2 machine
  • Nebula version: 3.6.0
  • Nebula Spark Connector version: nebula-spark-connector_3.0-3.0-SNAPSHOT
  • Spark version: 3.3.3
  • Installation method: via RPM
  • Storage: 600G
  • vCPU: 8
  • Memory: 64G
  • OS: Linux

Computation cluster

  • Computation runs on an EMR cluster
  • The EC2 machine and the EMR cluster can reach each other
  • Spark version: 3.1.2

Graph data

  • Number of nodes: 289418552
  • Number of edges: 303938330
  • Partition number: 20

Others

  • Already added the public IP of the EC2 machine to the hosts file (an entry shaped like the sketch below)
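
For reference, the entry is shaped like the line below; nebula-ec2 is a hypothetical hostname (any name that resolves for the connector works), and the IP stays redacted as elsewhere in this issue:

xx.xx.xx.xx   nebula-ec2   # hypothetical hostname for the EC2 host; IP redacted as above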

Issues

When scanning the full graph data on the EMR machine, for example with count() as in the snippet below, we encountered the “Unable to activate object” error. Snippet:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(10)
  .build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()

dataset.count()

Error log:

23/12/15 07:59:57 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 103) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
	at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
	at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
	at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

23/12/15 07:59:57 ERROR TaskSetManager: Task 1 in stage 10.0 failed 4 times; aborting job
23/12/15 07:59:57 WARN TaskSetManager: Lost task 2.3 in stage 10.0 (TID 141) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 7.3 in stage 10.0 (TID 136) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 5.3 in stage 10.0 (TID 133) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 3.3 in stage 10.0 (TID 135) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 4.3 in stage 10.0 (TID 139) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 0.3 in stage 10.0 (TID 140) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 9.3 in stage 10.0 (TID 134) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 6.3 in stage 10.0 (TID 137) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 8.3 in stage 10.0 (TID 138) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 132) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	(stack trace identical to the one above)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:147)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:204)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:203)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:425)
  at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3047)
  at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3046)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:3046)
  ... 49 elided
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
  (stack trace identical to the one above)

However, we can successfully run the following on the EMR machine and get results (presumably because show() pulls only a handful of rows, so few concurrent scan tasks run):

dataset.show()

We also tested scripts over different volumes of the graph data. With val n_limit = 1000000, we can successfully run the following (a modified snippet from the nebula-algorithm package):

val ORIGIN_ID_COL = "id"
val fieldNames    = dataset.schema.fieldNames
val n_limit       = 1000000
// The first two columns of the edge DataFrame are the source and destination vertex IDs.
val (srcName, dstName) = (fieldNames(0), fieldNames(1))
val srcIdDF: DataFrame = dataset.select(srcName).withColumnRenamed(srcName, ORIGIN_ID_COL).limit(n_limit)
val dstIdDF: DataFrame = dataset.select(dstName).withColumnRenamed(dstName, ORIGIN_ID_COL).limit(n_limit)
// Distinct vertex IDs appearing at either end of an edge.
val idDF = srcIdDF.union(dstIdDF).distinct()
idDF.show()

However, when we increase to val n_limit = 10000000, it fails with the same “Unable to activate object” error.

What we found so far

After more testing, we found that when the total number of threads / concurrent tasks is 1, the error does not occur, whereas with more than one thread it appears. We suspect some concurrency-related constraint in NebulaGraph and wonder whether proper configuration tuning could help.
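
To isolate the concurrency factor, a single-task read is one diagnostic we can try. Below is a minimal sketch under that assumption, reusing config from the snippet above; the only change is withPartitionNum(1), so at most one scan connection is active at a time (slow on ~300M edges, but it shows whether concurrency alone triggers the error):

// Diagnostic sketch, not a proposed fix: read with a single Spark partition
// so only one storage scan runs at a time.
val singleTaskReadConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(1) // one read partition => one concurrent scan task
  .build()

val singleTaskDataset = spark.read.nebula(config, singleTaskReadConfig).loadEdgesToDF()
singleTaskDataset.count() // expected to succeed if the failure is concurrency-bound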

Could you please help with this issue? Feel free to let me know if I need to provide more information. Thanks a lot!

About this issue

  • Original URL
  • State: closed
  • Created 7 months ago
  • Comments: 22 (11 by maintainers)

Most upvoted comments

I came across this quite by accident: it may be about the number of available ports. Maybe you can try the steps in this post: https://blog.csdn.net/gltncx11/article/details/122068479 @sparkle-apt
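
Assuming that diagnosis (ephemeral-port exhaustion on the database host; this is a reading of the linked post, not confirmed here), a quick check on the EC2 machine could be:

ss -ant | grep ':9779' | wc -l        # how many sockets involve the storaged port
sysctl net.ipv4.ip_local_port_range   # ephemeral port range available for new connections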

OK, I’ll run a test to see if there is any connection leak. In the meantime, maybe you can update your nebula-spark-connector to the latest version.

You can set minloglevel to 0 and v to 3 for more detailed logs. https://docs.nebula-graph.io/3.6.0/5.configurations-and-logs/2.log-management/logs/#parameter_descriptions
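
For reference, both are gflags in the service configuration files; a sketch of the change in nebula-storaged.conf (the same flags exist for metad and graphd), following the linked docs:

--minloglevel=0   # keep INFO and above
--v=3             # VLOG verbosity, as suggested above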

We have decided not to be blocked by this issue for the moment and to move forward with other projects and tests on larger clusters. We will get back to it when bandwidth allows, so the issue can be closed. Thanks for the reminder.

@Nicole00 No worries! There are 303938330 USES edges in the space.

So weird! Are the first four tasks located on different machines from the other tasks? Can you make sure that telnet storaged_host_ip 9779 succeeds from all the Spark workers?

In addition, we have observed weird behavior in another test, which connects to the database and runs the count via spark-shell.

spark-shell --master yarn --deploy-mode client --driver-memory=2G --executor-memory=2G --num-executors=2 --executor-cores=2 --conf spark.dynamicAllocation.enabled=false --jars nebula-spark-connector_3.0-3.0-SNAPSHOT-jar-with-dependencies.jar

We ran the following snippet again:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

sc.setLogLevel("INFO")
val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(20)
  .build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.show()
dataset.count()

The first four tasks raised the “Unable to activate object” error while the following ones did not (possibly relevant: with --num-executors=2 and --executor-cores=2, at most four tasks run concurrently). [Two Spark UI screenshots from 2023-12-20 omitted.] We are concerned about this unstable and unexpected behavior and look forward to your suggestions. Thanks! @Nicole00 @QingZ11 @wey-gu

Please make sure all the Spark workers can ping the storaged address.
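
A quick way to verify this from each worker, assuming nc (netcat) is available on the EMR nodes; storaged_host_ip is the same placeholder used in the earlier comment:

ping -c 1 storaged_host_ip     # basic reachability of the storaged host
nc -zv storaged_host_ip 9779   # is the storaged service port open?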