General Spark Transformations
Date: July 31, 2017 | Categories: Apache Spark

Apache Spark Transformations

In this post we will focus on general Apache Spark transformations applied to RDDs. We will keep things simple but try to go as deep as we can. A download link is provided at the bottom so you can run the programs and try them with your own input. The goal is to get familiar with transformations and Spark's data processing. We will go deeper and learn the different types of transformations (narrow vs. wide) and other internal details once we have a basic understanding of the simpler ones.

  • Map()

The map() transformation passes each element of the source RDD through the implemented function. A nice property of this transformation is that the output RDD has exactly the same size as the RDD on which map() was applied. It is a good candidate when you want the output count to match the input count, with processing logic applied to each row of the input RDD.

/**
 * 
 */
package com.mishudi.learn.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class MapTransformation {

	public static void main(String[] args) throws Exception {

		SparkConf sparkConf = new SparkConf().setAppName("MapTransformation");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		JavaRDD<String> initialRdd = ctx.parallelize(Arrays.asList("Alex-Sam", "Anil-Rahul", "Tom-Tim", "Rob-Jim"));

		System.out.println("initialRdd "+initialRdd.count());
		
		JavaRDD<Tuple2<String, String>> fatherToSonNameRDD = initialRdd.map(r -> new Tuple2<>(r.split("-")[0], r.split("-")[1]));
		fatherToSonNameRDD.collect().forEach(r -> System.out.println("Father is "+r._1+" and Son is "+r._2));
		
		System.out.println("fatherToSonNameRDD "+fatherToSonNameRDD.count());
		ctx.stop();
	}
}

The output of the above program is:

[Image: Apache Spark map transformation output]
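
Since the screenshot is not reproduced here, the following is a reconstruction of what the console should print based on the code above (Spark's own log lines omitted):

initialRdd 4
Father is Alex and Son is Sam
Father is Anil and Son is Rahul
Father is Tom and Son is Tim
Father is Rob and Son is Jim
fatherToSonNameRDD 4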

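As a small aside that is not part of the original listing: because the result here is a set of key-value pairs, the Java API's mapToPair() is often the more natural choice, since it returns a JavaPairRDD and enables key-based operations such as reduceByKey(). A minimal sketch reusing the same initialRdd (it additionally requires import org.apache.spark.api.java.JavaPairRDD):

// Alternative to the map() call above: build the pairs as a JavaPairRDD
JavaPairRDD<String, String> fatherToSonPairRDD = initialRdd
		.mapToPair(r -> new Tuple2<>(r.split("-")[0], r.split("-")[1]));
fatherToSonPairRDD.collect().forEach(r -> System.out.println("Father is " + r._1 + " and Son is " + r._2));
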
  • Filter()

The filter() transformation retains the data that passes the filter condition. In our example, any line in the webserver.log file that contains the ERROR keyword is retained; everything else is dropped and will not be present in the new RDD created after the filter runs. Note that this does not modify the parent RDD: the initial RDD still retains all the lines it read from the text file, since RDDs are immutable. Put the log file in <SPARK_HOME>/bin.

package com.mishudi.learn.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FilterTransformation {
	public static void main(String[] args) throws Exception {

		SparkConf sparkConf = new SparkConf().setAppName("FilterTransformation");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		JavaRDD<String> lines = ctx.textFile("webserver.log.txt");

		//Filter out errors first and count
		JavaRDD<String> errorLines = lines.filter(r -> r.contains("ERROR"));
		long errorCount = errorLines.count();
		System.out.println("Total errors in the log file are "+errorCount);
		
		//Filter and count timeout related errors
		JavaRDD<String> timeoutRelatedErrors = errorLines.filter(r -> r.contains("{timeout}"));
		long timeOutErrorCount = timeoutRelatedErrors.count();
		System.out.println("Total timeout related errors in the log file are "+timeOutErrorCount);
		
		ctx.stop();
	}
}

The output of the above program is:

[Image: Apache Spark filter transformation output]
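
One small improvement worth considering (not part of the original listing): errorLines feeds two jobs here, its own count() and the count() on timeoutRelatedErrors, and without caching each of those actions re-reads and re-filters webserver.log.txt. Caching the filtered RDD avoids that recomputation; simply change the declaration of errorLines to:

// Keep the filtered error lines in memory so the second job reuses them
JavaRDD<String> errorLines = lines.filter(r -> r.contains("ERROR")).cache();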

 

  • flatMap()

The flatMap() transformation works on each element of the input RDD, producing a collection for each element, and then flattens all of these collections into one output RDD. The size of the output RDD is usually greater than that of the input RDD.

/**
 * 
 */
package com.mishudi.learn.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FlatMapTransformation {

	public static void main(String[] args) throws Exception {

		SparkConf sparkConf = new SparkConf().setAppName("FlatMapTransformation");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		JavaRDD<String> initialRdd = ctx.parallelize(Arrays.asList("Alex-Sam", "Anil-Rahul", "Tom-Tim", "Rob-Jim"));

		System.out.println("initialRdd "+initialRdd.count());
		
		//Flatten the intialRdd
		JavaRDD<String> splitFlattenedRDD = initialRdd.flatMap(r -> Arrays.asList(r.split("-")));
		splitFlattenedRDD.collect().forEach(r -> System.out.println(r));
		
		System.out.println("splitFlattenedRDD "+splitFlattenedRDD.count());
		
		ctx.stop();
	}
}

The output of the above program is:

[Image: Apache Spark flatMap transformation output]
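
Since the screenshot is not reproduced here, the following is a reconstruction of what the console should print based on the code above (Spark's own log lines omitted):

initialRdd 4
Alex
Sam
Anil
Rahul
Tom
Tim
Rob
Jim
splitFlattenedRDD 8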

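A version note: the listing above compiles against the Spark 1.x Java API, where FlatMapFunction returns an Iterable. On Spark 2.x the same function must return an Iterator, so the lambda needs a trailing .iterator():

// Spark 2.x form of the flatMap() call above
JavaRDD<String> splitFlattenedRDD = initialRdd.flatMap(r -> Arrays.asList(r.split("-")).iterator());
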
  • mapPartitions()

mapPartitions() works the same way as map() but with one big difference: this transformation is called only once per partition, whereas map() is called for each element in the partition. If at this point you do not know what partitions are, it is a good time to read my post on partitions. Imagine you had to connect to a database, retrieve some information, and use it to transform the elements of the RDD. If you used map() here, you would be making a database call for every element in the partition. If you use mapPartitions() instead, you make the database call only once, because the operation is invoked once per partition. In the code below, we retrieve the valid names from the database once, at the beginning of the transformation, and then use them while iterating over the elements; the Apache Spark framework guarantees that the function is called only once per partition and passes it an Iterator over the elements belonging to that partition.

/**
 * 
 */
package com.mishudi.learn.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class MapPartitionsTransformation {

	public static void main(String[] args) throws Exception {

		SparkConf sparkConf = new SparkConf().setAppName("MapPartitionsTransformation");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		JavaRDD<String> initialRdd = ctx.parallelize(Arrays.asList("Sam", "Robert", "Rahul", "Mike", "Stepheny"));

		JavaRDD<String> validValuesRdd = initialRdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 303607085998713322L;

			/**
			 * 
			 * @param itr
			 *            represents all the data that is to be processed for
			 *            this partition
			 * @return
			 * @throws Exception
			 */
			@Override
			public Iterable<String> call(Iterator<String> itr) throws Exception {

				// Call to DB to get all valid values only once per partition
				List<String> validValues = getValidInfoFromDb();

				List<String> allProcessedDataFromPartition = new ArrayList<String>();
				while (itr.hasNext()) {
					String str = itr.next();
					if (validValues.contains(str)) {
						allProcessedDataFromPartition.add(str);
					}
				}
				return allProcessedDataFromPartition;
			}
		});

		//Collect will bring everything to the driver and it must fit in the memory
		validValuesRdd.collect().forEach(x -> System.out.println(x));
		
		ctx.stop();
	}

	/**
	 * A mock method which simulates DB call
	 * @return
	 */
	private static List<String> getValidInfoFromDb() {
		List<String> validValues = new ArrayList<String>();
		validValues.add("Robert");
		validValues.add("Tom");
		validValues.add("John");
		validValues.add("Mike");
		validValues.add("Alex");
		return validValues;
	}

}

Following is the output:

Robert
Mike
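
For completeness, here is a more compact lambda sketch of the same logic, assuming it lives inside the same class so it can reuse the getValidInfoFromDb() helper (on Spark 2.x, return allProcessedDataFromPartition.iterator() instead of the list itself):

JavaRDD<String> validValuesRdd = initialRdd.mapPartitions(itr -> {
	// One database call per partition, not per element
	List<String> validValues = getValidInfoFromDb();
	List<String> allProcessedDataFromPartition = new ArrayList<String>();
	while (itr.hasNext()) {
		String str = itr.next();
		if (validValues.contains(str)) {
			allProcessedDataFromPartition.add(str);
		}
	}
	return allProcessedDataFromPartition;
});
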
  • mapPartitionsWithIndex()

mapPartitionsWithIndex() is another very useful transformation. It works the same way as mapPartitions(), with one difference: it also gives the user the partition number. See the following:

initialRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

			private static final long serialVersionUID = 3247292905849349696L;

			@Override
			public Iterator<String> call(Integer partitionNumber, Iterator<String> itr) throws Exception {
				// Example body: tag every element with the number of the partition it belongs to
				List<String> taggedElements = new ArrayList<String>();
				while (itr.hasNext()) {
					taggedElements.add("Partition " + partitionNumber + " -> " + itr.next());
				}
				return taggedElements.iterator();
			}

		}, false);

This operation is called once per partition, and the partition number is passed along with the Iterator over the elements of that partition. Apart from the function, the user also needs to pass one more argument, preservesPartitioning, which indicates whether the input function preserves the partitioner; it should be false unless this is a pair RDD and the input function does not modify the keys.
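
To actually see the partition assignment, the same thing can be written more compactly as a lambda, assigned to an RDD, and collected; a minimal sketch (the variable name withPartitionInfo is just for illustration, and the partition numbers you see depend on how many partitions the RDD ends up with):

JavaRDD<String> withPartitionInfo = initialRdd.mapPartitionsWithIndex((partitionNumber, itr) -> {
	// Tag each element with the partition it was processed in
	List<String> taggedElements = new ArrayList<String>();
	while (itr.hasNext()) {
		taggedElements.add("Partition " + partitionNumber + " -> " + itr.next());
	}
	return taggedElements.iterator();
}, false);

withPartitionInfo.collect().forEach(System.out::println);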

Before we get to know other cool transformations in Spark, this is a good point to introduce another beautiful data abstraction called the DataFrame. Let's understand DataFrames in detail in the next post.

Click here to download the source code for the RDD transformations discussed in this post.