Macros in RDataFrame's Redefine

Hi,

I’m trying to use RDataFrame’s Redefine with a macro that I’ve defined earlier in the code, but I can’t get it to work. Here’s the code:

import ROOT
import pyspark

id_code = """
auto id_pair(const ROOT::RVec<float> &v, const ROOT::RVec<int> &idx) {
    ROOT::RVec<float> result;
    // One output slot per gen jet; entries with no match (idx < 0) stay zero.
    int size = ROOT::VecOps::Max(idx) + 1;
    result.resize(size);

    for (std::size_t i = 0; i < idx.size(); i++) {
        if (idx[i] >= 0) {
            result[idx[i]] = v[i];
        }
    }

    return result;
}"""

ROOT.gInterpreter.Declare(id_code)

RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

chain = ROOT.TChain("Events")
chain.Add("/eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root")

df = RDataFrame(chain, sparkcontext=sc)  # `sc` is the SparkContext provided by the SWAN Spark connector

dfN = df.Redefine("Jet_pt", "id_pair(Jet_pt, Jet_genJetIdx)").Count()
print(dfN.GetValue())

This results in a long error that I’ll include at the end. Things seem to go wrong inside df.Redefine: if, instead of the macro defined in the code, I use std::Take(), which is essentially the same as the macro, the code runs. I’d like to use other custom functions later, which is why I’m asking about this instead of just using Take(). Has anyone else run into this problem, or does anyone have ideas on how to solve it? I assume it’s somehow related to where Spark looks for the function (?). I’m using the Bleeding Edge stack on the K8s clusters.
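For reference, here is a plain-Python sketch of what the id_pair macro computes (illustration only, not the code that runs on the workers): it scatters each v[i] into slot idx[i], skipping unmatched entries with a negative index.

```python
def id_pair(v, idx):
    # Output has one slot per target index; max(idx) + 1 slots in total.
    # If every index is negative, the result is empty, as in the C++ version.
    size = max(idx) + 1
    result = [0.0] * max(size, 0)
    for i, j in enumerate(idx):
        if j >= 0:          # negative index means "no match": skip it
            result[j] = v[i]
    return result

print(id_pair([10.0, 20.0, 30.0], [1, -1, 0]))  # → [30.0, 10.0]
```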

The error message:

[Stage 2:>                                                          (0 + 8) / 8]

22/06/27 14:11:14 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 35) (10.100.54.163 executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
    rdf_plus = build_rdf_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
    chain, entries_in_trees = build_chain_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
    clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
    clusters, entries = get_clusters_and_entries(treename, filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
    tfile = ROOT.TFile.Open(filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

[Stage 2:>                                                          (0 + 8) / 8]

22/06/27 14:11:20 ERROR TaskSetManager: Task 7 in stage 2.0 failed 4 times; aborting job
22/06/27 14:11:20 WARN TaskSetManager: Lost task 1.3 in stage 2.0 (TID 59) (10.100.38.185 executor 3): TaskKilled (Stage cancelled)
22/06/27 14:11:20 WARN TaskSetManager: Lost task 5.3 in stage 2.0 (TID 58) (10.100.38.185 executor 3): TaskKilled (Stage cancelled)

[Stage 2:>                                                          (0 + 4) / 8]

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_1453/3041979157.py in <module>
----> 1 print(dfN.GetValue())

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in GetValue(self)
    186         returning the value.
    187         """
--> 188         execute_graph(self.proxied_node)
    189         return self.proxied_node.value
    190 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in execute_graph(node)
     53             # All the information needed to reconstruct the computation graph on
     54             # the workers is contained in the head node
---> 55             node.get_head().execute_graph()
     56 
     57 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py in execute_graph(self)
    171         # Execute graph distributedly and return the aggregated results from all
    172         # tasks
--> 173         returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
    174         # Perform any extra checks that may be needed according to the
    175         # type of the head node

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
    132 
    133         # Map-Reduce using Spark
--> 134         return parallel_collection.map(spark_mapper).treeReduce(reducer)
    135 
    136     def distribute_unique_paths(self, paths):

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
   1295                 return f(x[0], y[0]), False
   1296 
-> 1297         reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
   1298         if reduced[1]:
   1299             raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
   1437             )
   1438 
-> 1439         return partiallyAggregated.reduce(combOp)
   1440 
   1441     @overload

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in reduce(self, f)
   1248             yield reduce(f, iterator, initial)
   1249 
-> 1250         vals = self.mapPartitions(func).collect()
   1251         if vals:
   1252             return reduce(f, vals)

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in collect(self)
   1195         with SCCallSiteSync(self.context):
   1196             assert self.ctx._jvm is not None
-> 1197             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
   1199 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 56) (10.100.54.163 executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
    rdf_plus = build_rdf_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
    chain, entries_in_trees = build_chain_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
    clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
    clusters, entries = get_clusters_and_entries(treename, filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
    tfile = ROOT.TFile.Open(filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
    rdf_plus = build_rdf_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
    chain, entries_in_trees = build_chain_from_range(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
    clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
    clusters, entries = get_clusters_and_entries(treename, filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
    tfile = ROOT.TFile.Open(filename)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


22/06/27 14:11:21 WARN TaskSetManager: Lost task 0.3 in stage 2.0 (TID 63) (10.100.84.52 executor 5): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 4.3 in stage 2.0 (TID 62) (10.100.84.52 executor 5): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 2.3 in stage 2.0 (TID 61) (10.100.253.117 executor 1): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 6.3 in stage 2.0 (TID 60) (10.100.253.117 executor 1): TaskKilled (Stage cancelled)

Hi @ntoikka ,

I think the main problem with the code you show is the usage of gInterpreter.Declare: it declares the id_pair function only in your local session, and that function is not distributed to the workers in your example. Your best option for now is to tell the scheduler to declare that code at the beginning of each distributed task, like so:

import ROOT

RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
initialize = ROOT.RDF.Experimental.Distributed.initialize

def myinit():
    ROOT.gInterpreter.Declare("""MYCODE""")

initialize(myinit)

# ... Continue with your RDF application

Then I have a couple of clarification questions for you:

  • I am not sure what you mean by "the code runs if I use std::Take()". Do you use ROOT::VecOps::Take in the call to Redefine? In that case, I understand why it works: that function is already in the ROOT library, whereas your own function wasn’t declared on the distributed workers.
  • The error you report has nothing to do with functions not being present; rather, it says the file cannot be opened. Can you reproduce this if you do the following?
    chain = ROOT.TChain("Events")
    chain.Add("/eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root")
    df = RDataFrame(chain, sparkcontext=sc)
    print(df.Count().GetValue())
    

Best,
Vincenzo


Thanks for the quick response.

When I mentioned std::Take(), I meant that df.Redefine("Jet_pt", "Take(Jet_pt, Jet_genJetIdx)") works, so I think it’s exactly as you said.

I had a mistake in the original message: the actual path that I pass to chain.Add() has root://eosuser.cern.ch/ before the /eos/. So now when I run

chain = ROOT.TChain("Events")
chain.Add("root://eosuser.cern.ch//eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/0230241E-754C-954D-B5D5-C104C1550830.root")
df = RDataFrame(chain, sparkcontext=sc)
print(df.Redefine("Jet_pt", "id_pair(Jet_pt, Jet_genJetIdx)").Count().GetValue())

it works.

Now, to continue on the topic: I’ve previously used ROOT.gSystem.Load() to load libraries when running locally. I now have the function id_pair() in a shared library, Nhelpers_C.so, and would like to use it from there, so I tried

import ROOT

RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
initialize = ROOT.RDF.Experimental.Distributed.initialize
def myinit():
    ROOT.gSystem.Load('root://eosuser.cern.ch//eos/user/n/ntoikka/code/macros/./Nhelpers_C.so')
initialize(myinit)

but it didn’t work. I also tried a relative path to the library, but that fails too. The error seems similar to the previous one, but I’m not too familiar with these. Any suggestions?

Error when using the library:

[Stage 9:>                                                          (0 + 8) / 8]

22/06/27 15:56:50 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TID 115) (10.100.38.188 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 109, in distrdf_mapper
    mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
    resultptr_list = computation_graph_callable(starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 214, in trigger_computation_graph
    actions = generate_computation_graph(graph, starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 184, in generate_computation_graph
    rdf_node, in_task_op = _call_rdf_operation(node.operation, graph[node.parent_id].rdf_node, range_id)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/functools.py", line 888, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 132, in _call_rdf_operation
    rdf_node = rdf_operation(*in_task_op.args, **in_task_op.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

22/06/27 15:57:00 WARN TaskSetManager: Lost task 5.3 in stage 9.0 (TID 138) (10.100.38.188 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 109, in distrdf_mapper
    mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
    resultptr_list = computation_graph_callable(starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 214, in trigger_computation_graph
    actions = generate_computation_graph(graph, starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 184, in generate_computation_graph
    rdf_node, in_task_op = _call_rdf_operation(node.operation, graph[node.parent_id].rdf_node, range_id)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/functools.py", line 888, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 132, in _call_rdf_operation
    rdf_node = rdf_operation(*in_task_op.args, **in_task_op.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.


	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

22/06/27 15:57:00 ERROR TaskSetManager: Task 5 in stage 9.0 failed 4 times; aborting job
22/06/27 15:57:00 WARN TaskSetManager: Lost task 1.3 in stage 9.0 (TID 139) (10.100.38.188 executor 1): TaskKilled (Stage cancelled)
22/06/27 15:57:00 WARN TaskSetManager: Lost task 0.3 in stage 9.0 (TID 140) (10.100.54.185 executor 3): TaskKilled (Stage cancelled)
22/06/27 15:57:00 WARN TaskSetManager: Lost task 4.3 in stage 9.0 (TID 141) (10.100.54.185 executor 3): TaskKilled (Stage cancelled)

[Stage 9:>                                                          (0 + 4) / 8]

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_3111/3371986165.py in <module>
      5 df = RDataFrame(chain, sparkcontext=sc)
      6 
----> 7 print(df.Redefine("Jet_pt", "id_pair(Jet_pt, Jet_genJetIdx)").Count().GetValue())

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in GetValue(self)
    186         returning the value.
    187         """
--> 188         execute_graph(self.proxied_node)
    189         return self.proxied_node.value
    190 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in execute_graph(node)
     53             # All the information needed to reconstruct the computation graph on
     54             # the workers is contained in the head node
---> 55             node.get_head().execute_graph()
     56 
     57 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py in execute_graph(self)
    171         # Execute graph distributedly and return the aggregated results from all
    172         # tasks
--> 173         returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
    174         # Perform any extra checks that may be needed according to the
    175         # type of the head node

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
    132 
    133         # Map-Reduce using Spark
--> 134         return parallel_collection.map(spark_mapper).treeReduce(reducer)
    135 
    136     def distribute_unique_paths(self, paths):

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
   1295                 return f(x[0], y[0]), False
   1296 
-> 1297         reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
   1298         if reduced[1]:
   1299             raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
   1437             )
   1438 
-> 1439         return partiallyAggregated.reduce(combOp)
   1440 
   1441     @overload

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in reduce(self, f)
   1248             yield reduce(f, iterator, initial)
   1249 
-> 1250         vals = self.mapPartitions(func).collect()
   1251         if vals:
   1252             return reduce(f, vals)

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in collect(self)
   1195         with SCCallSiteSync(self.context):
   1196             assert self.ctx._jvm is not None
-> 1197             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
   1199 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 9.0 failed 4 times, most recent failure: Lost task 5.3 in stage 9.0 (TID 138) (10.100.38.188 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 109, in distrdf_mapper
    mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
    resultptr_list = computation_graph_callable(starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 214, in trigger_computation_graph
    actions = generate_computation_graph(graph, starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 184, in generate_computation_graph
    rdf_node, in_task_op = _call_rdf_operation(node.operation, graph[node.parent_id].rdf_node, range_id)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/functools.py", line 888, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 132, in _call_rdf_operation
    rdf_node = rdf_operation(*in_task_op.args, **in_task_op.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.


	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 109, in distrdf_mapper
    mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
    resultptr_list = computation_graph_callable(starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 214, in trigger_computation_graph
    actions = generate_computation_graph(graph, starting_node, range_id)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 184, in generate_computation_graph
    rdf_node, in_task_op = _call_rdf_operation(node.operation, graph[node.parent_id].rdf_node, range_id)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/functools.py", line 888, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/ComputationGraphGenerator.py", line 132, in _call_rdf_operation
    rdf_node = rdf_operation(*in_task_op.args, **in_task_op.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RLoopManager,void>::Redefine(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

I had to cut some lines at the end due to the character limit; I can provide the whole message if necessary.
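My current suspicion is that `gInterpreter.Declare` only runs on the client, so the Spark workers never see `id_pair` and the worker-side JIT compilation of the `Redefine` expression fails. For reference, here is a minimal sketch of the workaround I'm planning to try, assuming the distributed RDataFrame module exposes `ROOT.RDF.Experimental.Distributed.initialize` to run a setup function on each worker (I haven't confirmed the exact signature on the Bleeding Edge stack):

```python
# C++ helper, same as in the snippet above
id_code = """
auto id_pair(ROOT::RVec<float> v, ROOT::RVec<int> idx) {
    ROOT::RVec<float> result;
    result.resize(Max(idx) + 1);
    for (size_t i = 0; i < idx.size(); i++) {
        if (idx[i] >= 0) {
            result[idx[i]] = v[i];
        }
    }
    return result;
}"""

def declare_on_worker():
    # Runs once per worker process, so the JIT-compiled function is
    # available when the distributed Redefine is evaluated there.
    import ROOT
    ROOT.gInterpreter.Declare(id_code)

# On the client, before building the computation graph (hypothetical usage):
# import ROOT
# ROOT.RDF.Experimental.Distributed.initialize(declare_on_worker)
# RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
# df = RDataFrame(chain, sparkcontext=sc)
# print(df.Redefine("Jet_pt", "id_pair(Jet_pt, Jet_genJetIdx)").Count().GetValue())
```

If that is indeed the mechanism, it would also explain why the inline `Take()` expression works: it needs no client-side declaration, so the workers can JIT-compile it on their own.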