Apache Spark and Livy in Action
Date: March 18, 2018 | Categories: Apache Spark

Quick introduction to Apache Livy

Apache Livy is a service that enables access to a Spark cluster over a REST interface. It allows easy submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, and Spark context management, all via a simple REST interface or an RPC client library. The Apache Livy project site has brief documentation on the architecture and other interesting aspects. I will cut the intro short and move on to downloading and configuring Livy locally.
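To get a feel for the REST interface before setting anything up, here is a sketch of the interactive-session flow. These requests assume a Livy server already running on localhost:8998; the Scala snippet and session/statement ids are illustrative.

```
# Create an interactive Spark session
curl -s -X POST -H 'Content-Type: application/json' \
     -d '{"kind": "spark"}' http://localhost:8998/sessions

# Run a snippet of Spark code in session 0
# (asynchronous: the call returns a statement id immediately)
curl -s -X POST -H 'Content-Type: application/json' \
     -d '{"code": "sc.parallelize(1 to 10).sum()"}' \
     http://localhost:8998/sessions/0/statements

# Poll the statement for its state and result
curl -s http://localhost:8998/sessions/0/statements/0
```

The batch-job path we use later in this post goes through the RPC client library instead, but the same server handles both.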

Setting up Apache Livy

The first step is to install Apache Spark; an earlier post covers how to set up Spark locally. Next, download Livy; I am running version 0.4.0 for this post. Unzip the Livy binary livy-0.4.0-incubating-bin to a folder. Now we need the following steps to integrate and configure Livy with Spark.

  1. Create a directory called logs in the Livy home folder
  2. Rename log4j.properties.template to log4j.properties and change log4j.rootCategory from INFO to DEBUG
  3. Rename livy.conf.template to livy.conf and add the following line at the end; this is needed so Livy can copy our Spark job jars for each session we establish with the server over HTTP
    # on macOS the value looks as follows
    livy.file.local-dir-whitelist = /Users/$your_user/.livy-sessions/
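The three steps above can be scripted roughly as follows. The paths are illustrative: LIVY_HOME here defaults to a scratch directory seeded with stand-in template files so the commands are safe to try as-is; point it at your real unzipped Livy folder instead.

```shell
# LIVY_HOME should be your unzipped livy-0.4.0-incubating-bin folder;
# the default below is a scratch copy with stand-in templates, for illustration only.
LIVY_HOME="${LIVY_HOME:-/tmp/livy-0.4.0-incubating-bin}"
mkdir -p "$LIVY_HOME/conf"
printf 'log4j.rootCategory=INFO, console\n' > "$LIVY_HOME/conf/log4j.properties.template"
: > "$LIVY_HOME/conf/livy.conf.template"

# 1. create the logs directory
mkdir -p "$LIVY_HOME/logs"

# 2. activate log4j.properties and switch the root category from INFO to DEBUG
mv "$LIVY_HOME/conf/log4j.properties.template" "$LIVY_HOME/conf/log4j.properties"
sed -i.bak 's/^log4j.rootCategory=INFO/log4j.rootCategory=DEBUG/' "$LIVY_HOME/conf/log4j.properties"

# 3. activate livy.conf and whitelist the local jar directory
mv "$LIVY_HOME/conf/livy.conf.template" "$LIVY_HOME/conf/livy.conf"
echo "livy.file.local-dir-whitelist = $HOME/.livy-sessions/" >> "$LIVY_HOME/conf/livy.conf"
```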

These configurations are enough. Now start the Livy server by calling:

$LIVY_HOME/bin/livy-server.sh

You should see the following printed on your console:

18/03/17 19:33:23 INFO LineBufferedStream: stdout: Welcome to
18/03/17 19:33:23 INFO LineBufferedStream: stdout:       ____              __
18/03/17 19:33:23 INFO LineBufferedStream: stdout:      / __/__  ___ _____/ /__
18/03/17 19:33:23 INFO LineBufferedStream: stdout:     _\ \/ _ \/ _ `/ __/  '_/
18/03/17 19:33:23 INFO LineBufferedStream: stdout:    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
18/03/17 19:33:23 INFO LineBufferedStream: stdout:       /_/
18/03/17 19:33:23 INFO LineBufferedStream: stdout:                         
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Branch master
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Compiled by user sameera on 2018-02-22T19:24:29Z
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Url git@github.com:sameeragarwal/spark.git
18/03/17 19:33:23 INFO LineBufferedStream: stdout: Type --help for more information.
18/03/17 19:33:23 WARN LivySparkUtils$: Current Spark (2,3) is not verified in Livy, please use it carefully
18/03/17 19:33:23 DEBUG MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful kerberos logins and latency (milliseconds)], valueName=Time)
18/03/17 19:33:23 DEBUG MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Rate of failed kerberos logins and latency (milliseconds)], valueName=Time)
18/03/17 19:33:23 DEBUG MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
18/03/17 19:33:23 DEBUG MetricsSystemImpl: UgiMetrics, User and group related metrics
18/03/17 19:33:23 DEBUG Shell: Failed to detect a valid hadoop home directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:326)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:351)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
	at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
	at org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:338)
	at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:332)
	at org.apache.livy.server.LivyServer.start(LivyServer.scala:93)
	at org.apache.livy.server.LivyServer$.main(LivyServer.scala:339)

Ignore the Hadoop-related exception; it is not relevant at this point because we are running Spark in standalone mode. Finally, open the Livy UI at http://localhost:8998/ui and confirm that the web view renders.

Great! We have configured Livy with Spark!

Configuring Spark jobs with Livy interface

Download this ready-to-go Maven project. The Spark job that gets submitted was already explained in an earlier post: it is a simple Spark job that creates a Dataset from JSON and then transforms it using a StructType and a Java bean. Read the linked post if you want more details on the Spark job itself. The additions in this Maven project are:

  • The Livy Java client that submits the job to the Livy server:
import java.io.File;
import java.net.URI;

import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class LivyTestClient {

  public LivyTestClient() {
    invokeLivy();
  }

  public static void main(String... args) {
    new LivyTestClient();
  }

  private void invokeLivy() {
    // Path to the packaged jar that contains the Spark job
    String jarLocation = "/PATH_TO/spark-test/0.0.1-SNAPSHOT/spark-test-0.0.1-SNAPSHOT.jar";
    LivyClient client = null;
    try {
      // Build a client pointing at the local Livy server
      client = new LivyClientBuilder(true).setURI(new URI("http://localhost:8998")).build();

      System.out.println("Uploading to the Spark context... " + jarLocation);
      // Upload the job jar to the session; get() blocks until the upload completes
      client.uploadJar(new File(jarLocation)).get();

      // Submit the job and block until it finishes
      client.submit(new SparkDatasetTest()).get();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (client != null) {
        // Stop the client and shut down the remote Spark context
        client.stop(true);
      }
    }
  }
}


  • The Livy dependency defined in the POM:
<!-- https://mvnrepository.com/artifact/org.apache.livy/livy-client-http -->
    <dependency>
      <groupId>org.apache.livy</groupId>
      <artifactId>livy-client-http</artifactId>
      <version>0.4.0-incubating</version>
    </dependency>
  • Finally, the Spark code now implements the Livy Job interface:
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class SparkDatasetTest implements Job<String> {

  // New addition: this is the overridden method after implementing Livy's Job interface
  @Override
  public String call(JobContext jobContext) throws Exception {
    // Reuse the Spark session that the Livy server manages for this session
    SparkSession spark = jobContext.sparkSession();

    JavaSparkContext ctx = new JavaSparkContext(spark.sparkContext());

    // Create a dataset from a JSON file and transform it (see the linked post)
    createDatasetFromJson(spark.sqlContext());
    return "success";
  }
}

Once you have validated all the components, package the jar by running mvn package against the pom.xml. Update the jarLocation variable in LivyTestClient.java with the path to the built jar, then run the main method of LivyTestClient. You should see your job submitted to the running livy-server; look for this line:

18/03/17 20:03:30 INFO LineBufferedStream: stdout: 2018-03-17 20:03:30 INFO  SparkInterpreter:39 - Created Spark session.

After this line you should see all the Spark-related output, produced by our show() and printSchema() calls on the datasets. Note: with the current setup, the Spark job runs with the master and workers all inside the livy-server's JVM. This setup uses the configured Spark binaries but does not actually submit the job to any Spark cluster you may have configured.

If you would rather have Livy submit the job to your Spark standalone cluster, then in the livy.conf file add the property livy.spark.master and give it the URL of the running Spark master; that's it. Once this is done you should see your jobs being submitted to the cluster, and the output of all action calls on the datasets will now appear in the worker logs.
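For example, the livy.conf entry looks like this; the host and port are illustrative, so substitute the master URL printed when your own Spark master starts up:

```
# livy.conf -- submit sessions to a standalone Spark master instead of local mode
# spark://localhost:7077 is an example; 7077 is the standalone master's default port
livy.spark.master = spark://localhost:7077
```

Restart the Livy server after changing this so new sessions pick up the cluster master.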
