Npartitions in PyRDF

Dear experts,

I’m new to using Spark clusters and PyRDF on SWAN, and I’m confused by the behaviour I see when setting different values of npartitions in PyRDF.use('spark', {'npartitions': 4}). I’m using 99 on CentOS 7 (gcc8) with the K8s containers, and I check all 6 configuration boxes when connecting to the Spark cluster (I’m not sure this is necessary, but checking none of them didn’t seem to work). Here’s my minimal example code:

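import PyRDF, ROOT

# Run on the Spark backend, asking for the dataset to be split into 4 partitions
PyRDF.use('spark', {'npartitions':4})
#PyRDF.use('local')  # alternative: run locally, without Spark
df = PyRDF.RDataFrame("Events", ['https://root.cern/files/HiggsTauTauReduced/W1JetsToLNu.root','https://root.cern/files/HiggsTauTauReduced/W2JetsToLNu.root'])

# Book a histogram of Muon_pt with 30 bins between 0 and 150
df_histogram = df.Histo1D(ROOT.RDF.TH1DModel("h","h",30,0,150),"Muon_pt")

# Drawing the histogram is what triggers the distributed computation
canvas = ROOT.TCanvas()
df_histogram.Draw()
canvas.Draw()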

Everything works fine when npartitions is set to 4 or less. Increasing it above 4 causes the second stage of treeReduce jobs to fail.


(I’ve put the exact error message at https://nihaubri.web.cern.ch/error_message/spark.txt)
Strangely, if I simply count events (e.g. df.Count().GetValue()) instead of making and drawing a histogram, the jobs succeed with npartitions>4.

So is it possible (and encouraged) to go above 4 partitions? I’d imagine the performance increase would be necessary for analyzing datasets much larger than my example.

Thank you for the help. SWAN has been useful in my research and I’m glad to see it’s being improved still.

Best,
Nick

Dear Nick,

I had a similar issue some time ago and, as far as I remember, it is due to the option spark.python.worker.reuse = False, which is activated by the first option in the list, called “LongRunningAnalysis”. If you uncheck this first option when connecting to the Spark cluster, your snippet works even with 5 partitions.

To be honest, I do not remember why it behaves this way. Maybe Prasanth has a clearer idea?

Just a small remark about the npartitions parameter: you can set it to an indecently large number (I usually use 32), because if you ask for too many partitions, PyRDF will scale the number down based on the number of “clusters” inside your ROOT files, so I guess you do not have to worry about this.
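
To illustrate what I mean by scaling the number of partitions, here is a rough sketch in plain Python. It is only my simplified picture of the idea and not PyRDF's actual code: the function names cap_partitions and split_clusters are made up for this example, and I assume the clusters are simply numbered 0..N-1 and split into contiguous ranges.

```python
# Simplified illustration only: this is NOT PyRDF's real implementation.
# cap_partitions and split_clusters are hypothetical names for this sketch.

def cap_partitions(requested, n_clusters):
    """Return the number of partitions that can actually be used."""
    if requested < 1 or n_clusters < 1:
        raise ValueError("requested and n_clusters must both be >= 1")
    # A partition cannot hold less than one cluster, so cap the request.
    return min(requested, n_clusters)


def split_clusters(n_clusters, requested):
    """Split cluster indices 0..n_clusters-1 into contiguous [start, end) ranges."""
    nparts = cap_partitions(requested, n_clusters)
    base, extra = divmod(n_clusters, nparts)
    ranges = []
    start = 0
    for i in range(nparts):
        # The first `extra` partitions take one additional cluster each.
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size))
        start += size
    return ranges


if __name__ == "__main__":
    # Asking for 32 partitions on a file with 10 clusters gives 10 partitions.
    print(len(split_clusters(10, 32)))
    # Asking for 4 partitions on the same file gives 4 ranges of clusters.
    print(split_clusters(10, 4))
```

So in this picture, asking for 32 partitions on a small file costs nothing: you just end up with one partition per cluster.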

One final remark: I do not know whether it is a good idea to enable several options in the Spark connector that set the same Spark parameters, and I do not know exactly how that behaves. In any case, the main option you need is the Spark3Shuffle option.

Cheers,
Brian

Dear @nihaubri ,

Sorry for the late reply, I didn’t check the notifications from this forum. @vbrian already pointed out the correct approach: first try without enabling any of the cluster connection options in SWAN, then activate some of them only when needed. I just tried your reproducer on SWAN, connecting to the Spark cluster without any extra option, and it indeed worked even with 32 partitions, so there should be no issues there.

Cheers,
Vincenzo

Thank you both for the help. I’ll pay closer attention to the options next time. I followed Brian’s suggestion and everything worked with many partitions. The code runs much faster now!

Best,
Nick