Apache Spark introduced the Dataset API to unify the programming experience, improving both performance and developer experience and reducing the learning curve for Spark developers. This is a great link to get familiar with Datasets; if the link doesn't work when you are reading this post, Google is your friend. I want to save time and get straight into programming Datasets with the Java API.
Download this ready-to-run Maven project and import/set it up in the IDE of your choice; it needs Java 8.
Before you compile/package and run this Maven project, update /PATH_TO/test.json on line 35 with the fully qualified path to test.json, which is in the resources folder. You can put the file in a folder of your choice, but if it is outside $SPARK_HOME then you must provide the full path. Refer to one of my previous posts if you'd rather put it inside the $SPARK_HOME/bin folder and refer to it by name.
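All of the snippets below assume a SparkSession referenced by a variable named sc; the downloaded project already takes care of this setup. If you are wiring it up yourself, a minimal sketch looks something like the following (the app name is just a placeholder, and the variable is called sc only to match the snippets):

import org.apache.spark.sql.SparkSession;

// A sketch of the SparkSession setup; adjust the app name to your liking.
SparkSession sc = SparkSession.builder()
        .appName("SparkDatasetTest")
        .getOrCreate();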
Create a Dataset with custom schema using StructType
Unlike a DataFrame, where a lambda transformation would return an RDD, a Dataset can return a new Dataset, giving us the freedom to write custom transformations just as you would on an RDD.
Dataset<Row> ds1 = sc.read().json("/PATH_TO/test.json");

ds1.printSchema();
/* output
root
 |-- batters: struct (nullable = true)
 |    |-- batter: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- type: string (nullable = true)
*/

ds1.show();
/* output
+-----------------+----+-----+----+------+
|          batters|  id| name| ppu|  type|
+-----------------+----+-----+----+------+
|[[1001, Regular]]|0001| Cake|0.55| donut|
|[[1002, Regular]]|0002|Cake2|0.52|donut2|
|[[1003, Regular]]|0003|Cake3|0.53|donut3|
+-----------------+----+-----+----+------+
*/
The above lines are the simplest way to initialize a Dataset in Spark. Keep in mind that transformations are lazy: the Dataset is not actually materialized until you call an action such as show().
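To see the laziness in action, you can chain another transformation and note that nothing runs until an action fires; a quick sketch (the filter condition is just for illustration):

// filter() only adds to the logical plan; no job runs yet.
Dataset<Row> donuts = ds1.filter(ds1.col("ppu").gt(0.52));

// count() is an action, so the job actually runs here.
// With the sample test.json this should print 2 (ppu 0.55 and 0.53).
System.out.println("rows: " + donuts.count());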
// Custom StructType for the encoder
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

Dataset<Row> ds2 = ds1.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row v1) {
        System.out.println(v1.schema().toString());
        System.out.println(v1.toString());
        return RowFactory.create(1L, 2L);
    }
}, encoder);

ds2.printSchema();
/* output
root
 |-- id1: long (nullable = false)
 |-- id2: long (nullable = false)
*/

ds2.show(false);
/* output
+---+---+
|id1|id2|
+---+---+
|1  |2  |
|1  |2  |
|1  |2  |
+---+---+
*/
In the above example we create a StructType and use it to create an ExpressionEncoder of type Row; every row produced with the RowFactory in the MapFunction now carries the supplied schema. Also notice that the return type of map() is a Dataset! Any errors with encoding or in the transformation are detected at compile time, reducing the time and effort spent debugging. If this were done in Spark using SQL, these errors wouldn't be detected until runtime.
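As a rough sketch of that contrast (the view and column names here are made up for the example): a typo in a SQL string still compiles and only blows up at runtime, while the typed API rejects a wrong return type before the job is ever built.

// Runtime failure: "idd2" does not exist, yet this compiles fine and only
// throws an AnalysisException when the query is submitted.
ds2.createOrReplaceTempView("ds2_rows");
// sc.sql("select id1, idd2 from ds2_rows").show();

// Compile-time failure: this line does not compile at all, because the
// MapFunction is declared to return a Row, not a String.
// Dataset<Row> bad = ds1.map((MapFunction<Row, Row>) r -> "not a row", encoder);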
Create a Dataset with custom schema using Java Bean
Next we create a Dataset using a Java bean. Here is the bean class in action:
package com.mishudi.learn.spark;

import java.io.Serializable;

public class TestBean implements Serializable {

    private long field1;
    private long field2;

    public long getField1() {
        return field1;
    }

    public void setField1(long field1) {
        this.field1 = field1;
    }

    public long getField2() {
        return field2;
    }

    public void setField2(long field2) {
        this.field2 = field2;
    }
}
The Dataset ds2 that was created in the previous step is now transformed into a new typed Dataset<TestBean> in the following way. See the output of the action calls on this new Dataset:
// Create a typed dataset from an existing dataset using the bean encoder
Dataset<TestBean> ds3 = ds2.map(new MapFunction<Row, TestBean>() {
    @Override
    public TestBean call(Row v1) throws Exception {
        TestBean t = new TestBean();
        t.setField1((Long) v1.get(0));
        t.setField2((Long) v1.get(1));
        return t;
    }
}, Encoders.bean(TestBean.class));

ds3.printSchema();
/* output
root
 |-- field1: long (nullable = true)
 |-- field2: long (nullable = true)
*/

ds3.show(false);
/* output
+------+------+
|field1|field2|
+------+------+
|1     |2     |
|1     |2     |
|1     |2     |
+------+------+
*/
Dataset ds3 can now be queried with Spark SQL by field name, for example:
ds3.createTempView("temp_view");
sc.sql("select field1, field2 from temp_view");
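Keep in mind that sql() itself returns another Dataset<Row>, so you still need an action to see the rows; something like:

// The query result is itself a Dataset<Row>; show() runs the job and prints it.
Dataset<Row> result = sc.sql("select field1, field2 from temp_view");
result.show(false);
/* output (the same three rows as ds3)
+------+------+
|field1|field2|
+------+------+
|1     |2     |
|1     |2     |
|1     |2     |
+------+------+
*/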
You can also call transformations on ds3; inside the function you access each record through the typed class, TestBean, in the following way:
ds3.map(new MapFunction<TestBean, String>() {
    @Override
    public String call(TestBean arg0) throws Exception {
        // The typed bean gives compile-checked access to its fields
        return arg0.getField1() + "," + arg0.getField2();
    }
}, Encoders.STRING());
To run the job after packaging the downloaded Maven project, use good old spark-submit:
spark-submit --class com.mishudi.learn.spark.SparkDatasetTest /PATH_TO_JAR
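If you are trying it out locally rather than on a cluster, you can also pass the master explicitly (the jar path stays a placeholder, as above):

spark-submit --master local[*] --class com.mishudi.learn.spark.SparkDatasetTest /PATH_TO_JAR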