Apache Spark
Using Apache Spark for extensive workloads
12 January 2024
github

Introduction


Apache Spark is one of the most popular frameworks for processing large batches of data. It harnesses the power of distributed systems, so workloads can scale out as far as we need. It is also considerably faster than its predecessor, Hadoop MapReduce, because intermediate data is kept in RAM instead of on disk. It supports several programming languages, including R, Python, SQL, Scala, and Java, and provides an extensive API for complex queries. Furthermore, the ecosystem is mature: you can run machine learning algorithms, visualize data, transform data, and much more. Apache Spark is designed to run on a cluster so that the workload can be divided across multiple compute units. More details about Apache Spark can be found in this link. In this article, I am going to demonstrate word stemming on William Shakespeare's work. The code can be found via the GitHub icon above.


How it works


Main concepts


The main concept behind Apache Spark is the RDD (resilient distributed dataset), which enables task partitioning and scheduling. There are three main kinds of operations we can perform on an RDD: create, transform, and action. Create is fairly self-explanatory: we build the RDD and assign it to a variable. Transform is where we specify the operations we want to apply to our RDD, such as mapping and filtering. Action is when the transformations are actually executed; we trigger an action by calling a function such as collect, count, or take.
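
To make this concrete, below is a minimal sketch of the three kinds of operations. It assumes a SparkSession named spark already exists (we create one in the practical example later).


# Create: build an RDD from an in-memory list
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Transform: describe the work, nothing runs yet
doubled = numbers.map(lambda x: x * 2)
multiples_of_four = doubled.filter(lambda x: x % 4 == 0)

# Action: triggers execution and returns results to the driver
print(multiples_of_four.collect())  # [4, 8]
print(multiples_of_four.count())    # 2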


Lazy evaluation


As you have probably noticed from the previous paragraph, transform operations are not executed right away. This is intentional: it lets Spark optimize the whole chain of operations before they are executed in the action phase. This behavior is called lazy evaluation. Once all transforms are specified and an action function is called, Apache Spark automatically optimizes the transformations and produces the result. Because of lazy evaluation, Apache Spark needs to keep track of which operations are going to be executed, or have already been executed, on which compute unit, so that an RDD can be reconstructed/recovered in case of a failure. This is the job of the lineage graph, which records each operation. Thanks to this comprehensive record, Spark can also reschedule work when an error occurs.
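
The lineage can actually be inspected on an RDD, which makes this behavior easy to see. The sketch below assumes the same spark session and the shakespeare.txt file used later in this article.


# Create and transform: these lines return immediately, nothing is computed yet
lines = spark.sparkContext.textFile("shakespeare.txt")
words = lines.flatMap(lambda line: line.split(" "))
non_empty = words.filter(lambda w: w != "")

# Inspect the lineage graph that Spark keeps for recovery and rescheduling
print(non_empty.toDebugString().decode())

# Only an action such as count() executes the (optimized) plan
print(non_empty.count())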


Components


There are a few key components that make up Apache Spark (a short configuration sketch follows the list):

  • Driver Program (master): The process that runs the main Spark application and coordinates the work.
  • Cluster Manager: The component that allocates resources and orchestrates the master-slave relationship.
  • Node (slave): The worker node that executes the tasks given by the driver program.
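
As a rough illustration of how these pieces fit together, the driver program can be pointed at a cluster manager when the session is built. The master URL and resource settings below are placeholders, not values from this article; local[*] can be used instead to run everything on a single machine.


from pyspark.sql import SparkSession

# Hypothetical cluster setup: the driver connects to a cluster manager,
# which assigns executors on the worker nodes
spark = (
    SparkSession.builder
    .appName("WordStemming")
    .master("spark://cluster-manager-host:7077")  # placeholder cluster manager address
    .config("spark.executor.memory", "2g")        # resources requested per executor
    .getOrCreate()
)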


Practical Example


Now we are going to use Apache Spark to count stemmed words in William Shakespeare's work. The first thing we need to do is create a session and an RDD, which we can build by reading the file with the Spark library. Strictly speaking, we will not use an RDD directly, since the RDD is the older, lower-level format. Instead, we use a DataFrame, which is built on top of RDDs and gives us more functionality. On that note, we can still convert from one format to the other in case we are working with a legacy application.


import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, regexp_replace, col, lower, monotonically_increasing_id, lit, when

#Create a session
spark = SparkSession.builder.appName('Practise').getOrCreate()

#Read file
shakespeare = spark.read.text("shakespeare.txt")
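
As a side note, converting between the two representations only takes a line in either direction. A quick sketch, using the data frame we just read:


# DataFrame -> RDD of Row objects (useful for legacy RDD-based code)
shakespeare_rdd = shakespeare.rdd

# RDD of Rows -> DataFrame again
shakespeare_df = shakespeare_rdd.toDF()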

Next, we clean up the rows by removing blank lines, splitting each line so that every word becomes its own record, stripping punctuation, and converting everything to lower case.


# Remove white space rows
shakespeare = shakespeare.filter("value != ''")

# Add a new column "words" that splits the content of each row into an array of strings
shakespeare = shakespeare.withColumn("words", split(shakespeare["value"], " "))

# Explode each array so that every word becomes its own row
shakespeare = shakespeare.select("value", explode(shakespeare["words"]).alias("word"))
shakespeare = shakespeare.select("word").filter("word != ''")

# Remove punctuation
shakespeare = shakespeare.withColumn("no_punc", regexp_replace(col("word"), r'[^\w\s]', ''))
shakespeare = shakespeare.select("no_punc")

# Normalize everything to lower case
shakespeare = shakespeare.select("no_punc", lower(shakespeare["no_punc"]))
shakespeare = shakespeare.select("lower(no_punc)")
shakespeare = shakespeare.withColumnRenamed("lower(no_punc)","value")

Now our data is pretty much clean and we can start counting the verbs. Looking at verb_dict.txt, we can notice a few patterns: each verb group sits on its own line, the root word is always the first entry, and the remaining forms follow, separated by commas. The idea is to give every form on the same line the same index and increment the index for each new line. Based on those two columns (word and index), I am going to count how often each form occurs in the previous table. One more thing to handle is the verb "be", whose forms appear on several different lines; I am going to combine all of those records into a single index.


# Give each dictionary line an index & remove any duplicates
verb_dict = spark.read.text("verb_dict.txt")
verb_dict = verb_dict.withColumn("row_id", monotonically_increasing_id())
verb_dict = verb_dict.withColumn("split_line", split(verb_dict["value"], ","))
verb_dict = verb_dict.select("row_id", "split_line")
verb_dict = verb_dict.selectExpr("row_id", "explode(split_line) as form")
verb_dict = verb_dict.dropDuplicates(['row_id', 'form'])

# Combine all "be" rows (row_id 47, 48, 49) into the single index 50
updated_df = verb_dict
for be_row_id in (47, 48, 49):
    updated_df = updated_df.withColumn(
        "row_id", when(col("row_id") == be_row_id, 50).otherwise(col("row_id"))
    )

Finally, we can use group by to sum the occurrences of all words that share the same index, and then replace the index with the original root word. This way, we know how many times each root verb occurs.


# Count verb occurrences
count = shakespeare.join(updated_df,"value","inner")
result = count.groupBy("row_id").count()

# Make lookup table
lookup = spark.read.text("verb_dict.txt")
lookup = lookup.withColumn("row_id", monotonically_increasing_id())
split_col = split(col("value"), ",")
lookup = lookup.withColumn("first_value", split_col.getItem(0))
lookup = lookup.select(["row_id","first_value"])
lookup = lookup.withColumnRenamed("first_value","value")

# Replace the index with the root word from the lookup table
final = lookup.join(result,"row_id","inner")
final = final.select(["value","count"])
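
Because of lazy evaluation, nothing has actually been computed up to this point; calling an action runs the whole pipeline. For example, one way to display the ten most frequent root verbs:


# Trigger execution and display the ten most frequent root verbs
final.orderBy(col("count").desc()).show(10)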