Hi,
I’m trying to use RDataFrames Redefine with a macro that I’ve defined previously in the code, but I can’t seem to get it working. Here’s the code:
import ROOT
import pyspark
id_code = """
auto id_pair(ROOT::RVec<float> v, ROOT::RVec<int> idx) {
ROOT::RVec<float> result;
int size = Max(idx)+1;
result.resize(size);
for (int i=0; i<idx.size(); i++) {
if (idx[i] >= 0) {
result[idx[i]] = v[i];
}
}
return result;
}"""
ROOT.gInterpreter.Declare(id_code)
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
chain = ROOT.TChain("Events")
chain.Add("/eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root")
df = RDataFrame(chain, sparkcontext=sc)
dfN = df.Redefine("Jet_pt", "id_pair(Jet_pt, Jet_genJetIdx)").Count()
print(dfN.GetValue())
This results in a large error that I’ll include at the end. Things seem to go wrong inside the df.Redefine, where if I instead of the macro defined in code use std::Take(), which is essentially the same as the macro, the code runs. I’d like to use other functions later, which is why I’m asking about this instead of using Take(). So has anyone else run into this problem or any ideas how to solve it? I assume that it’s somehow related to where Spark looks for the function (?). I’m using the Bleeding edge stack on K8s clusters.
The error message:
[Stage 2:> (0 + 8) / 8]
22/06/27 14:11:14 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 35) (10.100.54.163 executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
process()
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
out_iter = func(split_index, iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
return f(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
merger.mergeValues(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
for k, v in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
return f(*args, **kwargs)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
return mapper(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
rdf_plus = build_rdf_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
chain, entries_in_trees = build_chain_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
clusters, entries = get_clusters_and_entries(treename, filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
tfile = ROOT.TFile.Open(filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[Stage 2:> (0 + 8) / 8]
22/06/27 14:11:20 ERROR TaskSetManager: Task 7 in stage 2.0 failed 4 times; aborting job
22/06/27 14:11:20 WARN TaskSetManager: Lost task 1.3 in stage 2.0 (TID 59) (10.100.38.185 executor 3): TaskKilled (Stage cancelled)
22/06/27 14:11:20 WARN TaskSetManager: Lost task 5.3 in stage 2.0 (TID 58) (10.100.38.185 executor 3): TaskKilled (Stage cancelled)
[Stage 2:> (0 + 4) / 8]
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_1453/3041979157.py in <module>
----> 1 print(dfN.GetValue())
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in GetValue(self)
186 returning the value.
187 """
--> 188 execute_graph(self.proxied_node)
189 return self.proxied_node.value
190
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in execute_graph(node)
53 # All the information needed to reconstruct the computation graph on
54 # the workers is contained in the head node
---> 55 node.get_head().execute_graph()
56
57
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py in execute_graph(self)
171 # Execute graph distributedly and return the aggregated results from all
172 # tasks
--> 173 returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
174 # Perform any extra checks that may be needed according to the
175 # type of the head node
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
132
133 # Map-Reduce using Spark
--> 134 return parallel_collection.map(spark_mapper).treeReduce(reducer)
135
136 def distribute_unique_paths(self, paths):
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
1295 return f(x[0], y[0]), False
1296
-> 1297 reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
1298 if reduced[1]:
1299 raise ValueError("Cannot reduce empty RDD.")
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
1437 )
1438
-> 1439 return partiallyAggregated.reduce(combOp)
1440
1441 @overload
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in reduce(self, f)
1248 yield reduce(f, iterator, initial)
1249
-> 1250 vals = self.mapPartitions(func).collect()
1251 if vals:
1252 return reduce(f, vals)
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in collect(self)
1195 with SCCallSiteSync(self.context):
1196 assert self.ctx._jvm is not None
-> 1197 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
1198 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
1199
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 56) (10.100.54.163 executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
process()
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
out_iter = func(split_index, iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
return f(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
merger.mergeValues(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
for k, v in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
return f(*args, **kwargs)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
return mapper(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
rdf_plus = build_rdf_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
chain, entries_in_trees = build_chain_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
clusters, entries = get_clusters_and_entries(treename, filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
tfile = ROOT.TFile.Open(filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
process()
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
out_iter = func(split_index, iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 3472, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 540, in func
return f(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 2554, in combineLocally
merger.mergeValues(iterator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 253, in mergeValues
for k, v in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1430, in mapPartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py", line 1416, in aggregatePartition
for obj in iterator:
File "/cvmfs/sft-nightlies.cern.ch/lcg/nightlies/devswan/Mon/spark/3.3.0-cern1/x86_64-centos7-gcc11-opt/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
return f(*args, **kwargs)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py", line 128, in spark_mapper
return mapper(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 107, in distrdf_mapper
rdf_plus = build_rdf_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 510, in build_rdf_from_range
chain, entries_in_trees = build_chain_from_range(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 470, in build_chain_from_range
clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 295, in get_clustered_range_from_percs
clusters, entries = get_clusters_and_entries(treename, filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 170, in get_clusters_and_entries
tfile = ROOT.TFile.Open(filename)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/Mon/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file /eos/user/n/ntoikka/data/20UL18JMENano_106X_upgrade2018_realistic_v16_L1v1-v1/30000/EAB5816F-5C3D-5247-AB9F-94B8BECDEF08.root
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
22/06/27 14:11:21 WARN TaskSetManager: Lost task 0.3 in stage 2.0 (TID 63) (10.100.84.52 executor 5): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 4.3 in stage 2.0 (TID 62) (10.100.84.52 executor 5): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 2.3 in stage 2.0 (TID 61) (10.100.253.117 executor 1): TaskKilled (Stage cancelled)
22/06/27 14:11:21 WARN TaskSetManager: Lost task 6.3 in stage 2.0 (TID 60) (10.100.253.117 executor 1): TaskKilled (Stage cancelled)