Apache Spark Transformation – DataFrame
Date: August 7, 2017 | Categories: Apache Spark


A DataFrame can be created from any structured dataset, such as JSON, a relational table, Parquet, or an existing RDD with a defined schema. The following program creates DataFrames and queries them using SQL.

Here is the JSON we will use to play with. Copy the following lines into a file and save it as sample.json in the <SPARK_HOME>/bin directory.

{"id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": {"batter": {"id": "1001", "type": "Regular"} } }
{"id": "0002", "type": "donut2", "name": "Cake2", "ppu": 0.52, "batters": {"batter": {"id": "1002", "type": "Regular"} } }
{"id": "0003", "type": "donut3", "name": "Cake3", "ppu": 0.53, "batters": {"batter": {"id": "1003", "type": "Regular"} } }

This is our Spark Java program, which creates DataFrames from a JSON file and from an RDD:

package com.mishudi.learn.spark.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateDataFrame {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("CreateDataFrame");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// SQLContext for creating dataframes
		SQLContext sc = new SQLContext(ctx.sc());

		// From a json file
		createDataFrameFromJson(sc);

		// From an RDD, by applying an explicit schema
		createDataFrameFromRDD(ctx, sc);

	}

	private static void createDataFrameFromJson(SQLContext sc) {

		// Create a DataFrame from the JSON file; put sample.json in
		// <SPARK_HOME>/bin
		DataFrame df1 = sc.read().json("sample.json");
		// show() is an action; it prints the first 20 rows of the DataFrame
		df1.show();

		// Register the DataFrame as a table. Use table name to query using sql
		df1.registerTempTable("SAMPLE");

		df1 = sc.sql("SELECT type FROM SAMPLE");
		df1.show();
	}

	private static void createDataFrameFromRDD(JavaSparkContext ctx, SQLContext sc) {
		// personDetails contains fname, lname, age and address for persons
		JavaRDD<String> personDetails = ctx
				.parallelize(Arrays.asList("George, Bally, 36, The University of Melbourne Victoria 3010",
						"George, Maxwell, 29, 123 W Beach Avenue Perth 1010"));

		// Split each line at the commas
		JavaRDD<String[]> splitRecordsPersonDetails = personDetails.map(r -> r.split(","));

		// Now convert JavaRDD<String[]> to JavaRDD<Row>
		JavaRDD<Row> personDetailsRows = splitRecordsPersonDetails.map(r -> RowFactory.create(String.valueOf(r[0]),
				String.valueOf(r[1]), Integer.valueOf(r[2].trim()), String.valueOf(r[3])));

		StructType schema = DataTypes.createStructType(
				new StructField[] { new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
						new StructField("address", DataTypes.StringType, false, Metadata.empty()) });

		DataFrame df = sc.createDataFrame(personDetailsRows, schema);
		df.show();

		// DataFrame as temp table
		df.registerTempTable("PERSON");

		// Query DataFrame using SQL Query
		df = sc.sql("SELECT fname, lname FROM PERSON");
		df.show();
	}

}

Create DataFrame from JSON

Let's focus on the createDataFrameFromJson() method. We first read the JSON file through the SQLContext by calling read().json("sample.json"). Since the file is placed in <SPARK_HOME>/bin, the file name alone is enough. show() is an action; when you call it on a DataFrame, Spark prints its first 20 rows. The following is the output of our first show() call:

+----------------+----+-----+----+------+
|         batters|  id| name| ppu|  type|
+----------------+----+-----+----+------+
|[[1001,Regular]]|0001| Cake|0.55| donut|
|[[1002,Regular]]|0002|Cake2|0.52|donut2|
|[[1003,Regular]]|0003|Cake3|0.53|donut3|
+----------------+----+-----+----+------+
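If you are curious how Spark inferred that schema from the JSON, you can call printSchema() on the DataFrame. This is a minimal sketch, assuming the same df1 as in the program above:

// Print the schema Spark inferred from sample.json (assumes df1 from the program above)
df1.printSchema();
// The nested "batters.batter" object is inferred as a struct column,
// while id, name and type come out as strings and ppu as a double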

We then register the DataFrame as a temp table, so its data becomes available for us to query using raw SQL. The registerTempTable() call only registers the DataFrame under a name in the SQLContext; nothing is physically stored. The SQL query against the temp table named SAMPLE outputs the following:

+------+
|  type|
+------+
| donut|
|donut2|
|donut3|
+------+

Notice that we kept reassigning the df1 variable instead of creating a new DataFrame variable each time. This works because a DataFrame is immutable: any operation that returns a DataFrame, such as sc.sql(...), returns a new one.
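The same kind of query can be expressed either in SQL, including access to the nested batters field, or through the DataFrame API directly. Here is a small sketch, assuming the SAMPLE temp table is registered and that df1 still holds the full DataFrame read from sample.json (before the reassignment); the ppu threshold is just an arbitrary illustration:

// Reach into the nested struct with SQL (assumes the SAMPLE temp table is registered)
DataFrame batterTypes = sc.sql("SELECT name, batters.batter.type FROM SAMPLE");
batterTypes.show();

// The same style of query through the DataFrame API; each call returns a new DataFrame
DataFrame cheapDonuts = df1.filter(df1.col("ppu").lt(0.55)).select("name", "ppu");
cheapDonuts.show();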

Using a similar approach and the right read operations, you can create a DataFrame from Parquet, Avro, a JDBC table, and so on, since those sources all carry schema information with them.
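For example, the DataFrameReader returned by sc.read() has entry points for several of these sources. Here is a rough sketch; the file paths, JDBC URL, table name and credentials below are placeholders, not part of the original program:

// Parquet files carry their own schema, so a path is all that is needed
DataFrame parquetDf = sc.read().parquet("people.parquet");

// A JDBC table; url, dbtable, user and password are hypothetical placeholders
DataFrame jdbcDf = sc.read().format("jdbc")
		.option("url", "jdbc:mysql://localhost:3306/mydb")
		.option("dbtable", "person")
		.option("user", "spark")
		.option("password", "secret")
		.load();

// Avro requires the external spark-avro package on the classpath
DataFrame avroDf = sc.read().format("com.databricks.spark.avro").load("people.avro");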

Create DataFrame from RDD

In the createDataFrameFromRDD() method we create the personDetails RDD, which contains the fname, lname, age, and address of each person. For simplicity's sake I have used a list of Strings in which each String is a comma-delimited person record; you could just as well load a CSV file with the textFile(String path) operation (see the sketch after the output below). We split each record, populate it into an org.apache.spark.sql.Row object, and then build the schema as a StructType of StructFields. Calling show() on the resulting DataFrame produces the following output:

+------+--------+---+--------------------+
| fname|   lname|age|             address|
+------+--------+---+--------------------+
|George|   Bally| 36| The University o...|
|George| Maxwell| 29| 123 W Beach Aven...|
+------+--------+---+--------------------+
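As mentioned above, the same pipeline works when the records come from a file rather than a hard-coded list. A minimal sketch, assuming a hypothetical person.csv with the same comma-delimited layout:

// Load comma-delimited person records from a text file (person.csv is a placeholder name)
JavaRDD<String> personDetailsFromFile = ctx.textFile("person.csv");
// The rest stays the same: split on commas, map to Row objects, apply the same StructType schema
JavaRDD<String[]> splitRecords = personDetailsFromFile.map(r -> r.split(","));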

Similarly, after registering this DataFrame as a temp table as we did before, the query SELECT fname, lname FROM PERSON produces the following output:

+------+--------+
| fname|   lname|
+------+--------+
|George|   Bally|
|George| Maxwell|
+------+--------+
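Since PERSON is now queryable like any other table, arbitrary SQL works against it as well. A small sketch, assuming the same sc and the PERSON temp table; the WHERE clause is just an illustration:

// Filter and project with plain SQL against the registered temp table
DataFrame olderThanThirty = sc.sql("SELECT fname, lname, age FROM PERSON WHERE age > 30");
olderThanThirty.show();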

Notice that the only difference between the JSON and RDD cases is how the DataFrame is created; after that, the API remains the same for all other operations.

Download the source code from here.