Distributed RDataFrames memory issue on workers

Dear experts,

I am using the new distributed RDataframes framework, using the bleeding edge software stack and the k8s cluster, defining
SparkRDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

I recently encountered an issue regarding the maximum memory size allowed on the workers, the error is as follow:

Job aborted due to stage failure: Total size of serialized results of 6 tasks (1039.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

For your information I obtain such a big result because this is obtained through the method AsNumpy() (a maybe too large array…).
The error happens during the reduction job (the individual jobs performing the AsNumpy() operation are working, but the merging part crashes due to too high memory request.

I tried to increase the memory of the spark driver using
spark.driver.maxResultSize = 2g
spark.driver.memory = 2g
but it seems not to work… again the individual jobs are fine, but the reduction job is crashing

Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 16) (10.100.197.253 executor 5): TaskResultLost (result lost from block manager)

Do I setup spark the right way ? Or more importantly, am I allowed to do so ? :stuck_out_tongue:

Cheers,
Brian

Dear @vbrian,
Distributed RDataFrame does the merging of partial results from the mappers in the distributed resources, sending back only the final result of your analysis to your SWAN session. In your case sending such a large array can lead to this issue, although increasing the driver memory should solve it (if everything else is indeed working correctly).

I am not aware if there might be any restrictions to the driver memory in the SWAN configuration, although I imagine that this parameter might be fixed when you connect to SWAN from the configuration panel (since it’s where you decide the configuration of your VM: software stack, cores, availability of the Spark cluster etc.). Probably SWAN experts can help us understand this bit better.

Cheers,
Vincenzo

I’d say that increasing the driver memory via that parameter should work. @rcastell do you see a reason why this would not be enough?

@vbrian , have you by any chance tried to use this parameter outside of an RDF application? E.g. running some regular Spark job that brings 2g to the driver as a result? It might be worth trying it to see if its a problem in RDF that we need to understand or on the Spark side.

Hi,
increasing the driver memory is ok.
Just keep in mind that the driver memory counts towards the limit of memory you get when you start session, so if you allocate e.g. 16g the JVM will crash when it tries to get the part above your session limit.

Hello @etejedor, @rcastell,

First of all, you may be right saying that it can overflow my session limit memory since I ran it using bleeding edge where “only” 8G are available… However launching the same in LCG100 with 16G seems not to solve the case. Moreover I would like to understand more the previous error message. From what I understood the total number of task size is 1039MiB which is farly below the 8000 available.

So to be fair I never really used spark outside an RDF application, but maybe I can make some trials on this side. Anyway I tried to change the memory on the driver as described, the error message for now is the following, maybe you can understand it much better than me what it could imply.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 16.1 failed 4 times, most recent failure: Lost task 1.3 in stage 16.1 (TID 112) (10.100.42.57 executor 4): TaskResultLost (result lost from block manager)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Cheers,
Brian