Let's connect
Let's connect

Reconciling Spark APIs for Scala

Picture of Michał Pałka, Senior Scala Developer

Michał Pałka

Senior Scala Developer

10-minute read

scala

/** A single reading reported by a weather station.
  *
  * @param stationId   identifier of the station that produced the reading
  * @param temperature temperature in °C
  * @param pressure    pressure in hPa
  * @param timestamp   time of the reading (unit is not stated in this snippet)
  */
case class Measurement(
  stationId: Long,
  temperature: Int /* in °C */,
  pressure: Int /* in hPa */,
  timestamp: Long
)
Let's connect

scala

// Per-station aggregation written with the typed Dataset API and plain tuples.
measurements
    .groupByKey(_.stationId)
    .mapGroups { (stationId, stationMeasurements) =>
      // mapGroups hands over a single-pass Iterator. Materialize it once so the
      // values can be traversed several times below (min, max, sum, length).
      val readings = stationMeasurements.toVector
      val temperatures = readings.map(_.temperature)
      val pressures = readings.map(_.pressure)
      (
        stationId,
        temperatures.min,
        temperatures.max,
        pressures.sum.toDouble / pressures.length
      )
    }
    // _2 is the minimum and _3 the maximum temperature: keep stations whose range is below 20.
    .filter(entry => entry._3 - entry._2 < 20)
    // Keep only (stationId, average pressure).
    .map(entry => (entry._1, entry._4))

scala

/** Per-station summary of a group of measurements.
  *
  * @param stationId      identifier of the station the summary belongs to
  * @param minTemperature lowest temperature in the group
  * @param maxTemperature highest temperature in the group
  * @param avgPressure    arithmetic mean of the pressure values in the group
  */
case class AggregatedMeasurement(
  stationId: Long, minTemperature: Int, maxTemperature: Int, avgPressure: Double
)

/* … */

  // Per-station aggregation with the typed Dataset API, using a named result type.
  measurements
    .groupByKey(_.stationId)
    .mapGroups { (stationId, stationMeasurements) =>
      // mapGroups hands over a single-pass Iterator. Materialize it once so the
      // values can be traversed several times below (min, max, sum, length).
      val readings = stationMeasurements.toVector
      val temperatures = readings.map(_.temperature)
      val pressures = readings.map(_.pressure)
      AggregatedMeasurement(
        stationId = stationId,
        minTemperature = temperatures.min,
        maxTemperature = temperatures.max,
        avgPressure = pressures.sum.toDouble / pressures.length
      )
    }
    // Keep stations whose temperature range is below 20.
    .filter(aggregated => aggregated.maxTemperature - aggregated.minTemperature < 20)
    .map(aggregated => (aggregated.stationId, aggregated.avgPressure))

scala

// The same query with the untyped DataFrame API. Columns are referenced by
// plain strings, so a misspelled name is not caught by the compiler and only
// fails once Spark analyses the query.
measurements
    .groupBy($"stationId")
    .agg(
      min($"temperature").as("minTemperature"),
      max($"temperature").as("maxTemperature"),
      avg($"pressure").as("avgPressure")
    )
    // Column names must match the aliases given in `agg` exactly.
    .where($"maxTemperature" - $"minTemperature" < lit(20))
    .select($"stationId", $"avgPressure")

scala

  // The same query with columns referenced through member selection on `$`
  // instead of string literals. How `$` resolves these names is defined
  // outside this snippet.
  measurements
    .groupBy($.stationId)
    .agg(
      min($.temperature).as("minTemperature"),
      max($.temperature).as("maxTemperature"),
      avg($.pressure).as("avgPressure")
    )
    // The aliases introduced in `agg` are referenced as members of `$` here.
    .where($.maxTemperature - $.minTemperature < lit(20))
    .select($.stationId, $.avgPressure)

scala

import scala.language.dynamics
import org.apache.spark.sql.functions.col

/** Lets column references be written as `$.someName` instead of `col("someName")`.
  *
  * Because the object extends `Dynamic`, a selection of an undeclared member
  * such as `$.foo` is rewritten by the compiler to `$.selectDynamic("foo")`.
  * No check is made that a column with that name exists.
  */
object $ extends Dynamic {
  /** Builds an untyped column reference for `name`. */
  def selectDynamic(name: String): org.apache.spark.sql.Column = col(name)
}

scala

import org.apache.spark.sql.{ Column => UntypedColumn }
import org.apache.spark.sql.functions.col

// Value class wrapping Spark's untyped column. The type parameter `T` is not
// used by any member; it only tags the wrapper at the type level.
class Column[T](val untyped: UntypedColumn) extends AnyVal

// Base for structurally typed row models. Extending `Selectable` lets members
// declared in a refinement of this trait be accessed through `selectDynamic`.
trait RowModel extends Selectable {
  // Wraps the Spark column called `name`; the element type is left to inference here.
  def selectDynamic(name: String) = Column(col(name))
}

// Sketch: `$` yields a RowModel whose refinement lists the available columns.
// Both the refinement and the instance body are elided in this snippet.
def $: RowModel { /* ... */ } = new RowModel { /* .. */ }

scala

// Refinement of RowModel with one typed column accessor per field of Measurement.
RowModel {
  def stationId: Column[Long]
  def temperature: Column[Int]
  def pressure: Column[Int]
  def timestamp: Column[Long]
}

scala

// Scala 2 style. `bar` accepts an ordinary function from Context to Int
// (implementation left as `???`).
def bar(fun: Context => Int) = ???
// `baz` requires a Context to be available implicitly.
def baz(implicit context: Context): Int = ???

// The lambda parameter has to be marked `implicit` explicitly so that the
// call to `baz` can pick it up.
bar { implicit context =>
  baz
}

scala

// Scala 3 style. `bar` accepts a context function (`?=>`), so its argument is
// evaluated with a Context implicitly in scope (implementation left as `???`).
def bar(fun: Context ?=> Int) = ???
// `baz` requires a given Context.
def baz(using context: Context): Int = ???

// No lambda parameter is written: the context function type supplies the
// Context to the body, where `baz` resolves it.
bar {
  baz
}

scala

// Returns the RowModel available in the current context. The singleton result
// type `rowModel.type` keeps the precise (refined) type of that instance.
def $(using rowModel: RowModel): rowModel.type = rowModel

scala

//> using scala "3.2.0"
//> using lib "org.virtuslab::iskra:0.0.2"

import org.virtuslab.iskra.api.*

/** A single reading reported by a weather station.
  *
  * @param stationId   identifier of the station that produced the reading
  * @param temperature temperature in °C
  * @param pressure    pressure in hPa
  * @param timestamp   time of the reading (presumably Unix epoch seconds,
  *                    judging by the sample data — confirm)
  */
case class Measurement(stationId: Long, temperature: Int, pressure: Int, timestamp: Long)

/** Entry point of the example.
  *
  * Starts a local Spark session, builds a typed data frame from sample
  * measurements, and prints the average pressure of every station whose
  * temperature range (max - min) is below 20. The session is stopped
  * afterwards, even if the query fails.
  */
@main def run(): Unit =
  given spark: SparkSession = SparkSession.builder()
    .master("local")
    .appName("weather-stations")
    .getOrCreate()

  try
    val measurements = Seq(
      Measurement(1, 10, 1020, 1641399107),
      Measurement(2, -5, 1036, 1647015112),
      Measurement(1, 19, 996, 1649175104),
      Measurement(2, 25, 1015, 1657030348),
      /* more data … */
    ).toTypedDF

    import functions.{avg, min, max, lit}

    measurements
      .groupBy($.stationId)
      .agg(
        min($.temperature).as("minTemperature"),
        max($.temperature).as("maxTemperature"),
        avg($.pressure).as("avgPressure")
      )
      .where($.maxTemperature - $.minTemperature < lit(20))
      .select($.stationId, $.avgPressure)
      .show()
  finally
    // Release the session's resources regardless of how the query ended.
    spark.stop()
iskra-coding
Let's connect

Curated by

Sebastian Synowiec

Liked the article?

Share it with others!

explore more on

Take the first step to a sustained competitive edge for your business

Get your free consultation

VirtusLab's work has met the mark several times over, and their latest project is no exception. The team is efficient, hard-working, and trustworthy. Customers can expect a proactive team that drives results.

Stephen Rooke
Stephen Rooke, Director of Software Development @ Extreme Reach

VirtusLab's engineers are truly Strapi extensions experts. Their knowledge and expertise in the area of Strapi plugins gave us the opportunity to lift our multi-brand CMS implementation to a different level.

facile logo
Leonardo Podda, Engineering Manager @ Facile.it

VirtusLab has been an incredible partner since the early development of Scala 3, essential to a mature and stable Scala 3 ecosystem.

Martin_Odersky
Martin Odersky, Head of Programming Research Group @ EPFL

The VirtusLab team's in-depth knowledge, understanding, and experience of technology have been invaluable to us in developing our product. The team is professional and delivers on time – we greatly appreciated this efficiency when working with them.

Michael_Grant
Michael Grant, Director of Development @ Cyber Sec Company