
Apache Spark: ETL

At a very high level, the architecture of Apache Spark consists of a driver program, a cluster manager, and worker nodes. An application is submitted to the driver program, where a Spark context is created. Through this Spark context, the user program can access the various Spark services through the API. The driver program requests resources through the cluster manager, distributes parallel operations across those resources, and returns the output back to the user's program.
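As a minimal sketch of how an application obtains this context (the appName and local master below are just illustrative choices), a standalone PySpark program typically builds a SparkSession in the driver:

from pyspark.sql import SparkSession

# build (or reuse) a SparkSession; the driver creates the underlying SparkContext
spark = SparkSession.builder \
    .appName("etl-example") \
    .master("local[*]") \
    .getOrCreate()

# the SparkContext is available through the session
sc = spark.sparkContext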

The data in Apache Spark is represented through a Resilient Distributed Dataset (RDD), an abstract handle to the distributed data. Using an RDD, a user can transform the data into a different dataset or apply an action on the data.
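For example, here is a minimal sketch (run in the pyspark shell, where sc is the SparkContext) showing a transformation followed by an action:

numbers = sc.parallelize([1, 2, 3, 4])

# transformation: builds a new RDD lazily, nothing is computed yet
doubled = numbers.map(lambda x: x * 2)

# action: triggers the computation and returns results to the driver
doubled.collect()   # [2, 4, 6, 8]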

Similar to RDD, Apache Spark allows data to be represented as a DataFrame or a DataSet, both of which impose some structure on the distributed data so that a higher-level program can manipulate the data without worrying about the optimizations inside the Spark framework.

With a DataSet, the user can impose a schema on the data and get data-type checking at compile time, whereas a DataFrame provides an untyped, generic dataset, which is useful when the user does not know the types ahead of time. DataSet is only available for the Scala and Java APIs in Spark, not for Python or R.
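Since DataSet is not available in Python, here is a minimal sketch of imposing an explicit schema on a DataFrame instead (the column names and types are illustrative, and spark is the SparkSession from the pyspark shell):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# illustrative schema: an ID column and an expense amount
schema = StructType([
    StructField("id", StringType(), True),
    StructField("expense", DoubleType(), True)])

df = spark.createDataFrame([("001", 1000.0), ("002", 2000.0)], schema=schema)
df.printSchema()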

A PairRDD is an RDD representing a dataset of key-value pairs. Here is the Python code to create a PairRDD and apply three transformation operations: reduceByKey(), groupByKey(), and combineByKey().

reduceByKey

Let us say you have a CSV file called expense.csv with fields ID and expense.

001,1000
002,2000
001,2000
...
$ ./bin/pyspark

# sc is the SparkContext provided by the pyspark shell

expenseID = sc.textFile("expense.csv")

# split each line and then return a tuple of key-value
expenseIDMap = expenseID.map(lambda line:line.split(",")).map(lambda fields: (fields[0],float(fields[1])))

#now we can aggregate the value by key
expenseIDMap.reduceByKey(lambda x,y:x+y).take(2)



groupByKey

Similarly, to retrieve elements grouped by key, we can use the groupByKey() function.

expenseID = sc.parallelize([("001", 1000),("002",2000),("003",5000),("001",2000)])

expenseID.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()

combineByKey

Here is an example of combineByKey() for a key-value PairRDD. We have data where each key appears with two values, and we want to calculate the average for each key.

expenseID = sc.parallelize([("001", 1000),("002",2000),("003",5000),("001",2000),("003", 6000),("002",7000)])


expenseSumCount = expenseID.combineByKey(
   lambda expense: (expense, 1),
   lambda expenseCountPair, val: (expenseCountPair[0] + val, expenseCountPair[1] + 1),
   lambda expenseCountPair, expenseCountPairNext: (expenseCountPair[0] + expenseCountPairNext[0], expenseCountPair[1] + expenseCountPairNext[1]))

The first lambda function is the createCombiner, which, for the first occurrence of a key in each partition, creates a tuple (value, 1).

The second lambda function is the mergeValue, which, on subsequent occurrences of the same key within the same partition, adds the new value to the running sum for that key. This way we are adding up all the values associated with the same key in each partition and incrementing the count of occurrences of the key, the +1 in the second lambda.

The third lambda function is the mergeCombiners, which combines the (sum, count) pairs from each partition for each key into a single value to be returned to the user program.
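To finish the average calculation, a short sketch that divides the accumulated sum by the count using mapValues():

# divide the per-key sum by the per-key count to get the average
averageByKey = expenseSumCount.mapValues(lambda sumCount: float(sumCount[0]) / sumCount[1])
averageByKey.collect()
# [('001', 1500.0), ('002', 4500.0), ('003', 5500.0)]  (order may vary)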


Extraction, Transformation, Loading


With connectors in Spark, we can combine the processes of extraction and loading into one task. The common file formats Spark supports are:
  • text file
  • JSON
  • CSV
  • sequence file
  • object file

We can use the textFile() function to load and saveAsTextFile() to save data as text.
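For example, a minimal sketch using placeholder paths:

# load a plain text file (or directory of files) as an RDD of lines
lines = sc.textFile("<input file>")

# save the RDD back out as text; the path becomes a directory of part files
lines.saveAsTextFile("<output directory>")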

Similarly, for CSV files, spark.read.csv() and DataFrame.write.format() can be used to read and write the CSV format.

dataset = spark.read.csv("<filename>", header=False)
dataset.write.format("csv").save("<directory name>")

For JSON files, use the spark.read.json() function.

dataset = spark.read.json("directoryname/filename.json")

A sequence file is made up of key-value pairs.

dataset = sc.sequenceFile("<directory>")
dataset.saveAsSequenceFile("<directory>")

An object file contains Java objects saved through a serialization process.
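In PySpark, the Python-side analog relies on pickle rather than Java serialization; a minimal sketch, using placeholder paths:

# save an RDD of Python objects using pickle serialization
data = sc.parallelize([("001", 1000), ("002", 2000)])
data.saveAsPickleFile("<output directory>")

# read the pickled RDD back
restored = sc.pickleFile("<output directory>")
restored.collect()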


RDD and DataFrame

We can convert a DataFrame to an RDD by accessing the rdd attribute of the DataFrame. For example,

someRDD = someDataFrame.rdd


Converting an RDD to a DataFrame in Python:


from pyspark.sql import Row

# load a text file
dataFile = sc.textFile("/some.csv")

# split each line into multiple columns
dataFileWithColumns = dataFile.map(lambda l: l.split(","))

# apply a schema by mapping each row to a Row object
dataWithSchema = dataFileWithColumns.map(lambda cd: Row(OriginNum=cd[0], TermNum=cd[1], Origin=cd[2], Term=cd[3], datetime=cd[4], callCharge=int(cd[5])))

# create the DataFrame
someDF = spark.createDataFrame(dataWithSchema)

# do some DataFrame operation
someDF.show(3)




