In Spark, why can we broadcast a DataFrame but not an RDD? How do we use a broadcast DataFrame?

I would like to know why in Spark we are not allowed to broadcast an RDD, yet we can broadcast a DataFrame:

import spark.implicits._ // needed for toDF

val df = Seq(("t","t"),("t","f"),("f","t"),("f","f")).toDF("x1", "x2")
val rdd = df.rdd
val b_df = spark.sparkContext.broadcast(df)   //you can do this!
val b_rdd = spark.sparkContext.broadcast(rdd) //IllegalArgumentException!

What is the use of a broadcast DataFrame? I know that we cannot operate on an RDD within another RDD's transformations, but attempting to operate on a broadcast DataFrame within an RDD transformation is also forbidden.

rdd.map(r => b_df.value.count).collect //SparkException

I am trying to find ways to use Spark in situations where I have to operate on a parallelized collection through transformations that involve invoking transformations/actions of other parallelized collections.
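For example, the only workaround I have found is to collect the small DataFrame on the driver and broadcast the plain local data instead (a sketch; `localData` and `b_local` are my own names, not from the code above):

```scala
// Collect the small DataFrame to the driver, then broadcast the plain
// local array - broadcasting local data is always allowed.
val localData = df.collect()                          // Array[Row] on the driver
val b_local = spark.sparkContext.broadcast(localData)

// The broadcast value is usable inside an RDD transformation,
// since no nested RDD/DataFrame operation is involved.
rdd.map(r => b_local.value.length).collect()
```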

2 answers

  • answered 2018-08-09 00:40 user8371915

That's because a DataFrame is not necessarily distributed. If you check carefully, you'll see that Dataset provides an isLocal method that:

    Returns true if the collect and take methods can be run locally (without any Spark executors).

Local DataFrames can even be used, although it is not advertised, in a task - Why does this Spark code make NullPointerException?

Broadcasting a Dataset uses a similar mechanism - it collects the data to create a local object and then broadcasts it. So it is little more than syntactic sugar for collect followed by broadcast (under the covers it uses a more sophisticated approach than collect, to avoid conversion to the external format), which you can do with an RDD as well.
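To illustrate, the collect-followed-by-broadcast equivalent for an RDD looks like this (a sketch; `rdd` is the one from the question, `localRows` and `b_rows` are my own names):

```scala
// A Dataset broadcast is roughly collect-then-broadcast; with an RDD
// you simply do the two steps yourself.
val localRows = rdd.collect()                        // Array[Row] on the driver
val b_rows = spark.sparkContext.broadcast(localRows) // works, unlike broadcast(rdd)

// The broadcast local data can then be referenced inside a transformation:
rdd.map(r => b_rows.value.length).collect()
```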

  • answered 2018-08-09 12:49 thebluephantom

Simple answer: it's a paradigm thing. Formally it's a Dataset. Most importantly, the total amount of data to be broadcast - taking the underlying partitioning into account - needs to fit into the memory of each worker node.
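In practice, the common reason to broadcast a DataFrame is a broadcast (map-side) join of a small table against a large one, which Spark SQL exposes through the broadcast hint rather than sparkContext.broadcast. A minimal sketch, assuming the df from the question and a hypothetical second table `other`:

```scala
import org.apache.spark.sql.functions.broadcast

// Hypothetical small lookup table to join against df from the question.
val other = Seq(("t", 1), ("f", 0)).toDF("x1", "flag")

// The broadcast() hint ships `other` whole to every executor, so the
// join requires no shuffle of df - the small side must fit in memory.
val joined = df.join(broadcast(other), Seq("x1"))
joined.show()
```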