The Java Word Count program we saw here was written using lambda expressions, supported since Java 8. We passed functions as arguments to transformation calls such as mapToPair() and reduceByKey(). In this post we will write more detailed implementations of those lambda expressions, since lambdas are still fairly new to a lot of Java folks. We will rewrite the Java Word Count as follows.
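As a quick reminder, the lambda-based core of that earlier program looked roughly like this (a sketch, assuming lines and SPACE are defined as in the full program below):

// Lambda version of the same transformations (Spark 1.x Java API)
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)));
JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);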
Now let's see the first rewrite, using anonymous inner class implementations:
import java.util.Arrays;
import java.util.Calendar;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkJavaWordCountAnonymousInnerclass {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = ctx.textFile(args[0]);

        // Split each line into words on spaces
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 2406253308558702132L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 9148758391341561038L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 3613694033500852224L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Action call - triggers the computation and writes the output
        counts.saveAsTextFile(args[1] + Calendar.getInstance().getTimeInMillis());

        ctx.stop();
    }
}
Copy the above Java program into the Maven project you created here, build the jar with the pom.xml (mvn package), and execute it using spark-submit as follows:
./spark-submit --class your.package.SparkJavaWordCountAnonymousInnerclass --master local /PATH_TO_MVN_PROJECT/target/NAME_OF_THE_JAR.jar /INPUT_FILE_LOCATION/ /OUTPUT_FILE_DIRECTORY_LOCATION/
You could also pull these inner class implementations of the functions out into separate Java classes, which makes them fully reusable across jobs.
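For example, here is a minimal sketch of what that separation might look like (the class name WordPairFunction is just an illustration, not from the original post):

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// A reusable, named implementation of PairFunction
public class WordPairFunction implements PairFunction<String, String, Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
}

Any job could then reuse it directly: JavaPairRDD<String, Integer> ones = words.mapToPair(new WordPairFunction());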
Which implementation should you use and when?
Lambda expressions are compiled into a private method of the enclosing class, while anonymous inner classes are compiled into separate class files. Imagine you are processing a large JSON data set, where each input record is a valid JSON document. If your processing logic is deep, walking the JSON tree and perhaps calling other methods or external APIs, a lambda expression will not be a good fit. So if you need to express a single unit of behavior, as we saw in the Word Count program, lambdas are a great choice; but if you need to add fields, helper methods, and so on, go with inner classes.
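To make that concrete, here is a minimal sketch of an anonymous inner class carrying a field and a helper method, which a lambda body cannot do (the jsonLines RDD and the depth-counting logic are illustrative assumptions, not from the original post):

// Assumes: import org.apache.spark.api.java.function.Function;
// jsonLines is a JavaRDD<String> where each record is a JSON document
JavaRDD<Integer> depths = jsonLines.map(new Function<String, Integer>() {

    private static final long serialVersionUID = 1L;

    // A field - not possible inside a lambda body
    private final char openBrace = '{';

    @Override
    public Integer call(String json) {
        return maxDepth(json);
    }

    // A helper method local to this function implementation
    private int maxDepth(String json) {
        int depth = 0, max = 0;
        for (char c : json.toCharArray()) {
            if (c == openBrace) max = Math.max(max, ++depth);
            else if (c == '}') depth--;
        }
        return max;
    }
});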