Technically Speaking: Series Debut!

A highly technical blog series intended for highly technical people

Note from Zefr: We are nothing without the brilliant engineering and data science minds behind our product, so we’re launching a new, monthly series where they can share the latest trends in tech. Last month, we teased this idea with an article about how we use machine learning plus human review to fuel our content targeting capabilities on YouTube at massive scale.

Think of that article as Technically-Speaking-with-training-wheels. This article kicks off those training wheels and, along with them, a series that takes tech to a new level. This is not for the average reader, be forewarned. But, if you are ready to geek out on some seriously cool concepts and code—read on, Technically Speaking is for you.

My one-page mental model of Spark SQL

In my opinion, Spark’s success centers around two major concepts:

  1. the decoupling of program specification and execution, and
  2. familiar (and terse) syntax for applying data transformations

The focus of this article is the first of these, which relates to the notion of strictness (and non-strictness).

In this article, my goal is to give about a page of code that boils Spark’s programming model down to its essence (in my mind) in order to illustrate some pitfalls related to non-strictness. I’ll write in Scala since Spark is written mainly in Scala. To start, let’s create a model for non-strictness:

src/laziness/package.scala

Below are some models for laziness used in Spark. For those unfamiliar with the syntax of the type of deferred, it is a function that takes zero arguments. Referring to deferred does not execute the function; it is executed only when empty parentheses are supplied (e.g., deferred()). Here, there are two implementations of laziness: Cached, which memoizes, and AlwaysRecomputed, which, as the name suggests, always recomputes the value returned by deferred.

package laziness

sealed trait Lazy[A] extends Product {
 def value: A
}

case class AlwaysRecomputed[A](deferred: () => A) extends Lazy[A] {
 def value: A = deferred()
}

case class Cached[A](deferred: () => A) extends Lazy[A] {
 lazy val value: A = deferred()
}
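
For a quick sanity check of the difference, a deferred with a side effect makes the memoization visible: AlwaysRecomputed runs it on every access, while Cached runs it only on the first.

import laziness._

val recomputed = AlwaysRecomputed(() => { println("computing"); 42 })
recomputed.value   // prints "computing", returns 42
recomputed.value   // prints "computing" again: nothing is memoized

val cached = Cached(() => { println("computing"); 42 })
cached.value       // prints "computing", returns 42
cached.value       // returns 42 silently: the lazy val memoized the result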

src/org/apache/spark/sql/Encoder.scala

Encoders are an important concept in Spark’s SQL API, as they are responsible for the serialization / deserialization of values in a Dataset. Notice the comment in the annotation. A lot of code in Spark relies on Encoders as we’ll see later.

package org.apache.spark.sql

@scala.annotation.implicitNotFound(
 "You really can't do anything without an implicit Encoder[${A}]!\n" +
 "import spark.implicits._ to do anything."
)
class Encoder[A](implicit val ct: scala.reflect.ClassTag[A])

src/org/apache/spark/sql/SparkSession.scala

To create a Dataset, we need a SparkSession. It has methods to create Datasets, DataFrames, etc.

package org.apache.spark.sql

import laziness._

class SparkSession {
 object implicits {
   implicit def encoder[A: scala.reflect.ClassTag]: Encoder[A] = new Encoder[A]
 }

 // Not real read functions in Spark, but analogous.  It's useful to
 // think of these 3 functions as being the same function resulting
 // in different outcomes, depending on the value of `data`.

 /** Successful read */
 def read[A: Encoder](data: => Seq[A]): Dataset[A] =
   summon(data, withMagicalIncantations = true)

 /** Fail downstream. */
 def readThrowLazily[A: Encoder](data: => Seq[A]): Dataset[A] =
   summon(data, withMagicalIncantations = false)

 /** Fail immediately. */
 def readThrowEagerly[A: Encoder](data: => Seq[A]): Dataset[A] =
   throw new IllegalArgumentException(
     "Probably an org.apache.spark.sql.AnalysisException"
   )

 private[this] def summon[A: Encoder](
   data: => Seq[A],
   withMagicalIncantations: Boolean
 ): Dataset[A] = {
   if (withMagicalIncantations)
     new Dataset(new AlwaysRecomputed(() => data), this)
   else
     new Dataset(
       new AlwaysRecomputed[Seq[A]](() =>
         throw new IllegalArgumentException(
           "Esoteric message: failure happened earlier than you think."
         )
       ),
       this
     )
 }
}

Perhaps the most important thing to notice in my mental model of SparkSession is the implementation of summon. This is my own understanding of Spark based on looking at certain failure scenarios. It’s important to understand that the data variable passed to summon (and the read functions) is what’s called a call-by-name parameter. This is another way to encode non-strictness. Unlike the zero-argument functions used for the deferred parameters in the Lazy constructors above, call-by-name parameters are evaluated every time they are referenced.
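
As a small illustration outside the Spark model, here is the difference in plain Scala (the helper names are just for illustration): each reference to a call-by-name parameter re-runs the argument, unless it is captured in something like the Cached class above.

import laziness._

// Illustrative helpers, not Spark code.
def evaluateTwice(data: => Int): Int = data + data   // each reference re-runs the argument

def evaluateOnce(data: => Int): Int = {
  val lz = Cached(() => data)   // capture the by-name argument in a memoizing Lazy
  lz.value + lz.value           // the argument is evaluated at most once
}

evaluateTwice { println("reading"); 1 }   // prints "reading" twice, returns 2
evaluateOnce { println("reading"); 1 }    // prints "reading" once, returns 2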

This model is NOT exactly accurate: there are cases where Spark fails fast when reading data. To accommodate these scenarios, there are three variants of read in my model that should really be thought of as one function whose behavior differs based on the characteristics of the data variable.

One such instance is when Spark determines that the specified data path is non-existent and failure occurs eagerly. For instance:

Non-existent path exceptions are thrown eagerly

spark.read.textFile("/tmp/non_existent_path")

results in an exception being thrown:

org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/non_existent_path;
 at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:715)
 at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
 …

If we create files full of random garbage, Spark Dataset creation sometimes succeeds and sometimes fails.

Reads involving corrupted JSON data succeed (sort of)

# Bash
mkdir -p /tmp/existent_path
cat /dev/random | head -n 100 > /tmp/existent_path/data.json
wc -l /tmp/existent_path/data.json    # Returns 100

The attempt to create the DataFrame succeeds even though the string representation of the DataFrame says “[_corrupt_record: string]”. Counting is even successful; however, the number of rows doesn’t match the number of rows in the corrupted input file.

// Results in   df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]
val df =
 spark.read
   .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
   .json("/tmp/existent_path")

df.count()                            // SUCCEEDS: Returns 198
df.map(x => x.toString).count()       // FAILS

Where things get weird is the call to df.map(x => x.toString).count(). This results in an exception since Spark 2.3.0:

Caused by: org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;
 at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:118)
 at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
 …

Reads involving corrupted CSV data seem to fail eagerly

If we try to read similarly corrupted data, the entire job can fail due to things like ArrayIndexOutOfBoundsExceptions:

val df =
 spark.read
   .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
   .csv("/tmp/existent_path")

Creating Datasets through a SparkSession has some oddities and seems to be a mixture of failing fast and silently propagating errors.

Since there are indeed read methods that fail fast, it’s probably worth wrapping the creation of DataFrames / Datasets in a scala.util.Try (or even better, an IO Monad) to control the flow of failures.
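
As a minimal sketch of what I mean (real Spark API this time, not the model; the helper name safeJsonRead is just for illustration, and it assumes a SparkSession and path are in hand). Note that the Try only guards the eager failure modes like the missing-path AnalysisException above; lazily surfacing errors slip right through it.

import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper, just for illustration.
def safeJsonRead(spark: SparkSession, path: String): Try[DataFrame] =
  Try(spark.read.json(path))   // catches eager failures (e.g., a non-existent path), not lazy ones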

src/org/apache/spark/sql/Dataset.scala

Datasets are among the most fundamentally important data structures in Spark. I think this model is pretty true to how Spark works: map is non-strict, cache mutates the Dataset, and the other operations (actions) that aggregate data or pull it to the driver are what actually access the data.

package org.apache.spark.sql

import laziness._

class Dataset[A] private[sql](
    // `data` is mutable due to caching; underlying data is "unchanged".
   private var data: Lazy[Seq[A]],
   val sparkSession: SparkSession)(implicit
   encA: Encoder[A]
) {
 override def toString(): String = s"Dataset(${data.productPrefix})"

 def take(n: Int): Seq[A] = data.value.take(n)
 def count(): Long = data.value.size
 def collect(): Array[A] = data.value.toArray(encA.ct)

 def map[B: Encoder](f: A => B): Dataset[B] =
   new Dataset(new AlwaysRecomputed(() => data.value.map(f)), sparkSession)

 def cache(): this.type = {
   data match {
     case AlwaysRecomputed(deferred) =>
       data = Cached(deferred)  // <-- Only place `data` is MUTATED
       this
     case Cached(deferred) =>
       println("INFO: Asked to cache already cached data.")
       this
   }
 }
}
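
Under this model, cache() is the only thing that changes how often the underlying data is recomputed. A quick check using the model’s read:

import org.apache.spark.sql.SparkSession

val myspark = new SparkSession
import myspark.implicits._

val ds = myspark.read { println("INFO: Collecting data from disk"); Seq(1, 2, 3) }
ds.count()   // prints the INFO line and returns 3
ds.count()   // prints the INFO line again: AlwaysRecomputed re-reads

ds.cache()   // swaps AlwaysRecomputed for Cached
ds.count()   // prints the INFO line one last time, then memoizes
ds.count()   // returns 3 without "reading" again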

Playing Around with the Model

import org.apache.spark.sql.{Dataset, SparkSession}
val myspark = new SparkSession
val ds = myspark.read(Seq(1, 2, 3))

Here, Encoders come into play. We need an Encoder for the element type of the Dataset, so we need to import the implicits from the SparkSession used to create it. Since the snippet above skips that import, we see this at compile time as an error complaining about the missing implicit Encoder.

<console>:13: error: You really can't do anything without an implicit Encoder[Int]!
import spark.implicits._ to do anything.
       val ds = myspark.read(Seq(1, 2, 3))
                            ^

Let’s generate some (infinite) “big data” during the read. Big data doesn’t get much bigger than that. Notice that this returns quickly like in Spark. The call to map is also quick (since it is non-strict). The exceptions that were generated in the map call only show up when an “action” is performed. So the universe blows up in the call to take.

import org.apache.spark.sql.{Dataset, SparkSession}

val myspark = new SparkSession
import myspark.implicits._

val ds =
 myspark
   .read(Stream.iterate(0L)(_ + 1))        // BIG DATA! Returns fast.
   .map(x => throw new Exception("oops"))  // Doesn't fail.
   .take(1)                                // <– FAILS HERE!

This sort of laziness, while mostly beneficial, can have some consequences. One recent situation I encountered was in some code that loads Avro data and turns it into case classes generated by Avrohugger. The issue was that the Databricks spark-avro loader succeeds at loading well-formed Avro that has a different schema from the target type. So the following code succeeds even though it will fail downstream.

import com.databricks.spark.avro.AvroDataFrameReader
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}

def dataframe[A: Encoder](avroSchema: String, dataPath: String)
                         (implicit spark: SparkSession): DataFrame = {
 spark.read
   .option("avroSchema", avroSchema)
   .avro(dataPath)
}

The call to dataframe succeeds but counting the resulting DataFrame fails downstream due to Avro schema incompatibility. Wrapping the call to dataframe in a Try doesn’t help because the creation is lazy.

One of the dumb things I did when I noticed this failure was to create retry logic, which in hindsight was of course unsuccessful (unless a Spark action was forced on the DataFrame). This is similar in spirit to the types of problems encountered in the scenarios enumerated above.

This is similar to the following, using my mental model:

import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.{Failure, Success, Try}

val myspark = new SparkSession
import myspark.implicits._

val datasetRecoveryAttempt =
 Try {
   val d = myspark.readThrowLazily {
      println("INFO: Collecting data from disk")
      "the quick brown fox jumps over the lazy dog".split(" ")
   }
    // d.take(1)  // <-- EAGERLY FORCE ERROR BY PERFORMING AN ACTION
   d
 } recoverWith { case err =>
   Try {
     myspark.read {
       println("INFO: Recovering.  Collecting data from disk")
       "the quick brown fox jumps over the lazy dog".split(" ")
     }
   }
 }

datasetRecoveryAttempt.get.count()   // <-- FAILS HERE, despite the recovery attempt

I think the names illustrate the point quite nicely: “try to handle now errors that occur lazily, in the future.” This clearly doesn’t work. If we really wanted to perform recovery, we’d have to do something to force the data, like uncommenting the d.take(1) above. That doesn’t seem ideal, but since the semantics for reading data in Spark don’t seem rigorously defined, or are at least inconsistent, we don’t really have a better option.
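
For completeness, here is what that forced version looks like in the model. Materializing even a single element inside the Try surfaces the lazy failure right away, so recoverWith actually gets a chance to run:

// Continuing from the snippet above: myspark, myspark.implicits._, and Try are in scope.
val recovered =
  Try {
    val d = myspark.readThrowLazily {
      "the quick brown fox jumps over the lazy dog".split(" ").toSeq
    }
    d.take(1)   // force an action so the lazy failure is thrown inside the Try
    d
  } recoverWith { case _ =>
    Try {
      myspark.read {
        "the quick brown fox jumps over the lazy dog".split(" ").toSeq
      }
    }
  }

recovered.get.count()   // succeeds: returns 9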

Conclusion

There’s a fantastic article, “The Curse of the Excluded Middle: ‘Mostly functional’ programming does not work” by Erik Meijer that talks about scenarios like these and I come back to it every time I encounter problems with laziness and especially when mixing laziness with effects.

Hopefully this exercise of creating a very pared down version of the important classes in Spark helps people wrap their heads around the operational semantics of Spark, and drives home the point that programming with laziness and errors (or effects in general) can be difficult and error-prone. In Spark, some things are lazy, some are not. Knowing a little more than “transformations are non-strict, actions are strict” can be quite helpful, at least in my opinion.

 
