I/O of ROOT files stored on EOS

Dear experts,

I want to create an RDataFrame from a few ROOT files stored in my colleague's EOS space (he has given me access to the directory). I would like to use the Spark clusters on SWAN, and I am familiar with distributed RDF in Python.
Problem: I am unable to read the ROOT files. I get the following error: Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 60) (ithdp3106.cern.ch executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

My hunch is that something is going wrong between XRootD and Python that I don't understand. Here is the code snippet:

import ROOT

f = 'root://eosuser.cern.ch//eos/user/…/…/truth*.root'
SparkRDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
df = SparkRDataFrame("tree_name", f)
h = df.Histo1D(('myhist', 'myhist', 10, 0, 1.0), 'var_name')
h.Draw()
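This assumes the same directory is also visible through the /eos FUSE mount in the SWAN session. If so, one way to take wildcard handling out of the picture is to expand the pattern locally and pass an explicit list of XRootD URLs to the distributed RDataFrame. The helper names below are hypothetical. The `eosuser.cern.ch` host is taken from the snippet above.

```python
import glob

EOS_HOST = "eosuser.cern.ch"  # host taken from the URL in the snippet above


def to_xrootd_url(eos_path, host=EOS_HOST):
    """Turn an absolute EOS path (/eos/...) into a root:// URL.

    XRootD URLs separate the host from the absolute path with a double
    slash: root://<host>//eos/...
    """
    if not eos_path.startswith("/eos/"):
        raise ValueError(f"not an EOS path: {eos_path!r}")
    # eos_path already starts with "/", so this yields "root://host//eos/..."
    return f"root://{host}/{eos_path}"


def expand_eos_pattern(pattern, host=EOS_HOST):
    """Hypothetical helper: expand a wildcard through the local /eos FUSE
    mount (assumed to exist) and return a sorted list of root:// URLs."""
    return [to_xrootd_url(p, host) for p in sorted(glob.glob(pattern))]


if __name__ == "__main__":
    print(to_xrootd_url("/eos/user/x/xyz/truth_1.root"))
```

In the notebook, the resulting list would replace the wildcard string. For example: `SparkRDataFrame("tree_name", expand_eos_pattern("/eos/user/…/…/truth*.root"))`, with the real directory in place of the elided `…/…` parts. RDataFrame accepts a list of file names as well as a single string.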

Any help is appreciated!

Cheers,
Nilima.

Dear Nilima,

When you open the Spark connector to connect to a Spark cluster (it opens when you click the “star” button in the notebook), you can tick a few boxes (the “bundles”) that add Spark options to your connection before you establish it. One of these bundles is “Include EOSFilesystem options”. If you tick it, it propagates the Kerberos ticket in your SWAN session to the Spark executors. I assume the missing ticket on the executors is your problem.
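To check whether the Kerberos credentials actually reach the executors, you could map a small function over a few Spark tasks. This sketch assumes the executors look for the ticket cache in the standard Kerberos locations: the `KRB5CCNAME` environment variable, otherwise `/tmp/krb5cc_<uid>`. It does not show how the EOSFilesystem bundle ships the ticket. The function name is hypothetical, and `sc` is assumed to be the SparkContext created by the SWAN connector.

```python
import os


def kerberos_cache_status(env=None):
    """Report where the Kerberos credential cache is expected to be and
    whether that file exists on the machine running this function."""
    env = os.environ if env is None else env
    ccache = env.get("KRB5CCNAME", f"FILE:/tmp/krb5cc_{os.getuid()}")
    # Only FILE: caches (or bare paths) can be checked on disk;
    # other cache types (e.g. KEYRING:, KCM:) are reported without a check.
    if ccache.startswith("FILE:"):
        path = ccache[len("FILE:"):]
    elif ":" not in ccache:
        path = ccache
    else:
        return {"ccache": ccache, "exists": None}
    return {"ccache": ccache, "exists": os.path.exists(path)}


# With the Spark connection established (`sc` assumed to be provided by the
# SWAN connector), run it once per task on the executors:
#   sc.parallelize(range(4), 4).map(lambda _: kerberos_cache_status()).collect()

if __name__ == "__main__":
    print(kerberos_cache_status())
```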

Also, I’d use https://swan-k8s.cern.ch, since in that infrastructure the Kerberos ticket is present in your SWAN session and can be propagated. If you are on swan004, swan005 or swan006 (the puppet nodes), you can still use the feature, but you need to run kinit before establishing the connection with the Spark cluster. Note that the puppet nodes are the old SWAN infrastructure we are migrating away from, so soon you won’t need to worry about this anymore (you will always be using the new k8s infrastructure).
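On the puppet nodes, you can check from the notebook that `kinit` has produced a usable ticket before connecting to Spark. MIT Kerberos' `klist -s` prints nothing and exits with status 0 only when it finds a credential cache with valid tickets. Here is a minimal sketch; the function name is hypothetical.

```python
import subprocess


def has_valid_kerberos_ticket():
    """Return True if `klist -s` reports a valid, unexpired ticket."""
    try:
        # -s: no output, exit status 0 only if valid tickets are found
        result = subprocess.run(["klist", "-s"], check=False)
    except FileNotFoundError:
        # klist is not installed on this machine
        return False
    return result.returncode == 0


if __name__ == "__main__":
    if not has_valid_kerberos_ticket():
        print("No valid Kerberos ticket: run kinit before connecting to Spark.")
```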

Hope this helps,

Enric

Hi Enric,

Thank you for the prompt reply! I used https://swan-k8s.cern.ch along with the bundle you suggested. Unfortunately, it still does not work for me, and I get a bunch of errors (shown below). To test whether something is wrong with the input file, I cloned a tutorial and tried to load its example file, which is also on EOS. I still get the same set of errors.

Py4JJavaError Traceback (most recent call last)
in

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in _call_action_result(self, *args, **kwargs)
196 result of the current action node.
197 """
--> 198 return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
199
200 def create_variations(self) -> VariationsProxy:

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in GetValue(self)
188 returning the value.
189 """
--> 190 execute_graph(self.proxied_node)
191 return self.proxied_node.value
192

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in execute_graph(node)
55 # All the information needed to reconstruct the computation graph on
56 # the workers is contained in the head node
---> 57 node.get_head().execute_graph()
58
59

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py in execute_graph(self)
205 # Execute graph distributedly and return the aggregated results from all
206 # tasks
--> 207 returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
208 # Perform any extra checks that may be needed according to the
209 # type of the head node

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
133
134 # Map-Reduce using Spark
--> 135 return parallel_collection.map(spark_mapper).treeReduce(reducer)
136
137 def distribute_unique_paths(self, paths):

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
1295 return f(x[0], y[0]), False
1296
--> 1297 reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
1298 if reduced[1]:
1299 raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
1437 )
1438
--> 1439 return partiallyAggregated.reduce(combOp)
1440
1441 @overload

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in reduce(self, f)
1248 yield reduce(f, iterator, initial)
1249
--> 1250 vals = self.mapPartitions(func).collect()
1251 if vals:
1252 return reduce(f, vals)

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in collect(self)
1195 with SCCallSiteSync(self.context):
1196 assert self.ctx._jvm is not None
--> 1197 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
1198 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
1199

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/java_gateway.py in call(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
--> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12) (ithdp5004.cern.ch executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 668, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 85, in read_command
command = serializer._read_with_length(file)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 173, in _read_with_length
return self.loads(obj)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 471, in loads
return cloudpickle.loads(obj, encoding=encoding)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 19, in <module>
import ROOT
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/__init__.py", line 38, in <module>
_register_pythonizations()
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/__init__.py", line 351, in _register_pythonizations
importlib.import_module(name + '.' + module_name)
File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tmva/__init__.py", line 25, in <module>
hasRDF = gSystem.GetFromPipe("root-config --has-dataframe") == "yes"
ValueError: TString TSystem::GetFromPipe(const char* command) =>
ValueError: nullptr result where temporary expected

Hello,

That is a different error (no longer related to the lack of EOS credentials in the Spark executors). This seems application-related, so I’ll ask @vpadulan for help.
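The last frame of the worker traceback shows the TMVA pythonization calling `gSystem.GetFromPipe("root-config --has-dataframe")` while `import ROOT` runs on the executor. One way to narrow this down is to run the same command without ROOT on the executors and see whether `root-config` is found there. This is offered only as a diagnostic sketch, not a confirmed cause. The function name is hypothetical, and `sc` is assumed to be the SparkContext from the SWAN connector.

```python
import shutil
import subprocess


def probe_root_config(command=("root-config", "--has-dataframe")):
    """Run the command the TMVA pythonization pipes through gSystem and
    report what happened, without importing ROOT."""
    exe = shutil.which(command[0])
    if exe is None:
        # the executable is not on PATH on this machine
        return {"found": False, "returncode": None, "stdout": ""}
    result = subprocess.run([exe, *command[1:]], capture_output=True, text=True)
    return {"found": True, "returncode": result.returncode,
            "stdout": result.stdout.strip()}


# With `sc` from the SWAN Spark connector (assumed), e.g.:
#   sc.parallelize(range(4), 4).map(lambda _: probe_root_config()).collect()

if __name__ == "__main__":
    print(probe_root_config())
```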

Cheers,

Enric