Let's connect
Let's connect

Reconciling Spark APIs for Scala

Picture of Michał Pałka, Senior Scala Developer

Michał Pałka

Senior Scala Developer

10-minute read

scala

/** One weather reading reported by a station.
  *
  * @param stationId   identifier of the reporting station
  * @param temperature temperature in °C
  * @param pressure    atmospheric pressure in hPa
  * @param timestamp   time of the reading (presumably Unix epoch seconds — confirm)
  */
case class Measurement(
  stationId: Long,
  temperature: Int /* in °C */,
  pressure: Int /* in hPa */,
  timestamp: Long
)
Let's connect

scala

measurements
    .groupByKey(_.stationId)
    .mapGroups { (stationId, stationMeasurements) =>
      // `mapGroups` hands each group over as a one-shot Iterator. Materialize it once:
      // mapping the iterator twice and then calling min/max/sum/length would read an
      // already-exhausted iterator and fail (e.g. "empty.max").
      val group = stationMeasurements.toVector
      val temperatures = group.map(_.temperature)
      val pressures = group.map(_.pressure)
      (
        stationId,
        temperatures.min,
        temperatures.max,
        pressures.sum.toDouble / pressures.length
      )
    }
    .filter(entry => entry._3 - entry._2 < 20) // temperature range (max - min) below 20 °C
    .map(entry => (entry._1, entry._4)) // (stationId, average pressure)

scala

/** Per-station summary: lowest and highest temperature, and mean pressure. */
case class AggregatedMeasurement(stationId: Long,
                                 minTemperature: Int, maxTemperature: Int,
                                 avgPressure: Double)

/* … */

  measurements
    .groupByKey(_.stationId)
    .mapGroups { (stationId, stationMeasurements) =>
      // `mapGroups` hands each group over as a one-shot Iterator. Materialize it once:
      // mapping the iterator twice and then calling min/max/sum/length would read an
      // already-exhausted iterator and fail (e.g. "empty.max").
      val group = stationMeasurements.toVector
      val temperatures = group.map(_.temperature)
      val pressures = group.map(_.pressure)
      AggregatedMeasurement(
        stationId = stationId,
        minTemperature = temperatures.min,
        maxTemperature = temperatures.max,
        avgPressure = pressures.sum.toDouble / pressures.length
      )
    }
    .filter(aggregated => aggregated.maxTemperature - aggregated.minTemperature < 20) // range below 20 °C
    .map(aggregated => (aggregated.stationId, aggregated.avgPressure))

scala

// Untyped DataFrame version: columns are referenced by string name, so a misspelled
// name (such as "minTemperture") still compiles and only fails at runtime, when Spark
// tries to resolve the column.
measurements
    .groupBy($"stationId")
    .agg(
      min($"temperature").as("minTemperature"),
      max($"temperature").as("maxTemperature"),
      avg($"pressure").as("avgPressure")
    )
    .where($"maxTemperature" - $"minTemperature" < lit(20))
    .select($"stationId", $"avgPressure")

scala

  // Same query written with member syntax `$.name` instead of the `$"name"` interpolator.
  // Here `$` is not Spark's interpolator: it needs a user-defined value in scope that turns
  // a member access such as `$.stationId` into a column reference.
  measurements
    .groupBy($.stationId)
    .agg(
      min($.temperature).as("minTemperature"),
      max($.temperature).as("maxTemperature"),
      avg($.pressure).as("avgPressure")
    )
    .where($.maxTemperature - $.minTemperature < lit(20))
    .select($.stationId, $.avgPressure)

scala

import scala.language.dynamics
import org.apache.spark.sql.functions.col

object $ extends Dynamic {
  /** Invoked by the compiler for `$.someName`; yields Spark's `col("someName")`. */
  def selectDynamic(name: String) =
    col(name)
}

scala

import org.apache.spark.sql.{ Column => UntypedColumn }
import org.apache.spark.sql.functions.col

/** A Spark column tagged at compile time with the Scala type `T` of its values
  * (e.g. `Column[Long]` for a `Long` field); `untyped` is the wrapped Spark column.
  * Declared as a value class (`extends AnyVal`), so the wrapper is typically not allocated.
  */
class Column[T](val untyped: UntypedColumn) extends AnyVal

/** Base type for `$`: a member access such as `$.temperature` is routed through
  * `selectDynamic`, which looks up the Spark column with that name and wraps it.
  */
trait RowModel extends Selectable {
  def selectDynamic(name: String) = {
    val untypedColumn = col(name)
    Column(untypedColumn)
  }
}

// Sketch: the refinement `{ ... }` would declare one typed member per column of the
// dataset (e.g. `def stationId: Column[Long]`); the member list is elided here.
def $: RowModel { /* ... */ } = new RowModel { /* .. */ }

scala

// Refinement type that `$` would have for a dataset of `Measurement`: one typed
// column member per field. Declarations in a refinement are separated by newlines
// (or semicolons), not commas.
RowModel {
  def stationId: Column[Long]
  def temperature: Column[Int]
  def pressure: Column[Int]
  def timestamp: Column[Long]
}

scala

// Scala 2 style: `bar` takes an ordinary function of `Context`. The lambda must name
// its parameter and mark it `implicit` so that `baz` can pick it up. (Bodies are stubs: `???`.)
def bar(fun: Context => Int) = ???
def baz(implicit context: Context): Int = ???

bar { implicit context =>
  baz // `context` is passed to `baz` implicitly
}

scala

// Scala 3 style: `Context ?=> Int` is a context function type, so the lambda's
// `Context` parameter is introduced implicitly and is not written at the call site.
// (Bodies are stubs: `???`.)
def bar(fun: Context ?=> Int) = ???
def baz(using context: Context): Int = ???

bar {
  baz // `context` is resolved from the context function's implicit parameter
}

scala

// Returns the `RowModel` provided in context. The singleton result type `rowModel.type`
// preserves the given's precise (refined) type, so its typed column members stay visible.
def $(using rowModel: RowModel): rowModel.type = rowModel

scala

//> using scala "3.2.0"
//> using lib "org.virtuslab::iskra:0.0.2"

import org.virtuslab.iskra.api.*

/** A single weather-station reading; `temperature` is in °C and `pressure` in hPa. */
case class Measurement(stationId: Long, temperature: Int, pressure: Int, timestamp: Long)

/** Demo entry point: builds a small in-memory dataset of station readings and prints,
  * for every station whose temperature range is below 20 °C, its average pressure.
  */
@main def run() =
  given spark: SparkSession = SparkSession.builder()
    .master("local")
    .appName("weather-stations")
    .getOrCreate()

  // Stop the local Spark session even if building or running the query fails.
  try
    val measurements = Seq(
      Measurement(1, 10, 1020, 1641399107),
      Measurement(2, -5, 1036, 1647015112),
      Measurement(1, 19, 996, 1649175104),
      Measurement(2, 25, 1015, 1657030348),
      /* more data … */
    ).toTypedDF

    import functions.{avg, min, max, lit}

    measurements
      .groupBy($.stationId)
      .agg(
        min($.temperature).as("minTemperature"),
        max($.temperature).as("maxTemperature"),
        avg($.pressure).as("avgPressure")
      )
      .where($.maxTemperature - $.minTemperature < lit(20))
      .select($.stationId, $.avgPressure)
      .show()
  finally spark.stop()
iskra-coding
Let's connect

Curated by

Sebastian Synowiec

Liked the article?

Share it with others!

explore more on

Take the first step to a sustained competitive edge for your business

Let's connect

VirtusLab's work has met the mark several times over, and their latest project is no exception. The team is efficient, hard-working, and trustworthy. Customers can expect a proactive team that drives results.

Stephen Rooke
Stephen Rooke, Director of Software Development @ Extreme Reach

VirtusLab's engineers are true experts in Strapi extensions. Their knowledge and expertise in the area of Strapi plugins gave us the opportunity to lift our multi-brand CMS implementation to a different level.

facile logo
Leonardo Podda, Engineering Manager @ Facile.it

VirtusLab has been an incredible partner since the early development of Scala 3, essential to a mature and stable Scala 3 ecosystem.

Martin Odersky
Martin Odersky, Head of Programming Research Group @ EPFL

The VirtusLab team's in-depth knowledge, understanding, and experience of technology have been invaluable to us in developing our product. The team is professional and delivers on time – we greatly appreciated this efficiency when working with them.

Michael Grant
Michael Grant, Director of Development @ Cyber Sec Company