At a very high level, the architecture of Apache Spark consists of a Driver Program, a Cluster Manager, and Worker Nodes. An Apache Spark application is submitted to the Driver Program, where a SparkContext is created. Through this SparkContext, a user program can access the various Spark services through the API. The driver program requests resources from the cluster manager, distributes parallel operations onto those resources, and returns the output back to the user's program.
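As a minimal sketch (assuming PySpark is installed; the application name "ExpenseApp" and the local[*] master are illustrative choices), a driver program creates its entry point like this:
from pyspark.sql import SparkSession
# building the session also creates the SparkContext inside the driver
spark = SparkSession.builder.appName("ExpenseApp").master("local[*]").getOrCreate()
sc = spark.sparkContext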
The data in Apache Spark is represented through the Resilient Distributed Dataset (RDD), an abstract handle to data. Using an RDD, a user can transform the data into a different dataset or apply an action on the data.
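A short sketch of the two kinds of RDD operations, using illustrative numbers: map() is a transformation that lazily produces a new RDD, while collect() is an action that triggers the computation.
numbers = sc.parallelize([1, 2, 3, 4])
squares = numbers.map(lambda x: x * x)   # transformation: returns a new RDD, nothing runs yet
squares.collect()                        # action: runs the job and returns [1, 4, 9, 16]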
Similar to RDD, Apache Spark allows data to be represented as a DataFrame or a DataSet, both of which impose some structure on the distributed data so that a higher-level program can manipulate the data without worrying about the optimizations inside the Spark framework.
With a DataSet, the user can impose a schema on the data and do data-type checking at compile time, whereas a DataFrame provides an untyped, generic dataset, which is useful when the user does not know the types ahead of time. DataSet is only available for Scala and Java in Spark, not for Python or R.
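Since DataSet is not available in Python, here is a minimal DataFrame sketch in PySpark; the column names "id" and "expense" are assumptions for illustration.
df = spark.createDataFrame([("001", 1000), ("002", 2000)], ["id", "expense"])
df.printSchema()   # shows the inferred schema
df.show()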
PairRDD is an RDD representing a dataset of key-value pairs. Here is the Python code to create a PairRDD and three transformation operations: reduceByKey(), groupByKey(), and combineByKey().
reduceByKey
Let us say you have a CSV file called expense.csv with the fields ID and expense.
001,1000
002,2000
001,2000
...
$ ./bin/pyspark
# sc is the sparkContext provided by pyspark program
expenseID = sc.textFile("expense.csv")
# split each line and then return a tuple of key-value
expenseIDMap = expenseID.map(lambda line:line.split(",")).map(lambda fields: (fields[0],float(fields[1])))
#now we can aggregate the value by key
expenseIDMap.reduceByKey(lambda x,y:x+y).take(2)
Similarly, to retrieve the elements grouped by key, we can use the groupByKey() function.
groupByKey
expenseID = sc.parallelize([("001", 1000),("002",2000),("003",5000),("001",2000)])
expenseID.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
combineByKey
Here is an example of combineByKey() on a key-value PairRDD. We have data where each key has two values, and we want to calculate the average for each key.
expenseID = sc.parallelize([("001", 1000),("002",2000),("003",5000),("001",2000),("003", 6000),("002",7000)])
expenseSumCount = expenseID.combineByKey(
    lambda expense: (expense, 1),
    lambda expenseCountPair, val: (expenseCountPair[0] + val, expenseCountPair[1] + 1),
    lambda expenseCountPair, expenseCountPairNext: (expenseCountPair[0] + expenseCountPairNext[0], expenseCountPair[1] + expenseCountPairNext[1]))
The first lambda function is the createCombiner, which, for the first occurrence of a key within a partition, creates the tuple (value, 1), so the element becomes (key, (value, 1)).
The second lambda function is the mergeValue, which, on each subsequent occurrence of the same key within the same partition, adds that occurrence's value to the running sum for the key and increments its occurrence count (the +1 in the second lambda). This way, within each partition we accumulate all the values associated with the same key together with how many times the key occurred.
The third lambda function is the mergeCombiners, which combines the (sum, count) pairs produced for each key in the different partitions into a single (sum, count) pair per key that is returned to the user program.
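combineByKey() thus gives us a (sum, count) pair for each key; computing the average itself takes one more step. A short sketch of that step (the variable name expenseAvg is an illustrative choice):
# divide the summed expense by the count for each key
expenseAvg = expenseSumCount.mapValues(lambda sumCount: sumCount[0] / sumCount[1])
expenseAvg.collect()
# e.g. [('001', 1500.0), ('002', 4500.0), ('003', 5500.0)] -- ordering may vary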
Extraction, Transformation, Loading
With connectors in Spark, we can combine the process of extraction and loading into one task. The common file formats Spark supports are:
- text file
- JSON
- CSV
- sequence file
- object file
We can use the textFile() function to load data and saveAsTextFile() to save data as text.
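As a minimal sketch (the paths are placeholders), plain-text input and output look like this:
lines = sc.textFile("<input file or directory>")
lines.saveAsTextFile("<output directory>")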
Similarly, for CSV files, spark.read.csv() and DataFrame.write.format() can be used to read and write the CSV format.
dataset = spark.read.csv("<filename>",header=False)
dataset.write.format("csv").save("<directory name>”)
For JSON file, use spark.read.json() function.
dataset = spark.read.json("directoryname/filename.json")
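Writing JSON works the same way through the DataFrame writer; a short sketch with a placeholder directory name:
dataset.write.json("<directory name>")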
A sequence file is made up of key-value pairs.
dataset = sc.sequenceFile("<directory>")
dataset.saveAsSequenceFile("<directory>")
An object file contains Java objects saved through a serialization process.
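PySpark works with Python objects rather than Java objects, so the commonly used analogue there is a pickle-based file; here is a hedged sketch with illustrative data and placeholder paths:
rdd = sc.parallelize([("001", 1000), ("002", 2000)])
rdd.saveAsPickleFile("<directory>")      # serialize the RDD's Python objects to disk
restored = sc.pickleFile("<directory>")  # read them back as an RDD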
RDD and DataFrame
We can convert a DataFrame to an RDD by accessing the rdd attribute of the DataFrame. For example,
someRDD = someDataFrame.rdd
Here is how to convert an RDD to a DataFrame in Python.
from pyspark.sql import Row
#Load a text File
dataFile = sc.textFile("/some.csv")
#splitting each line to multiple column
dataFileWithColumns = dataFile.map(lambda l: l.split(","))
#Apply a Schema
dataWithSchema = dataFileWithColumns.map(lambda cd: Row(OriginNum=cd[0],TermNum=cd[1],Origin=cd[2],Term=cd[3],datetime=cd[4],callCharge=int(cd[5])))
#create DataFrame
someDF=spark.createDataFrame(dataWithSchema)
#Do some dataFrame operation
someDF.show(3)
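As a follow-up usage sketch (the column names come from the Row schema above, and the threshold of 100 is an arbitrary illustration), typical DataFrame operations might look like:
someDF.printSchema()
someDF.filter(someDF.callCharge > 100).select("OriginNum", "callCharge").show(3)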