Apache Spark Transformation – DataFrame
A DataFrame can be created from any structured dataset, such as JSON, a relational table, Parquet, or an existing RDD with a defined schema. The following program creates a DataFrame and queries it using SQL.
Here is the JSON we will use to play with. Copy the following lines into a file and save it as sample.json in the <SPARK_HOME>/bin directory.
{"id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": {"batter": {"id": "1001", "type": "Regular"} } } {"id": "0002", "type": "donut2", "name": "Cake2", "ppu": 0.52, "batters": {"batter": {"id": "1002", "type": "Regular"} } } {"id": "0003", "type": "donut3", "name": "Cake3", "ppu": 0.53, "batters": {"batter": {"id": "1003", "type": "Regular"} } }
This is our Spark Java program, which creates a DataFrame first from a JSON file and then from an RDD:
package com.mishudi.learn.spark.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CreateDataFrame {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("CreateDataFrame");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // SQLContext is the entry point for creating DataFrames
        SQLContext sqlContext = new SQLContext(ctx.sc());

        // From a JSON file
        createDataFrameFromJson(sqlContext);

        // From an existing RDD
        createDataFrameFromRDD(ctx, sqlContext);
    }

    private static void createDataFrameFromJson(SQLContext sqlContext) {
        // Create a DataFrame from a JSON file; put sample.json in <SPARK_HOME>/bin
        DataFrame df1 = sqlContext.read().json("sample.json");

        // show() is an action; it prints the first 20 rows of the DataFrame
        df1.show();

        // Register the DataFrame as a table so we can query it using SQL
        df1.registerTempTable("SAMPLE");
        df1 = sqlContext.sql("SELECT type FROM SAMPLE");
        df1.show();
    }

    private static void createDataFrameFromRDD(JavaSparkContext ctx, SQLContext sqlContext) {
        // personDetails contains fname, lname, age and address for each person
        JavaRDD<String> personDetails = ctx.parallelize(Arrays.asList(
                "George, Bally, 36, The University of Melbourne Victoria 3010",
                "George, Maxwell, 29, 123 W Beach Avenue Perth 1010"));

        // Split each line at the commas
        JavaRDD<String[]> splitPersonDetails = personDetails.map(r -> r.split(","));

        // Convert JavaRDD<String[]> to JavaRDD<Row>, trimming the stray
        // whitespace left over from the split
        JavaRDD<Row> personDetailsRows = splitPersonDetails.map(r -> RowFactory.create(
                r[0].trim(), r[1].trim(), Integer.valueOf(r[2].trim()), r[3].trim()));

        // Define a schema that matches the Row fields
        StructType schema = DataTypes.createStructType(new StructField[] {
                new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
                new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("address", DataTypes.StringType, false, Metadata.empty()) });

        DataFrame df = sqlContext.createDataFrame(personDetailsRows, schema);
        df.show();

        // Register the DataFrame as a temp table
        df.registerTempTable("PERSON");

        // Query the DataFrame using SQL
        df = sqlContext.sql("SELECT fname, lname FROM PERSON");
        df.show();
    }
}
Create DataFrame from JSON
Let's focus on the createDataFrameFromJson() method. We first read the JSON file through the SQLContext with sqlContext.read().json(). Since the file is placed in <SPARK_HOME>/bin, the file name alone is enough. When you call the show() action on a DataFrame, Spark prints its first 20 rows. The following is the output of our first show() call:
+----------------+----+-----+----+------+
|         batters|  id| name| ppu|  type|
+----------------+----+-----+----+------+
|[[1001,Regular]]|0001| Cake|0.55| donut|
|[[1002,Regular]]|0002|Cake2|0.52|donut2|
|[[1003,Regular]]|0003|Cake3|0.53|donut3|
+----------------+----+-----+----+------+
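Besides show(), you can inspect the schema that Spark inferred from the JSON, including the nested batters structure. A quick sketch, assuming the df1 variable from the program above:

// printSchema() prints the inferred schema as a tree, nested structs included
df1.printSchema();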
Next we register the DataFrame as a table, and its data becomes available for us to query using raw SQL. The registerTempTable() call creates an in-memory table; nothing is physically stored. The SQL query against the temp table named SAMPLE outputs the following:
+------+
|  type|
+------+
| donut|
|donut2|
|donut3|
+------+
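Since SAMPLE behaves like any other table as far as the SQL parser is concerned, more selective queries work just as well. Here is a small sketch against the same temp table; the WHERE predicate and the nested dot-notation path are only for illustration:

// Filter rows with a WHERE clause; nested struct fields are reachable
// with dot notation (batters.batter.id)
DataFrame cheap = sqlContext.sql(
        "SELECT name, ppu, batters.batter.id FROM SAMPLE WHERE ppu < 0.55");
cheap.show();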
Notice that we kept reassigning the df1 variable instead of creating a new DataFrame variable each time. This works because a DataFrame itself is immutable: any operation that returns a DataFrame, such as sqlContext.sql(...), returns a brand new DataFrame.
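To see this immutability in action, here is a minimal sketch, again assuming df1 holds the DataFrame read from sample.json; each transformation leaves its input untouched and hands back a new DataFrame:

// filter() and select() are transformations that each return a new DataFrame;
// df1 itself is never modified
DataFrame cheapDonuts = df1.filter(df1.col("ppu").lt(0.55));
DataFrame namesOnly = cheapDonuts.select("name", "ppu");
namesOnly.show();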
Using a similar approach, and by calling the right read operations, you can create a DataFrame from Parquet, Avro, a JDBC table, and so on, since they all have schema information handy.
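For instance, here is a sketch of reading from Parquet and from a JDBC source; the file path, JDBC URL, table name, and credentials are placeholders, and the JDBC driver must be on the classpath:

// Parquet files embed their schema, so nothing extra is needed
DataFrame parquetDf = sqlContext.read().parquet("sample.parquet");

// For JDBC, Spark reads the schema from the database's table metadata.
// The URL, table name and credentials below are placeholders.
java.util.Properties props = new java.util.Properties();
props.setProperty("user", "dbuser");
props.setProperty("password", "dbpass");
DataFrame jdbcDf = sqlContext.read().jdbc("jdbc:mysql://localhost:3306/mydb", "PERSON", props);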
Create DataFrame from RDD
In the createDataFrameFromRDD() method we create the personDetails RDD, which contains details such as fname, lname, age, and address for each person. For simplicity's sake I have used a List of Strings in which each String is a comma-delimited person record; you can just as easily load a CSV file using the textFile(String path) operation, as shown in the sketch after the output below. We first map each record into an org.apache.spark.sql.Row object, then define a matching StructType schema, and finally call createDataFrame(). When the first show() is called on the resulting DataFrame, the following output is produced:
+------+-------+---+--------------------+
| fname|  lname|age|             address|
+------+-------+---+--------------------+
|George|  Bally| 36|The University of...|
|George|Maxwell| 29|123 W Beach Avenu...|
+------+-------+---+--------------------+
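As promised above, swapping the hard-coded List for a file on disk only changes the first step of the pipeline. A sketch, assuming a hypothetical persons.csv with the same comma-delimited layout:

// Load raw lines from a (hypothetical) CSV file instead of parallelize()
JavaRDD<String> csvLines = ctx.textFile("persons.csv");

// The rest of the pipeline is unchanged: split, build Rows, apply the schema
JavaRDD<Row> csvRows = csvLines.map(r -> r.split(","))
        .map(r -> RowFactory.create(r[0].trim(), r[1].trim(),
                Integer.valueOf(r[2].trim()), r[3].trim()));
DataFrame csvDf = sqlContext.createDataFrame(csvRows, schema);
csvDf.show();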
Similarly, after registering this DataFrame as the temp table PERSON as we did before, the SELECT fname, lname query produces the following output:
+------+-------+
| fname|  lname|
+------+-------+
|George|  Bally|
|George|Maxwell|
+------+-------+
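Once the temp table is registered, any query the Spark SQL dialect supports can be run against it. For example, a sketch of a simple aggregation over PERSON:

// Count persons per first name and compute their average age
DataFrame stats = sqlContext.sql(
        "SELECT fname, COUNT(*) AS cnt, AVG(age) AS avg_age FROM PERSON GROUP BY fname");
stats.show();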
Notice that the only difference between the JSON and RDD approaches is the way we created the DataFrame; after that, the API remains the same for all other operations.