Scala Spark Encoders.product[X] (where X is a case class) keeps giving me the error "No TypeTag available for X"

Issue

I am working in IntelliJ IDEA, in a Scala worksheet. I want to create an encoder for a Scala case class. In various posts on the internet I found the suggestion to use Encoders.product, but it has never worked for me.

The following code

import org.apache.spark.sql.*

val spark: SparkSession =
    SparkSession
      .builder()
      .appName("test")
      .master("local")
      .getOrCreate()

import scala3encoders.given

case class classa(i: Int, j: Int, s: String)

val enc = Encoders.product[classa]

keeps throwing this error:

-- Error: ----------------------------------------------------------------------
1 |val enc = Encoders.product[classa]
  |                                  ^
  |                                  No TypeTag available for classa
1 error found

Does anyone know what is going on here?

The content of build.sbt is:

scalaVersion := "3.1.3"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies ++= Seq(
  excludes(("org.apache.spark" %% "spark-core" % "3.2.0").cross(CrossVersion.for3Use2_13)),
  excludes(("org.apache.spark" %% "spark-sql" % "3.2.0").cross(CrossVersion.for3Use2_13)),
  excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
  "org.scalameta" %% "munit" % "0.7.26" % Test
)

//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true

Solution

Encoders.product[classa] is a Scala 2 mechanism: the method takes an implicit TypeTag, and there are no TypeTags in Scala 3. For Scala 3, the library maintainers suggest working in the following way:

https://github.com/vincenzobaz/spark-scala3/blob/main/examples/src/main/scala/sql/StarWars.scala

package sql

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._


object StarWars extends App:
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits.localSeqToDatasetHolder
  import scala3encoders.given
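  // these givens derive Spark Encoder instances for case classes and tuples at compile time, so no TypeTag is required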

  extension [T: Encoder] (seq: Seq[T])
    def toDS: Dataset[T] =
      localSeqToDatasetHolder(seq).toDS

  case class Friends(name: String, friends: String)
  val df: Dataset[Friends] = Seq(
      ("Yoda",             "Obi-Wan Kenobi"),
      ("Anakin Skywalker", "Sheev Palpatine"),
      ("Luke Skywalker",   "Han Solo, Leia Skywalker"),
      ("Leia Skywalker",   "Obi-Wan Kenobi"),
      ("Sheev Palpatine",  "Anakin Skywalker"),
      ("Han Solo",         "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
      ("Obi-Wan Kenobi",   "Yoda, Qui-Gon Jinn"),
      ("R2-D2",            "C-3PO"),
      ("C-3PO",            "R2-D2"),
      ("Darth Maul",       "Sheev Palpatine"),
      ("Chewbacca",        "Han Solo"),
      ("Lando Calrissian", "Han Solo"),
      ("Jabba",            "Boba Fett")
    ).toDS.map((n,f) => Friends(n, f))


  val friends = df.as[Friends]
  friends.show()
  case class FriendsMissing(who: String, friends: Option[String])
  val dsMissing: Dataset[FriendsMissing] = Seq( 
      ("Yoda",             Some("Obi-Wan Kenobi")),
      ("Anakin Skywalker", Some("Sheev Palpatine")),
      ("Luke Skywalker",   Option.empty[String]),
      ("Leia Skywalker",   Some("Obi-Wan Kenobi")),
      ("Sheev Palpatine",  Some("Anakin Skywalker")),
      ("Han Solo",         Some("Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi"))
    ).toDS
     .map((a, b) => FriendsMissing(a, b)) 

  dsMissing.show()

  case class Character(
    name: String, 
    height: Int, 
    weight: Option[Int], 
    eyecolor: Option[String], 
    haircolor: Option[String], 
    jedi: String,
    species: String
  )

  val characters: Dataset[Character] = spark.sqlContext
    .read
    .option("header", "true")
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .csv("StarWars.csv")
    .as[Character]

  characters.show()
  val sw_df = characters.join(friends, Seq("name"))
  sw_df.show()

  case class SW(
    name: String,
    height: Int,
    weight: Option[Int],
    eyecolor: Option[String],
    haircolor: Option[String],
    jedi: String,
    species: String,
    friends: String
  )

  val sw_ds = sw_df.as[SW]
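
Applied to the case class from the question, the same derivation makes Encoders.product unnecessary. Here is a minimal sketch (assuming spark-scala3 0.1.3 as in the build.sbt above, where importing scala3encoders.given puts a derived Encoder[classa] in scope):

import org.apache.spark.sql.*
import scala3encoders.given

case class classa(i: Int, j: Int, s: String)

val spark: SparkSession =
  SparkSession
    .builder()
    .appName("test")
    .master("local")
    .getOrCreate()

import spark.implicits.localSeqToDatasetHolder

// no TypeTag involved: the Encoder is derived at compile time from the case class structure
val enc: Encoder[classa] = summon[Encoder[classa]]
println(enc.schema)

// the same given is picked up implicitly wherever Spark needs an encoder
val ds: Dataset[classa] =
  localSeqToDatasetHolder(Seq(classa(1, 2, "a"), classa(3, 4, "b"))).toDS
ds.show()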

So if you really need Encoders.product[classa], you can compile that part of your code with Scala 2 in a separate subproject:

src/main/scala/App.scala

// this is Scala 3
object App {
  def main(args: Array[String]): Unit = {
    println(App1.schema)
    // Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))
  }
}

scala2/src/main/scala/App1.scala

import org.apache.spark.sql._

// this is Scala 2
object App1 {
  val schema = Encoders.product[classa].schema
}

common/src/main/scala/classa.scala

case class classa(i: Int, j: Int, s: String)
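// cross-built for Scala 2.13 and Scala 3 (see crossScalaVersions below), so the scala2 subproject and the Scala 3 root share the same class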

build.sbt

lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"

lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := scala3V,
    scalacOptions ++= Seq("-language:implicitConversions", "-deprecation"),
    libraryDependencies ++= Seq(
      excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
      excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
      excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
      "org.scalameta" %% "munit" % "0.7.26" % Test
    )
  )
  .dependsOn(scala2, common)

lazy val scala2 = project
  .settings(
    scalaVersion := scala2V,
    libraryDependencies ++= Seq(
      sparkCore,
      sparkSql
    )
  )
  .dependsOn(common)

lazy val common = project
  .settings(
    scalaVersion := scala3V,
    crossScalaVersions := Seq(scala2V, scala3V)
  )

//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true

Update. Alternatively, in Scala 3 you can compute the TypeTag using Scala 2 runtime compilation (the reflective toolbox): How to compile and execute Scala code at run-time in Scala 3?

Scala 2 macros don't work in Scala 3, so we can't use runtime.currentMirror or q"...", but we can use universe.runtimeMirror and tb.parse. It turns out this still works in Scala 3.

// this is Scala 3
import org.apache.spark.sql.*
import scala.tools.reflect.ToolBox
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe.*
import mypackage.classa

val rm = universe.runtimeMirror(getClass.getClassLoader)
val tb = rm.mkToolBox()
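// the toolbox compiles and evaluates the quoted Scala 2 expression at runtime, yielding the TypeTag that Encoders.product expects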
val typeTag = tb.eval(tb.parse(
    "scala.reflect.runtime.universe.typeTag[mypackage.classa]"
  )).asInstanceOf[TypeTag[classa]]

Encoders.product[classa](typeTag).schema
// Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))

build.sbt

lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"

lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := scala3V,
    scalacOptions ++= Seq(
      "-language:implicitConversions",
      "-deprecation"
    ),
    libraryDependencies ++= Seq(
      excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
      excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
      excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
      "org.scalameta" %% "munit" % "0.7.26" % Test,
      scalaOrganization.value % "scala-reflect" % scala2V,
      scalaOrganization.value % "scala-compiler" % scala2V,
    ),
  )

def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true


Answered by –
Dmytro Mitin

