Dear experts,
I am trying to run a Python script that constructs a Spark distributed dataframe (ROOT.RDF.Experimental.Distributed.Spark.RDataFrame) on a Spark cluster (after clicking the star button in SWAN). Here is my script:
def process_weight(weight):
    df_reco = extract_Df(treename, reco_branches, f"{weight}")
    reco, truth, mig, acc, eff = matching_and_make_hists(df_reco, df_truth, f"{weight}")  # df_reco and df_truth are pandas DataFrames
    ...  # further saves the outputs to files
Now the extract_Df function is as follows:
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

def extract_Df(files, treename, branches):
    # 'sc' is the SparkContext provided by the SWAN Spark connection
    rdf = RDataFrame(treename, files, npartitions=3, sparkcontext=sc)
    # ... (the filter steps that produce rdf_filtered are omitted here)
    ndf = pd.DataFrame(rdf_filtered.AsNumpy(branches))
    return ndf  # now a pandas DataFrame
What I want to do:
I want to run the process_weight function for 300 weights, and I would like to parallelize these calls. Currently I just loop over them sequentially, as in the sketch below. I allocated 5 executors with 10 cores each and 3 GB of memory per executor, but somehow only 1 executor is used and all the weights just queue up. Can you help me make Spark use all of the allocated resources?
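For reference, this is roughly what my current driver-side loop looks like (weight_list is just the list of 300 weight names):

for weight in weight_list:
    # each call builds a distributed RDataFrame and writes its outputs, one weight at a time
    process_weight(weight)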
Thanks in advance!
Nilima