Relational/Set Theory Transformations
Date: August 12, 2017 | Categories: Apache Spark

Relational/Set Theory transformations

We will be working with the following program to understand three important set-theory-based transformations: union, intersection, and subtract/except.

package com.mishudi.learn.spark.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class RelationalOrSetTheoryTransformations {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("RelationalOrSetTheoryTransformations");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// SQLContext for creating dataframes
		SQLContext sc = new SQLContext(ctx.sc());

		// Set theory transformations for combining results from two different
		// data sets (RDD or DataFrame)
		onDataframe(ctx, sc);
		onRDD(ctx, sc);
	}

	/**
	 * Relational/set theory transformations on dataframes. Both JSON files
	 * can be copied from the resources folder.
	 * 
	 * @param ctx
	 * @param sc
	 */
	private static void onDataframe(JavaSparkContext ctx, SQLContext sc) {
		// Create a data frame from a JSON file; put sample1.json and
		// sample1_v2.json in <SPARK_HOME>/bin
		DataFrame df1 = sc.read().json("sample1.json");
		df1.show();

		DataFrame df2 = sc.read().json("sample1_v2.json");
		df2.show();

		// Union of two dataframes
		DataFrame dfUnion = df1.unionAll(df2);
		// All the data combined from both the data frames
		dfUnion.show();

		// Intersection of two dataframes
		DataFrame dfIntersect = df1.intersect(df2);
		dfIntersect.show();

		// Except of two dataframes
		DataFrame dfExcept = df1.except(df2);
		dfExcept.show();
	}

	private static void onRDD(JavaSparkContext ctx, SQLContext sc) {
		JavaRDD<Integer> rdd1 = ctx.parallelize(Arrays.asList(1, 2, 3, 4));

		JavaRDD<Integer> rdd2 = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5));

		// Union of two RDDs
		JavaRDD<Integer> rddUnion = rdd1.union(rdd2);
		rddUnion.collect().forEach(r -> System.out.print(r + " "));

		// Intersection of two RDDs
		JavaRDD<Integer> rddIntersect = rdd1.intersection(rdd2);
		rddIntersect.collect().forEach(r -> System.out.print(r + " "));

		// Subtraction of two RDDs
		JavaRDD<Integer> rddSubtract = rdd2.subtract(rdd1);
		rddSubtract.collect().forEach(r -> System.out.print(r + " "));

	}
}

Union

Union requires the two dataframes to have the same number of columns, with compatible types. Column names can differ; that is not a problem, as the rows from the second dataframe are simply appended to the dataframe on which the unionAll() transformation was called. These rules don't apply to RDDs: a union can be performed on RDDs of different sizes, but the element type has to be the same. Union also brings in duplicates, if there are any.

// Union of two dataframes
DataFrame dfUnion = df1.unionAll(df2);
// All the data combined from both the data frames
dfUnion.show();

// Union of two RDDs
JavaRDD<Integer> rddUnion = rdd1.union(rdd2);
rddUnion.collect().forEach(r -> System.out.print(r + " "));

Following are the outputs from the union transformation:

Union of Dataframes
+----+-----+----+------+
|  id| name| ppu|  type|
+----+-----+----+------+
|0001| Cake|0.55| donut|
|0002|Cake2|0.52|donut2|
|0003|Cake3|0.53|donut3|
|0001| Cake|0.55| donut|
|0004|Cake4|0.33| donu4|
|0005|Cake5|0.22|donut5|
|0006|Cake6|0.73|donut6|
+----+-----+----+------+

Union of two RDDs
1 2 3 4 1 2 3 4 5
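
As noted above, union keeps duplicates. When true set semantics are wanted, distinct() can be chained after the union; a minimal sketch, reusing df1/df2 and rdd1/rdd2 from the program above:

// unionAll() keeps duplicates; distinct() removes the repeated rows
DataFrame dfUnionDistinct = df1.unionAll(df2).distinct();
dfUnionDistinct.show();

// The same applies to RDDs: prints each element once (order not guaranteed)
JavaRDD<Integer> rddUnionDistinct = rdd1.union(rdd2).distinct();
rddUnionDistinct.collect().forEach(r -> System.out.print(r + " "));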

Intersection

Intersection collects only the data that is common to the two dataframes or RDDs, and it returns distinct elements (duplicates are removed). Like union, it requires the two dataframes to have the same number of columns and matching types. Note that for RDDs the order of the result is not guaranteed, as the output below shows.

// Intersection of two dataframes
DataFrame dfIntersect = df1.intersect(df2);
dfIntersect.show();		

// Intersection of two RDDs
JavaRDD<Integer> rddIntersect = rdd1.intersection(rdd2);
rddIntersect.collect().forEach(r -> System.out.print(r + " "));

Following are the outputs from the intersection transformation:

Intersection of two Dataframes
+----+----+----+-----+
|  id|name| ppu| type|
+----+----+----+-----+
|0001|Cake|0.55|donut|
+----+----+----+-----+

Intersection of two RDDs
4 1 3 2

Subtract/Except

The except transformation on dataframes is similar to subtract on RDDs: it collects only the data that is present in the dataframe on which the operation is called and absent from the dataframe passed as the argument. Except requires both dataframes to have the same number of columns and types. For RDDs, the framework provides the subtract transformation, which behaves exactly the same way.

// Except of two dataframes
DataFrame dfExcept = df1.except(df2);
dfExcept.show();

// Subtraction of two RDDs
JavaRDD<Integer> rddSubtract = rdd2.subtract(rdd1);
rddSubtract.collect().forEach(r -> System.out.print(r + " "));

Following are the outputs of the except and subtract transformations:

Except of two dataframes
+----+-----+----+------+
|  id| name| ppu|  type|
+----+-----+----+------+
|0002|Cake2|0.52|donut2|
|0003|Cake3|0.53|donut3|
+----+-----+----+------+

Subtraction of two RDDs
5
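
Combining subtract with union also gives the symmetric difference, i.e. the elements present in exactly one of the two data sets; a minimal sketch, reusing rdd1/rdd2 and df1/df2 from above:

// Symmetric difference = (A - B) union (B - A)
JavaRDD<Integer> rddSymmetricDiff = rdd1.subtract(rdd2).union(rdd2.subtract(rdd1));
rddSymmetricDiff.collect().forEach(r -> System.out.print(r + " ")); // prints: 5

// The same composition on dataframes, with except() and unionAll()
DataFrame dfSymmetricDiff = df1.except(df2).unionAll(df2.except(df1));
dfSymmetricDiff.show();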

Errors you may encounter:
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator; this occurs when the dataframes' column counts or column types don't match. To inspect the schemas, call printSchema() on each dataframe and look for discrepancies between the two.
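
A minimal sketch of that inspection, using df1/df2 from the program above:

// Print both schemas and compare column names and types line by line
df1.printSchema();
df2.printSchema();
// Typical output (field names and types are inferred from the JSON):
// root
//  |-- id: string (nullable = true)
//  |-- name: string (nullable = true)
//  |-- ppu: double (nullable = true)
//  |-- type: string (nullable = true)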

Both JSON files are present in the resources folder; copy them to the <SPARK_HOME>/bin folder before running the program.

Recall how to run the program? Download the Maven project from the link below, build it with the pom.xml, and then execute this command:

<SPARK_HOME>/bin/spark-submit --class com.mishudi.learn.spark.dataframe.RelationalOrSetTheoryTransformations --master local /PATH/TO/PROJECT/target/spark-test-0.0.1-SNAPSHOT.jar

Inner Join

We will use two different JSON files to understand joins. Employee.json and Department.json are placed in the resources folder; please copy them into the <SPARK_HOME>/bin folder before running the programs. Also, we wrote a new Java program to keep things clear and precise. Following is the code we will discuss:

package com.mishudi.learn.spark.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import com.google.common.base.Optional;

import scala.Tuple2;

public class RelationalOrSetTheoryTransformations2 {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("RelationalOrSetTheoryTransformations2");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// SQLContext for creating dataframes
		SQLContext sc = new SQLContext(ctx.sc());

		// Different join transformations for SQL-style querying
		// on RDDs or DataFrames
		onDataframe(ctx, sc);
		onRDD(ctx, sc);
	}

	/**
	 * Relational/set theory transformations (part 2) on dataframes. Both
	 * JSON files can be copied from the resources folder.
	 * 
	 * @param ctx
	 * @param sc
	 */
	private static void onDataframe(JavaSparkContext ctx, SQLContext sc) {
		// Create a data frame from a JSON file; put Employee.json and
		// Department.json in <SPARK_HOME>/bin
		DataFrame employeeDf = sc.read().json("Employee.json");
		employeeDf.show();

		DataFrame deptDf = sc.read().json("Department.json");
		deptDf.show();

		// Inner join of two dataframes
		DataFrame dfInnerJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "inner");
		dfInnerJoin.show();

		// Outer join of two dataframes; supported joinType values include
		// `inner`, `outer`, `left_outer`, `right_outer`
		DataFrame dfOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "outer");
		dfOuterJoin.show();
		
		// left_outer join of two dataframes
		DataFrame dfLeftOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "left_outer");
		dfLeftOuterJoin.show();
		
		// right_outer join of two dataframes
		DataFrame dfRightOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "right_outer");
		dfRightOuterJoin.show();
	}

	private static void onRDD(JavaSparkContext ctx, SQLContext sc) {
		/**
		 * SQL style Join transformations are possible only on pair rdds
		 */
		JavaPairRDD<String, String> pairRdd1 = ctx.parallelizePairs(Arrays.asList(
				new Tuple2<String, String>("key1", "value1"), new Tuple2<String, String>("key2", "value2"),
				new Tuple2<String, String>("key3", "value3"), new Tuple2<String, String>("key4", "value10")));

		JavaPairRDD<String, String> pairRdd2 = ctx.parallelizePairs(Arrays.asList(
				new Tuple2<String, String>("key1", "value3"), new Tuple2<String, String>("key5", "value98"),
				new Tuple2<String, String>("key2", "value6"), new Tuple2<String, String>("key7", "value5")));

		// Simple Join, like an inner join
		JavaPairRDD<String, Tuple2<String, String>> pairRdd3 = pairRdd1.join(pairRdd2);
		pairRdd3.collect()
				.forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

		// A full outer join
		JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> pairRdd4 = pairRdd1.fullOuterJoin(pairRdd2);
		pairRdd4.collect()
				.forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

		// A Left outer join
		JavaPairRDD<String, Tuple2<String, Optional<String>>> pairRdd5 = pairRdd1.leftOuterJoin(pairRdd2);
		pairRdd5.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));
		
		// A Right outer join
		JavaPairRDD<String, Tuple2<Optional<String>, String>> pairRdd6 = pairRdd1.rightOuterJoin(pairRdd2);
		pairRdd6.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));
	}
}

An inner join collects only the rows that are common to the two dataframes, as established by the expression passed as the second parameter: employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")) ($eq$eq$eq is how Scala's === column operator is spelled from Java; the Java-friendly Column.equalTo() method is equivalent). Whenever the dept column from employeeDf and the id column from deptDf are equal, that row is joined and collected into the resulting dataframe. The third parameter, "inner", sets the joinType. In the case of RDDs, join is implicitly an inner join.

// Inner join of two dataframes
DataFrame dfInnerJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "inner");
dfInnerJoin.show();

// Simple Join, like an inner join
JavaPairRDD<String, Tuple2<String, String>> pairRdd3 = pairRdd1.join(pairRdd2);
pairRdd3.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

Following are the outputs from the inner join:
//Inner join two dataframes
+--------------------+------+----+--------------+------+-------------+
|             address|  dept|  id|          name|    id|         name|
+--------------------+------+----+--------------+------+-------------+
|  123 W. Avenue, U.K|Dep_01|0001|      Lex Raul|Dep_01|Human Reource|
|342 Birch grove, U.K|Dep_01|0002|Robinson Scott|Dep_01|Human Reource|
|010 Downtown Sout...|Dep_02|0003|   Matt Wilson|Dep_02|        Sales|
|French Ave, 1 St....|Dep_04|0004|     William J|Dep_04|     Products|
|Missian Bay, Lond...|Dep_05|0005|     Tyron Sr.|Dep_05|   Engg R & D|
+--------------------+------+----+--------------+------+-------------+

//Inner join two RDDs
Key ->key2 Values->value2,value6
Key ->key1 Values->value1,value3
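
A side note on the dataframe result: the joined dataframe above carries two columns named id and two named name, which makes later references to them ambiguous. A minimal sketch of one way around this, using the hypothetical names dept_id/dept_name and the DataFrame API's withColumnRenamed():

// Rename deptDf's columns before joining so the result has unique column names
DataFrame deptRenamed = deptDf.withColumnRenamed("id", "dept_id")
		.withColumnRenamed("name", "dept_name");
DataFrame dfInnerJoinClean = employeeDf.join(deptRenamed,
		employeeDf.col("dept").$eq$eq$eq(deptRenamed.col("dept_id")), "inner");
dfInnerJoinClean.show();
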
Tuple: tuples combine a fixed number of items so that they can be passed around as a whole. The number of items is appended to the class name: since we had just two strings to store, we defined our tuple as Tuple2<String, String>; if we had four, it would have been Tuple4<String, String, String, String>. The beauty of the Tuple classes is that they are open to mixing different types together and passing them around.
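
A minimal sketch of tuples in action (Tuple4 needs an additional import of scala.Tuple4):

// Elements are read back positionally with _1(), _2(), ...
Tuple2<String, Integer> pair = new Tuple2<String, Integer>("count", 42);
System.out.println(pair._1() + " = " + pair._2()); // count = 42

// Tuples can freely mix types
Tuple4<String, Integer, Double, Boolean> mixed =
		new Tuple4<String, Integer, Double, Boolean>("0001", 7, 0.55, true);
System.out.println(mixed._3()); // 0.55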

Outer/Full Outer Join

Outer join and full outer join are the same: they bring in everything from both of the dataframes/RDDs in context. Fields where the join cannot be performed are marked null. Similarly, in the case of RDDs, the framework returns an Optional wrapper around the values, which takes care of marking fields as absent, meaning null/no value found.

// Outer join of two dataframes 
DataFrame dfOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "outer");
dfOuterJoin.show();

// A full outer join
JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> pairRdd4 = pairRdd1.fullOuterJoin(pairRdd2);
pairRdd4.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

Following are the outputs from the outer/full outer join:
//Outer join two dataframes
+--------------------+------+----+--------------+------+-------------+
|             address|  dept|  id|          name|    id|         name|
+--------------------+------+----+--------------+------+-------------+
|  123 W. Avenue, U.K|Dep_01|0001|      Lex Raul|Dep_01|Human Reource|
|342 Birch grove, U.K|Dep_01|0002|Robinson Scott|Dep_01|Human Reource|
|010 Downtown Sout...|Dep_02|0003|   Matt Wilson|Dep_02|        Sales|
|                null|  null|null|          null|Dep_03|           IT|
|French Ave, 1 St....|Dep_04|0004|     William J|Dep_04|     Products|
|Missian Bay, Lond...|Dep_05|0005|     Tyron Sr.|Dep_05|   Engg R & D|
|                null|  null|null|          null|Dep_06|  Vendor Mgmt|
|                null|  null|null|          null|Dep_07|     Security|
|St. Luis Street, ...|Dep_11|0017|     Tyron Sr.|  null|         null|
+--------------------+------+----+--------------+------+-------------+

//Full outer join two RDDs
Key ->key4 Values->Optional.of(value10),Optional.absent()
Key ->key5 Values->Optional.absent(),Optional.of(value98)
Key ->key7 Values->Optional.absent(),Optional.of(value5)
Key ->key2 Values->Optional.of(value2),Optional.of(value6)
Key ->key3 Values->Optional.of(value3),Optional.absent()
Key ->key1 Values->Optional.of(value1),Optional.of(value3)
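
Rather than printing the Optional wrappers directly, Guava's Optional offers or() and orNull() for unwrapping; a minimal sketch, reusing pairRdd4 from above:

// Replace absent sides with a placeholder instead of printing Optional wrappers
pairRdd4.collect().forEach(r -> {
	String left = r._2._1().or("<no match>");
	String right = r._2._2().or("<no match>");
	System.out.println("Key ->" + r._1 + " Values->" + left + "," + right);
});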

Left Outer Join

A left outer join brings in all the data from the employee dataframe, and the rows in deptDf that match the join condition are joined to it. Notice that in the case of RDDs, the Optional wrapper is applied only to the second element of the tuple, which is the data from the second RDD (pairRdd2): when the join condition is not met, it is only the fields belonging to pairRdd2 that are marked absent.

// left_outer join of two dataframes
DataFrame dfLeftOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "left_outer");
dfLeftOuterJoin.show();

// A Left outer join
JavaPairRDD<String, Tuple2<String, Optional<String>>> pairRdd5 = pairRdd1.leftOuterJoin(pairRdd2);
pairRdd5.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

Following are the outputs from the left outer join:
//Left Outer join two dataframes
+--------------------+------+----+--------------+------+-------------+
|             address|  dept|  id|          name|    id|         name|
+--------------------+------+----+--------------+------+-------------+
|  123 W. Avenue, U.K|Dep_01|0001|      Lex Raul|Dep_01|Human Reource|
|342 Birch grove, U.K|Dep_01|0002|Robinson Scott|Dep_01|Human Reource|
|010 Downtown Sout...|Dep_02|0003|   Matt Wilson|Dep_02|        Sales|
|French Ave, 1 St....|Dep_04|0004|     William J|Dep_04|     Products|
|Missian Bay, Lond...|Dep_05|0005|     Tyron Sr.|Dep_05|   Engg R & D|
|St. Luis Street, ...|Dep_11|0017|     Tyron Sr.|  null|         null|
+--------------------+------+----+--------------+------+-------------+

//Left outer join two RDDs
Key ->key4 Values->value10,Optional.absent()
Key ->key2 Values->value2,Optional.of(value6)
Key ->key3 Values->value3,Optional.absent()
Key ->key1 Values->value1,Optional.of(value3)

Right Outer Join

For a right outer join, the derivation happens from the right side, just as it happened from the left-side RDD/dataframe in the left outer join: all rows from the right side are kept, and left-side fields with no match are marked null/absent.

// right_outer join of two dataframes
DataFrame dfRightOuterJoin = employeeDf.join(deptDf, employeeDf.col("dept").$eq$eq$eq(deptDf.col("id")), "right_outer");
dfRightOuterJoin.show();

// A Right outer join
JavaPairRDD<String, Tuple2<Optional<String>, String>> pairRdd6 = pairRdd1.rightOuterJoin(pairRdd2);
pairRdd6.collect().forEach(r -> System.out.println("Key ->" + r._1 + " Values->" + r._2._1() + "," + r._2._2()));

Following are the outputs from the right outer join:
//Right Outer join two dataframes
+--------------------+------+----+--------------+------+-------------+
|             address|  dept|  id|          name|    id|         name|
+--------------------+------+----+--------------+------+-------------+
|  123 W. Avenue, U.K|Dep_01|0001|      Lex Raul|Dep_01|Human Reource|
|342 Birch grove, U.K|Dep_01|0002|Robinson Scott|Dep_01|Human Reource|
|010 Downtown Sout...|Dep_02|0003|   Matt Wilson|Dep_02|        Sales|
|French Ave, 1 St....|Dep_04|0004|     William J|Dep_04|     Products|
|                null|  null|null|          null|Dep_03|           IT|
|Missian Bay, Lond...|Dep_05|0005|     Tyron Sr.|Dep_05|   Engg R & D|
|                null|  null|null|          null|Dep_06|  Vendor Mgmt|
|                null|  null|null|          null|Dep_07|     Security|
+--------------------+------+----+--------------+------+-------------+


//Right outer join two RDDs
Key ->key5 Values->Optional.absent(),value98
Key ->key7 Values->Optional.absent(),value5
Key ->key2 Values->Optional.of(value2),value6
Key ->key1 Values->Optional.of(value1),value3

For more details and a fuller understanding of SQL joins, read https://www.w3schools.com/sql/sql_join.asp.

Click here to download the full source code so far.