Apache Spark Transformation – DataFrame
Date: August 7, 2017 | Categories: Apache Spark

A DataFrame can be created from any structured dataset, such as JSON, a relational table, Parquet, or an existing RDD with a defined schema. The following program creates DataFrames and queries them using SQL.

Here is the JSON we will use to play with. Copy the following lines into a file and save it as sample.json in the <SPARK_HOME>/bin directory.

{"id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": {"batter": {"id": "1001", "type": "Regular"} } }
{"id": "0002", "type": "donut2", "name": "Cake2", "ppu": 0.52, "batters": {"batter": {"id": "1002", "type": "Regular"} } }
{"id": "0003", "type": "donut3", "name": "Cake3", "ppu": 0.53, "batters": {"batter": {"id": "1003", "type": "Regular"} } }

This is our Spark Java program, which creates a DataFrame from a JSON file and from an RDD:

package com.mishudi.learn.spark.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateDataFrame {

	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("CreateDataFrame");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);

		// SQLContext for creating dataframes
		SQLContext sc = new SQLContext(ctx.sc());

		// From a json file
		createDataFrameFromJson(sc);

		// From an RDD with an explicit schema
		createDataFrameFromRDD(ctx, sc);

		ctx.stop();
	}

	private static void createDataFrameFromJson(SQLContext sc) {

		// Create a DataFrame from a JSON file; put sample.json in
		// <SPARK_HOME>/bin
		DataFrame df1 = sc.read().json("sample.json");
		// show() is an action, Show details of the dataframe with first 20 rows
		df1.show();

		// Register the DataFrame as a table. Use table name to query using sql
		df1.registerTempTable("SAMPLE");

		df1 = sc.sql("SELECT type FROM SAMPLE");
		df1.show();
	}

	private static void createDataFrameFromRDD(JavaSparkContext ctx, SQLContext sc) {
		// personDetails contains fname, lname, age and address for persons
		JavaRDD<String> personDetails = ctx
				.parallelize(Arrays.asList("George, Bally, 36, The University of Melbourne Victoria 3010",
						"George, Maxwell, 29, 123 W Beach Avenue Perth 1010"));

		// Split each line at the commas
		JavaRDD<String[]> splitRecordsPersonDetails = personDetails.map(r -> r.split(","));

		// Now convert JavaRDD<String[]> to JavaRDD<Row>
		JavaRDD<Row> personDetailsRows = splitRecordsPersonDetails.map(r -> RowFactory.create(String.valueOf(r[0]),
				String.valueOf(r[1]), Integer.valueOf(r[2].trim()), String.valueOf(r[3])));

		StructType schema = DataTypes.createStructType(
				new StructField[] { new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
						new StructField("address", DataTypes.StringType, false, Metadata.empty()) });

		DataFrame df = sc.createDataFrame(personDetailsRows, schema);
		df.show();

		// DataFrame as temp table
		df.registerTempTable("PERSON");

		// Query DataFrame using SQL Query
		df = sc.sql("SELECT fname, lname FROM PERSON");
		df.show();
	}

}

Create DataFrame from JSON

Let's focus on the createDataFrameFromJson() method. We first read the JSON file through the SQLContext's read().json() call. Since the file is placed in <SPARK_HOME>/bin, the file name alone is enough. When you call the show() action on a DataFrame, Spark displays the first 20 rows. Following is the output of our first show() call:

+----------------+----+-----+----+------+
|         batters|  id| name| ppu|  type|
+----------------+----+-----+----+------+
|[[1001,Regular]]|0001| Cake|0.55| donut|
|[[1002,Regular]]|0002|Cake2|0.52|donut2|
|[[1003,Regular]]|0003|Cake3|0.53|donut3|
+----------------+----+-----+----+------+
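
Besides show(), it is often handy to inspect the schema Spark inferred from the JSON. A minimal sketch, using the df1 DataFrame from the listing above (printSchema() and the show(n) overload are standard DataFrame methods):

df1.printSchema(); // prints the inferred schema as a tree, including the nested batters struct
df1.show(2);       // show() also accepts a row count; here only the first 2 rows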

Next, we register the DataFrame as a table; its data is now available for us to query using raw SQL. The registerTempTable() method call creates an in-memory table; nothing is physically stored. The SQL query against the temp table named SAMPLE outputs the following:

+------+
|  type|
+------+
| donut|
|donut2|
|donut3|
+------+
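
The temp table accepts arbitrary SQL, not just simple projections. A quick sketch of a filtering query against the same table (the column names come from sample.json):

DataFrame expensive = sc.sql("SELECT name, ppu FROM SAMPLE WHERE ppu > 0.52");
expensive.show(); // two rows: Cake (0.55) and Cake3 (0.53)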

Notice that we kept reassigning the df1 variable instead of creating a new DataFrame variable each time. This works because a DataFrame is immutable: any operation that returns a DataFrame, such as sc.sql(...), returns a brand-new one.
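
Here is a minimal sketch of what immutability means in practice (filter() and col() are standard DataFrame methods; the ppu threshold is just an example):

DataFrame base = sc.read().json("sample.json");
DataFrame filtered = base.filter(base.col("ppu").gt(0.52)); // a brand-new DataFrame
base.show();     // base is unchanged and still shows all three rows
filtered.show(); // only the rows with ppu > 0.52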

Using a similar approach and the right reader methods, you can create a DataFrame from Parquet, Avro, a JDBC source, and so on, as they all have schema information handy.
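
For example, here is a sketch of the Parquet and JDBC cases. The file name, JDBC URL, and table name below are placeholders, and the JDBC case assumes the driver is on the classpath:

// Parquet files carry their schema with them
DataFrame parquetDf = sc.read().parquet("people.parquet");

// For JDBC, the schema is read from the database table
DataFrame jdbcDf = sc.read().format("jdbc")
		.option("url", "jdbc:mysql://localhost:3306/testdb")
		.option("dbtable", "person")
		.load();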

Create DataFrame from RDD

In the createDataFrameFromRDD() method we create the personDetails RDD, which contains details such as fname, lname, age, and address for each person. For simplicity's sake, I have used a List of Strings in which each String is a comma-delimited person record; you could equally load a CSV file using the textFile(String path) operation, as shown in the sketch after the output below. We first populate each person record into an org.apache.spark.sql.Row object, then define the schema as a StructType, and finally call createDataFrame(). When show() is called, the following output is produced:

+------+--------+---+--------------------+
| fname|   lname|age|             address|
+------+--------+---+--------------------+
|George|   Bally| 36| The University o...|
|George| Maxwell| 29| 123 W Beach Aven...|
+------+--------+---+--------------------+
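
As mentioned above, loading the same records from a CSV file only changes the first step; persons.csv here is a hypothetical file with one comma-delimited record per line:

// Replace the parallelize(...) call with a file read; the rest of the
// pipeline (split, RowFactory.create, schema, createDataFrame) stays the same
JavaRDD<String> personDetails = ctx.textFile("persons.csv");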

Similarly, after registering this DataFrame as a temp table as we did before, the SQL query against PERSON produces the following output:

+------+--------+
| fname|   lname|
+------+--------+
|George|   Bally|
|George| Maxwell|
+------+--------+

Notice that the only difference between the JSON and RDD cases is the way we created the DataFrame; after that, the API remains the same for all other operations.
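
For instance, the following calls work identically whether df was built from JSON, an RDD, Parquet, or JDBC. Shown here against the PERSON DataFrame; select() and groupBy() are standard DataFrame methods:

df.select("fname", "age").show();   // projection without writing SQL
df.groupBy("fname").count().show(); // aggregation: one row, George with count 2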

Download source code from here.
