Apache Spark is great for processing JSON files: you can create DataFrames right away and start issuing SQL queries against them by registering them as temporary tables. This works very well when the JSON records come one per line, i.e. each line of the file is a single JSON object. In such a happy path the file can be read with sqlContext.read().json("path to file") (a short example follows the list below), and Spark will happily run any transformations/actions on it in the standard fashion. But what if the file is not a standard line-delimited JSON file but a multiline JSON file? I can think of two variations here:
- The JSON file is one big multiline file that represents one big JSON object.
- The JSON file is one big multiline file that contains more than one JSON object, where each object spans multiple lines.
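For reference, here is a minimal sketch of that happy path, assuming a hypothetical line-delimited file called people.json and the same Spark 1.x API (SQLContext, DataFrame) used in the programs below:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class LineDelimitedJsonExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LineDelimitedJsonExample");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(ctx.sc());

        // Each line of people.json is expected to hold one complete JSON object
        DataFrame df = sqlContext.read().json("people.json");
        df.registerTempTable("PEOPLE");
        sqlContext.sql("SELECT * FROM PEOPLE LIMIT 5").show();
    }
}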
One big JSON object spanning multiple lines in a file
The problem with multiline JSON files is that they cannot be read straight into a DataFrame, because the DataFrameReader cannot infer the schema. Following is the JSON file we will try to process; it represents one big JSON object that spans multiple lines in the file. It can be an array node or an object node of JSON.
{ "id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": { "batter": { "id": "1001", "type": "Regular" } } }
Save the above as MultiLineSample.json in the <SPARK_HOME>/bin folder. This file is also present in the resources folder in the project download link provided below. Now, let's look at the Spark program that will process this multiline JSON file:
package com.mishudi.learn.spark.dataframe;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import scala.Tuple2;

public class MultiLineJsonProcessor {

    public static void main(String[] args) throws ClassNotFoundException {
        SparkConf sparkConf = new SparkConf().setAppName("MultiLineJsonProcessor");
        sparkConf.registerKryoClasses(new Class<?>[] { Class.forName("scala.Tuple2") });
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        SQLContext sc = new SQLContext(ctx.sc());

        // process a multi-line json file
        multiLineJsonProcessing(ctx, sc);
    }

    private static void multiLineJsonProcessing(JavaSparkContext ctx, SQLContext sc) {
        JavaPairRDD<String, String> pairRdd = ctx.wholeTextFiles("MultiLineSample.json");

        // Process multiline JSON files using mapPartitionsToPair
        JavaPairRDD<String, JsonNode> lines = pairRdd
                .mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, JsonNode>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, JsonNode>> call(Iterator<Tuple2<String, String>> itr)
                            throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        Tuple2<String, String> tuple = itr.next();
                        String fileName = tuple._1;
                        System.out.println("name of the file " + fileName + " Content " + tuple._2);
                        JsonNode node = mapper.readTree(tuple._2);

                        // Add to a list to return
                        List<Tuple2<String, JsonNode>> list = new ArrayList<Tuple2<String, JsonNode>>();
                        list.add(new Tuple2<String, JsonNode>(fileName, node));
                        return list;
                    }
                });

        // Extract out the content using map
        JavaRDD<String> content = lines.map(new Function<Tuple2<String, JsonNode>, String>() {

            private static final long serialVersionUID = -1164237831337131583L;

            @Override
            public String call(Tuple2<String, JsonNode> v1) throws Exception {
                return v1._2.toString();
            }
        });

        DataFrame df = sc.read().json(content);
        df.registerTempTable("SAMPLE");
        df = sc.sql("SELECT * FROM SAMPLE LIMIT 5");
        df.printSchema();
        df.show();
    }
}
The wholeTextFiles("file path") transformation loads each file's entire content as a single string. So, if the hdfs://a-hdfs-path directory had two files, part-00000 and part-00001, calling sparkContext.wholeTextFiles("hdfs://a-hdfs-path") would return a JavaPairRDD whose keys are the file names and whose values are the contents of those files. Since this operation loads the whole file, there may be performance issues when working with extremely big JSON files. Also, the pairRdd will have only one partition, so the executor processing this file will consume considerable resources depending on the file size. We then use the mapPartitionsToPair() transformation to create a JSON object out of the JSON string we loaded with wholeTextFiles().
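As a quick, hedged illustration of that (file name, content) pairing (the hdfs://a-hdfs-path directory and its part files are just the hypothetical example above, ctx is the JavaSparkContext from the program, and the imports are the same as in the program):

// One (fileName, fileContent) pair per file found under the directory
JavaPairRDD<String, String> files = ctx.wholeTextFiles("hdfs://a-hdfs-path");

for (Tuple2<String, String> t : files.collect()) {
    // t._1 is e.g. hdfs://a-hdfs-path/part-00000, t._2 is that file's entire content
    System.out.println(t._1 + " -> " + t._2.length() + " characters");
}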
We could have used map() instead of mapPartitionsToPair(), and it would perform the same way, at least in this case. But the mapPartitions...() operations are great when you have to instantiate a resource that is expensive to create, such as a database connection, a resource lookup or, in our case, the JSON ObjectMapper. We create only one ObjectMapper per partition and keep reusing it, because mapPartitions...() is called only once per partition and receives all the data to be processed for that partition. map() works differently: it is called once for every record in a given partition, so if you created a database connection per record, just imagine how many connections you would end up creating. The number is proportional to the number of records in each partition; the sketch below contrasts the two.
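To make that difference concrete, here is a hedged sketch (not part of the original program) contrasting the two; jsonStrings is just an assumed JavaRDD<String> of JSON documents, and the imports are the same as in the program above:

// Per-record: map() is invoked once per element, so a new ObjectMapper is built every time
JavaRDD<JsonNode> perRecord = jsonStrings.map(new Function<String, JsonNode>() {
    @Override
    public JsonNode call(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper(); // created for every single record
        return mapper.readTree(json);
    }
});

// Per-partition: mapPartitions() is invoked once per partition, so one ObjectMapper is reused
JavaRDD<JsonNode> perPartition = jsonStrings.mapPartitions(new FlatMapFunction<Iterator<String>, JsonNode>() {
    @Override
    public Iterable<JsonNode> call(Iterator<String> itr) throws Exception {
        ObjectMapper mapper = new ObjectMapper(); // created once, reused for the whole partition
        List<JsonNode> out = new ArrayList<JsonNode>();
        while (itr.hasNext()) {
            out.add(mapper.readTree(itr.next()));
        }
        return out;
    }
});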
Inside call(), we invoke next() on the iterator just once; note that we are not iterating. This is because we are reading only one file, MultiLineSample.json, so our iterator holds exactly one tuple, i.e. a Tuple2<String, String> of (fileName, content). tuple._1 gives you the file name and tuple._2 gives the entire file content, which we feed to the mapper to create a JSON node.
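If you pointed wholeTextFiles() at a directory containing several JSON files instead of a single file, the iterator would carry one tuple per file and you would loop over it. A hedged sketch of how the call() body above could look in that case:

ObjectMapper mapper = new ObjectMapper(); // still only one mapper per partition
List<Tuple2<String, JsonNode>> list = new ArrayList<Tuple2<String, JsonNode>>();
while (itr.hasNext()) {
    Tuple2<String, String> tuple = itr.next();
    // one (fileName, parsed JSON) pair per input file
    list.add(new Tuple2<String, JsonNode>(tuple._1, mapper.readTree(tuple._2)));
}
return list;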
We then convert this JSON node back to a string in the map() step, read the resulting RDD of strings as a DataFrame, and play with it.
The output of the printSchema() call is:
root
 |-- batters: struct (nullable = true)
 |    |-- batter: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- type: string (nullable = true)
The output of the show() call is:
+----------------+----+----+----+-----+
|         batters|  id|name| ppu| type|
+----------------+----+----+----+-----+
|[[1001,Regular]]|0001|Cake|0.55|donut|
+----------------+----+----+----+-----+
Multiple JSON objects spanning multiple lines in a file
{"id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": { "batter": { "id": "1001", "type": "Regular" } } } {"id": "0002", "type": "donut2", "name": "Cake2", "ppu": 0.55, "batters": { "batter": { "id": "1002", "type": "Regular" } } } {"id": "0003", "type": "donut3", "name": "Cake3", "ppu": 0.55, "batters": { "batter": { "id": "1003", "type": "Regular" } } } {"id": "0004", "type": "donut4", "name": "Cake4", "ppu": 0.55, "batters": { "batter": { "id": "1004", "type": "Regular" } } }
Copy the above into a file and save it as MultiLineSample2.json in <SPARK_HOME>/bin; it is also available in the resources folder in the download link below. This is a perfect example of a JSON file with multiple JSON objects, each spanning multiple lines. The wholeTextFiles() transformation could be used here as well, but if we can process the file in a distributed fashion, we should. Even though each JSON object spreads across multiple lines, we at least know how every JSON string begins. That information is enough to configure the Spark job so that it treats the beginning characters of a JSON object as the start of a new record rather than the default '\n' newline character.
Let's look at the following program that will process the above JSON:
package com.mishudi.learn.spark.dataframe;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

public class MultiLineJsonProcessor2 {

    private static final String DELIMETER = "{\"id\":";

    public static void main(String[] args) throws ClassNotFoundException {
        SparkConf sparkConf = new SparkConf().setAppName("MultiLineJsonProcessor");
        sparkConf.registerKryoClasses(new Class<?>[] { Class.forName("scala.Tuple2") });
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        SQLContext sc = new SQLContext(ctx.sc());

        // process a multi-line json file
        multiLineJsonProcessing(ctx, sc);
    }

    private static void multiLineJsonProcessing(JavaSparkContext ctx, SQLContext sc) {
        Configuration hadoopConf = new Configuration();
        // We explicitly set that each new record (JSON object) begins with '{"id":'.
        // This is a plain string; you could set anything based on your file format.
        hadoopConf.set("textinputformat.record.delimiter", DELIMETER);

        // This extra configuration applies to the file below and this job context alone
        JavaRDD<String> linesRDD = ctx
                .newAPIHadoopFile("MultiLineSample2.json", TextInputFormat.class, LongWritable.class, Text.class,
                        hadoopConf)
                .map(new Function<Tuple2<LongWritable, Text>, String>() {

                    private static final long serialVersionUID = -3365791706086925082L;

                    @Override
                    public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                        return v1._2.toString();
                    }
                });

        // Re-attach the delimiter to every non-empty record so each element is a complete JSON string
        JavaRDD<String> jsonLinesRdd = linesRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

            private static final long serialVersionUID = 249664889132507998L;

            @Override
            public Iterable<String> call(Iterator<String> itr) throws Exception {
                List<String> jsonStrings = new ArrayList<String>();
                while (itr.hasNext()) {
                    String jsonStrFromFile = itr.next();
                    if (!jsonStrFromFile.isEmpty()) {
                        String newJsonStrFromFile = DELIMETER + jsonStrFromFile;
                        jsonStrings.add(newJsonStrFromFile);
                    }
                }
                return jsonStrings;
            }
        });

        DataFrame df = sc.read().json(jsonLinesRdd);
        df.registerTempTable("SAMPLE");
        df = sc.sql("SELECT * FROM SAMPLE LIMIT 5");
        df.printSchema();
        df.show();
    }
}
We set the textinputformat.record.delimiter property to '{\"id\":'. This is the configuration the record reader will now use when reading the blocks of the file from HDFS. We then pass this configuration, along with the file path, to newAPIHadoopFile(); LongWritable is the byte offset and TextInputFormat is the input format used for reading. We again use a mapPartitions...() transformation, which is the right choice whenever a per-partition resource such as the ObjectMapper is needed; here, though, each record is already a complete JSON string, so no ObjectMapper or readTree() is needed at all, and a plain map()/filter() would work just as well. I did not try that, as I was reusing a lot of code, but a sketch follows below. We do an isEmpty() check because the first record, at byte offset 0, has an empty string associated with it, and we do not want to process it. We also prepend the delimiter to each record, because the record reader strips it (treating it like the newline character), whereas in our case it is also the beginning of the JSON object.
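For completeness, here is a hedged, untested sketch of that simpler variant, reusing linesRDD, sc and the DELIMETER constant from the program above: drop the empty first record with filter(), re-attach the delimiter with a plain map(), and hand the strings straight to the DataFrameReader.

// Drop the empty record produced for byte offset 0
JavaRDD<String> nonEmpty = linesRDD.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String record) throws Exception {
        return !record.isEmpty();
    }
});

// Re-attach the delimiter so every element is a complete JSON document again
JavaRDD<String> jsonLinesRdd = nonEmpty.map(new Function<String, String>() {
    @Override
    public String call(String record) throws Exception {
        return DELIMETER + record;
    }
});

DataFrame df = sc.read().json(jsonLinesRdd);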
The printSchema() call gives the following output:
root
 |-- batters: struct (nullable = true)
 |    |-- batter: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- type: string (nullable = true)
The show() call gives the following output:
+----------------+----+-----+----+------+
|         batters|  id| name| ppu|  type|
+----------------+----+-----+----+------+
|[[1001,Regular]]|0001| Cake|0.55| donut|
|[[1002,Regular]]|0002|Cake2|0.55|donut2|
|[[1003,Regular]]|0003|Cake3|0.55|donut3|
|[[1004,Regular]]|0004|Cake4|0.55|donut4|
+----------------+----+-----+----+------+