Datasets in Apache Spark - Java Spark API

In this article we will see what Datasets are in Apache Spark, how to create them, and how to operate on them using the Spark Java API. We will also look at how to create Datasets from different sources such as RDDs, Java Lists, JSON files and MySQL.

What is a Spark Dataset?

Datasets were added to Spark in version 1.6. A Dataset is a distributed collection of data that provides the benefits of RDDs together with the benefits of Spark SQL's optimized execution engine.

The Dataset API provides compile-time type safety, which is not available in DataFrames.

Datasets combine the compile-time type safety of RDDs with the efficient off-heap storage mechanism and the performance benefits of the Catalyst query optimizer offered by the DataFrame API.

What is SparkSession?

SparkSession was added in Spark 2.x as a replacement for SQLContext and HiveContext. Since 2.x, a SparkSession can be used for both structured data (Dataset and DataFrame) operations and Hive-related SQL operations.

A SparkSession object can be created as shown below:
SparkSession sparkSession = SparkSession.builder().getOrCreate();
A Hive-enabled SparkSession object can be created as shown below:
SparkSession sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
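For local experimentation the builder usually also needs an application name and a master URL; on a cluster these are normally supplied by spark-submit. A minimal sketch, assuming a hypothetical application name and a local master:
SparkSession sparkSession = SparkSession.builder()
		.appName("dataset-examples") // hypothetical application name
		.master("local[*]") // run locally on all cores; omit when submitting to a cluster
		.getOrCreate();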


How to create a Dataset in Spark?

At the core of the Dataset API is a concept called an encoder, which handles the conversion between JVM objects and Spark's internal tabular representation. Spark provides built-in encoders for common types such as String, Integer and Long, as well as for Scala case classes and Java beans.

Let's assume we have a class PageView as shown below; we will use this class and its encoder to explain the Dataset operations that follow.
public class PageView {
	private int pageId;
	private int countryId;
	private int count;

	// No-arg constructor (required by the bean encoder)
	// Getters and setters for pageId, countryId and count
}
The encoder for the above class can be obtained as shown below:
Encoder<PageView> encoder = Encoders.bean(PageView.class);
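For the primitive types mentioned above, the built-in encoders can be used directly, for example:
Encoder<String> stringEncoder = Encoders.STRING();
Encoder<Integer> integerEncoder = Encoders.INT();
Encoder<Long> longEncoder = Encoders.LONG();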

How to create a Dataset from a Java List in Spark?

In the Spark Java API, a Java List of objects can be converted to a Dataset using an appropriate encoder, as shown in the code below:
	public <T> Dataset<T> dataset(List<T> list, Encoder<T> encoder) {
		Dataset<T> dataset = sparkSession.createDataset(list, encoder);
		return dataset;
	}
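A quick usage sketch of this helper with the PageView bean from above (the three-argument constructor used here is hypothetical):
	List<PageView> views = Arrays.asList(new PageView(1, 10, 100), new PageView(2, 20, 200));
	Dataset<PageView> pageViews = dataset(views, Encoders.bean(PageView.class));
	pageViews.show();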

How to convert a DataFrame to a Dataset in Spark?

In the Spark Java API, a DataFrame can be converted to a Dataset by calling the .as() method on the DataFrame and passing an encoder as the argument, as shown below:
	public <T> Dataset<T> dataset(Dataset<Row> dataFrame, Encoder<T> encoder) {
		Dataset<T> dataset = dataFrame.as(encoder);
		return dataset;
	}
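For example, a DataFrame read from a data source can be converted to a typed Dataset, provided the column names match the bean's properties; a minimal sketch with a hypothetical CSV path:
	Dataset<Row> dataFrame = sparkSession.read()
			.option("header", "true")
			.option("inferSchema", "true")
			.csv("/tmp/page_views.csv"); // hypothetical input path
	Dataset<PageView> pageViews = dataFrame.as(Encoders.bean(PageView.class));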

RDD to Dataset in Spark

In the Spark Java API, a Dataset can be obtained from an existing JavaRDD as shown below:
	public <T> Dataset<T> dataset(JavaRDD<T> javaRdd, Class<T> clazz, Encoder<T> encoder) {
		Dataset<T> dataset = sparkSession.createDataFrame(javaRdd, clazz).as(encoder);
		return dataset;
	}
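The JavaRDD itself can be obtained from the same SparkSession, for example by parallelizing a local list; a minimal sketch reusing the hypothetical views list from earlier:
	JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
	JavaRDD<PageView> javaRdd = jsc.parallelize(views);
	Dataset<PageView> fromRdd = dataset(javaRdd, PageView.class, Encoders.bean(PageView.class));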

Read JSON into a Dataset in Spark

In the Spark Java API, a Dataset can be obtained from a .json file as shown below:
	public <T> Dataset<T> dataset(String jsonFilePath, Encoder<T> encoder) {
		Dataset<T> dataset = sparkSession.read().json(jsonFilePath).as(encoder);
		return dataset;
	}
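Note that spark.read().json() expects newline-delimited JSON by default, i.e. one complete JSON object per line (for pretty-printed files the multiLine read option can be enabled in Spark 2.2+). A hypothetical call to the helper above; since JSON numbers are inferred as long, the bean's numeric fields may need to be declared long for the .as() conversion to succeed:
	Dataset<PageView> fromJson = dataset("/tmp/page_views.json", Encoders.bean(PageView.class)); // hypothetical path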

Read Apache Parquet into a Dataset in Spark

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

In the Spark Java API, a Dataset can be obtained from a .parquet file as shown below:
	public <T> Dataset<T> datasetParquet(String parquetFilePath, Encoder<T> encoder) {
		// parquet is Spark's default data source format, so load() reads parquet here
		Dataset<T> dataset = sparkSession.read().load(parquetFilePath).as(encoder);
		return dataset;
	}
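Going the other way, an existing Dataset can be written out as Parquet; a minimal sketch with a hypothetical output path:
	pageViews.write()
			.mode(SaveMode.Overwrite) // replace the target directory if it already exists
			.parquet("/tmp/page_views_parquet"); // hypothetical output path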

How to load a MySQL table into a Dataset in Spark?

In the Spark Java API, a Dataset can be obtained from a MySQL table, or any other JDBC source, as shown below:
	/*
	 * dbUrl - jdbc:mysql://127.0.0.0:3306/techdb 
	 * tableNameWithSchema - techdb.articles 
	 * userName - tech 
	 * password - burps
	 */
	public <T> Dataset<T> dataset(String dbUrl, String tableNameWithSchema, String userName, String password,
			Encoder<T> encoder) {
		Dataset<T> dataset = sparkSession.read().format("jdbc").option("url", dbUrl)
				.option("dbtable", tableNameWithSchema).option("user", userName).option("password", password).load()
				.as(encoder);
		return dataset;
	}
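The MySQL JDBC driver jar must be available on Spark's classpath (for example via --jars) for this to work. A hypothetical call to the helper above, where Article is an assumed bean matching the columns of techdb.articles:
	Dataset<Article> articles = dataset("jdbc:mysql://127.0.0.0:3306/techdb", "techdb.articles",
			"tech", "burps", Encoders.bean(Article.class));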

Spark Dataset operations

Now, let's look at some basic operations that can be performed on Datasets. We will use some dummy data for this purpose, as shown below:
		/* Dummy data*/
		List<Geo> dummy = new ArrayList<Geo>();
		
		dummy.add(new Geo(1, "London", 20));
		dummy.add(new Geo(2, "New York", 30));
		dummy.add(new Geo(3, "Delhi", 40));
		dummy.add(new Geo(3, "Delhi", 60));
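Here, Geo is assumed to be a simple Java bean along the following lines (the field names are inferred from the schema output shown later; getters and setters are omitted for brevity):
public class Geo {
	private int cityId;
	private String cityName;
	private long hits;

	public Geo() {} // no-arg constructor required by the bean encoder

	public Geo(int cityId, String cityName, long hits) {
		this.cityId = cityId;
		this.cityName = cityName;
		this.hits = hits;
	}

	// Getters and setters for cityId, cityName and hits
}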
Let's create a Dataset from this data with the help of SparkSession; we will use the same Dataset for the different operations below:
Encoder<Geo> encoder = Encoders.bean(Geo.class);
Dataset<Geo> dataset = sparkSession.createDataset(dummy, encoder);

How to print Dataset contents on the console

We can use .show() to display the contents of a Dataset on stdout, as shown below:
		/* Show complete data of Dataset*/
		dataset.show();

		/* Show specific number of rows of Dataset*/
		dataset.show(2);
Output: The following output is written to stdout:
+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
|     2|New York|  30|
|     3|   Delhi|  40|
|     3|   Delhi|  60|
+------+--------+----+

+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
|     2|New York|  30|
+------+--------+----+
only showing top 2 rows

How to print the Dataset schema on the console

We can use .printSchema() to display the schema of a Dataset on stdout, as shown below:
dataset.printSchema();
Output: The following output is written to stdout:
root
 |-- cityId: integer (nullable = false)
 |-- cityName: string (nullable = true)
 |-- hits: long (nullable = false)

How to filter values in a Spark Dataset

We can use .filter() to filter the rows of a Dataset, as shown below:
/* Casting the lambda to FilterFunction disambiguates between the Java and Scala filter overloads */
dataset.filter((FilterFunction<Geo>) i -> i.getCityId() == 3).show();
Output: The following output is written to stdout:
+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     3|   Delhi|  40|
|     3|   Delhi|  60|
+------+--------+----+
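The same filter can also be expressed as an untyped column expression, which avoids deserializing each row into a Geo object; a minimal sketch:
dataset.filter("cityId = 3").show();
dataset.filter(functions.col("cityId").equalTo(3)).show(); // functions is org.apache.spark.sql.functions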

How to aggregate data using a Dataset in Spark

We can aggregate data using groupBy() in Spark. Here we group the rows by cityId and sum the hits for each city.
dataset.groupBy("cityId").sum("hits").show();
Output: The following output is written to stdout:
+------+---------+
|cityId|sum(hits)|
+------+---------+
|     1|       20|
|     3|      100|
|     2|       30|
+------+---------+
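The same aggregation can also be written with agg(), which makes it easy to rename the result column and order the output; a minimal sketch:
dataset.groupBy("cityId")
		.agg(functions.sum("hits").as("totalHits"))
		.orderBy("cityId")
		.show();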
In this article we have seen what Datasets are in Spark, their properties, and how to create and operate on them using the Spark Java API.