Spark Java WordCount

The Spark API allows you to write programs in Scala, Python, Java and R. Throughout this series we will be working with Java 8. The following code snippet is a WordCount program written in Java. Open the Maven project created in Setting up eclipse spark project. In the package that you added while creating the project, create a new Java class called SparkJavaWordCount and copy the following code snippet.

import java.util.Arrays;
import java.util.Calendar;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkJavaWordCount {

	public static void main(String[] args) throws Exception {

		// configure the application and create the Spark context,
		// the entry point for all Spark functionality
		SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// read the input file passed as the first argument into an RDD of lines
		JavaRDD<String> lines = ctx.textFile(args[0]);

		// split each line into words, pair each word with 1, then sum the 1s per word
		// (Spark 2.x flatMap expects an Iterator, hence the .iterator() call)
		JavaPairRDD<String, Integer> counts = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator())
				.mapToPair(x -> new Tuple2<String, Integer>(x, 1)).reduceByKey((x, y) -> x + y);

		// save the counts, appending the current time in milliseconds to the output directory name
		counts.saveAsTextFile(args[1] + Calendar.getInstance().getTimeInMillis());

		ctx.stop();
	}
}

Don’t worry about the details yet; we will get into them once we have a successful run. The program above needs two inputs: the location of the input file on which the word count has to be run, and the location of the output directory. The program appends the current time in milliseconds to your output directory name so that you can run it repeatedly without having to delete the output directory from a previous run.
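
For instance, if the second argument were /home/user/output/ (a made-up path), each run writes to a fresh directory, along these lines:

	// hypothetical example: args[1] = "/home/user/output/" yields a new directory name per run
	String outputDir = "/home/user/output/" + Calendar.getInstance().getTimeInMillis();
	System.out.println(outputDir); // something like /home/user/output/1501286400000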

Now, let's create the jar that we will run to execute our program. Right-click your pom.xml and run install; this will package a jar for you under the target folder. We will be using the spark-submit command to submit our program. Navigate to the <SPARK_HOME>/bin directory and run the following command:

./spark-submit --class your.package.SparkJavaWordCount --master local /PATH_TO_MVN_PROJECT/target/NAME_OF_THE_JAR.jar /INPUT_FILE_LOCATION/ /OUTPUT_FILE_DIRECTORY_LOCATION/
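
For example, if your package were com.example.spark and Maven produced a jar named spark-wordcount-0.0.1-SNAPSHOT.jar (both hypothetical; substitute your own package, jar name and paths), the command would look something like:

./spark-submit --class com.example.spark.SparkJavaWordCount --master local /home/user/workspace/spark-wordcount/target/spark-wordcount-0.0.1-SNAPSHOT.jar /home/user/data/words.txt /home/user/data/output/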

Here is the input file that I used:

John John John Alex Alex Alex 
Alex Alex Alex John John John
A A A A 
B B B C 
D A C B

This is the output that I got after running the program:

(B,4)
(A,5)
(Alex,6)
(C,2)
(John,6)
(D,1)

Amazing! If you can see the words and their counts in the part-00000 file in the output directory, that's great!


Now let's get into the details of the execution and walk through the program step by step:

The first statement in main() creates a SparkConf and a JavaSparkContext. The SparkContext is the entry point for all Spark functionality: it represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. We will see what accumulators and broadcast variables are later; for now, let's see what RDDs are. Referring to the definition from the Spark documentation: “Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.” The driver program is the program with the main() function in it.
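
As a quick illustration of those two ways of creating an RDD, here is a minimal standalone sketch (not part of the WordCount program; the collection contents and file path are made up):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddCreationExamples {

	public static void main(String[] args) {

		SparkConf sparkConf = new SparkConf().setAppName("RddCreationExamples");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// 1. parallelize an existing collection in the driver program
		JavaRDD<String> fromCollection = ctx.parallelize(Arrays.asList("John", "Alex", "John"));

		// 2. reference a dataset in external storage (hypothetical path)
		JavaRDD<String> fromFile = ctx.textFile("/tmp/some-input.txt");

		System.out.println("Elements in the parallelized RDD: " + fromCollection.count());
		System.out.println("Lines in the file-based RDD: " + fromFile.count());

		ctx.stop();
	}
}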

So, in this program our first RDD is created by reading the text file with ctx.textFile(args[0]). We then apply a series of transformations on this RDD. Transformations are functions that take an RDD as input and produce one or more RDDs as output. They cannot change the input RDD, because an RDD itself is immutable. Our first transformation is lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()), which splits each line on the space (" ") character and returns an iterator over the resulting words, and it does this for every line. The flattened stream of words is then sent to the next transformation, .mapToPair(x -> new Tuple2<String, Integer>(x, 1)): for each word, we create a tuple whose key is the word itself and whose value is 1. Every occurrence gets its own value of 1, so if a word appears three times we get three separate (word, 1) tuples. Finally, we send this RDD of tuples to our last transformation, .reduceByKey((x, y) -> x + y), which aggregates the values and sums them by key.
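
To make each step easier to see, here is the same pipeline written with a named intermediate RDD per transformation. This is only a sketch of the same logic, using the lines RDD from the program above:

	// each transformation returns a new RDD; nothing executes until an action is called
	JavaRDD<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
	JavaPairRDD<String, Integer> pairs = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
	JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((x, y) -> x + y);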

Let's track the word 'John', which is present 6 times (3 times on line one and 3 times on line two) in the input file that I used. Each occurrence we read is paired with a value of 1, so by the final transformation the key John has the values (John, [1,1,1,1,1,1]). reduceByKey passes these values to our function two at a time: on the first call x and y are simply the first two 1s, and after that x is the accumulated value and y is the next 1, until the last value for John is consumed. Here is how the word John is processed in our reduceByKey function:

x y -> (x + y)
1 1 -> (1 + 1) = 2
2 1 -> (2 + 1) = 3
3 1 -> (3 + 1) = 4
4 1 -> (4 + 1) = 5
5 1 -> (5 + 1) = 6

The final word count for the word John is (John,6).
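
The same pairwise accumulation can be reproduced in plain Java with a reduce over the six 1s. This is only an illustration of the (x, y) -> x + y lambda and is independent of Spark:

import java.util.Arrays;
import java.util.List;

public class ReduceByKeyIllustration {

	public static void main(String[] args) {

		// the six 1s that reduceByKey receives for the key "John"
		List<Integer> valuesForJohn = Arrays.asList(1, 1, 1, 1, 1, 1);

		// apply the same lambda pairwise, left to right:
		// first the first two values, then accumulator + next value
		int count = valuesForJohn.stream().reduce((x, y) -> x + y).get();

		System.out.println("(John," + count + ")"); // prints (John,6)
	}
}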

Similar logic is applied to all the other words. Finally, all the counts are saved to a directory by counts.saveAsTextFile(...), which is an example of a Spark action. If this line were missing, the program would not do any work: transformations are lazy, and Spark will not execute a job until an action is called on an RDD.
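
For example, another action such as collect() would also trigger the job, this time bringing the results back to the driver instead of writing them to files. A small sketch, using the counts RDD from the program above:

	// collect() is an action: it triggers execution of the whole pipeline
	// and returns the results to the driver as a java.util.List
	for (Tuple2<String, Integer> tuple : counts.collect()) {
		System.out.println(tuple._1() + " appeared " + tuple._2() + " times");
	}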

The goal here is simply to explain the WordCount algorithm as clearly as possible. This is a basic program, but it is very important for understanding what happens during an execution.

Click here to download the Maven project
