Reading from/writing to HDFS on NXCALS

Hello,

I would like to ask for some examples and/or references on reading and writing files on HDFS while connected to the NXCALS cluster. I already have a project space there.

Thank you in advance for sharing your experience.

Best regards, Michał.

Dear Maciej,

It depends on what you want to write. If you already have a schema for your data and the data is small, you can just use CSV.

The code to write, append, and read is below:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType

schema = StructType([
    StructField("timestamp", IntegerType()),
    StructField("col1", DoubleType()),
    StructField("col2", DoubleType()),
    StructField("col3", DoubleType()),
])

# write some initial data
input_data = [[11111,0.4,0.4,0.0], [11112,0.0,0.0,0.7], [11113,0.2,0.0,0.3], [11114,0.0,0.0,0.0]]
df1 = sc.parallelize(input_data).toDF(schema)
df1.write.csv(path="/user/pmrowczy/measurement.csv", mode="overwrite", header=True)

# add some more data
input_data = [[11115,0.4,0.4,0.0]]
df1 = sc.parallelize(input_data).toDF(schema)
df1.write.csv(path="/user/pmrowczy/measurement.csv", mode="append", header=True)

# read it all
spark.read.format('csv').load("/user/pmrowczy/measurement.csv", schema=schema, header=True).collect()

You can also repartition your CSV into a single file and, if you wish, download it to your computer with the HDFS Browser (as in the picture). Note that Spark writes the output path as a directory containing one part file:

(spark.read.format("csv")
    .load("/user/pmrowczy/measurement.csv", schema=schema, header=True)
    .repartition(1)
    .write.csv(path="/user/pmrowczy/all-measurements.csv", mode="overwrite", header=True))

Dear Piotr,

Thank you for your prompt reply. As a first step, I'd like to play with CSV files, and your answer covers that completely. As a next step, I'd like to explore Parquet files.

Thank you again,
Michał.

@mmacieje yes, it all depends on your use case.

Just as a hint, you might by mistake write the same data twice. It might be a good idea to drop duplicates with .dropDuplicates(['timestamp']).

@pmrowczy, thank you for an additional tip.
Indeed, I was wondering what would happen in this case. Would the query take the most recent file and drop the older ones?

Yes, so when you write to your CSV in append mode, you have to be prepared for possible duplicates. Your reading job that takes that CSV file as input should therefore account for duplicates and filter them out.

No, the query is just a shuffling job that makes sure all your records are unique.

@pmrowczy
I tried to run your code, but the initialisation of the “sc” variable seems to be missing. Could you please have a look? Thanks!

@mmacieje please remember to establish a Spark connection first (star button -> Connect). This initializes the sc and spark variables; you can read more about them under the star button.

That was it, thank you. I was not aware of the sc variable.