[FIXED] So verwenden Sie KafkaSource von Flink mit Scala im Jahr 2022

Ausgabe

Ich habe diese ähnliche, aber 7 Jahre alte Frage überprüft, sie gilt jedoch nicht für neuere Flink-Versionen.

Ich versuche, einen einfachen Flink Kafka-Job zum Laufen zu bringen, und habe verschiedene Versionen ausprobiert, die jeweils unterschiedliche Kompilierungsfehler erhalten. Ich verwende sbt, um meine Abhängigkeiten zu verwalten:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)

Erprobte Versionen:

scala 2.11.12 und 2.12.15


flink 1.14.6

Der Code, den ich zu kompilieren versuche (relevante Bits):

import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource

...

  val env = ExecutionEnvironment.getExecutionEnvironment
  val kafkaConsumer = new KafkaSource.builder[String]
    .setBootstrapservers("localhost:9092")
    .setGroupId("flink")
    .setTopics("test")
    .build()

  val text = env.fromSource(kafkaConsumer)

Ich habe kein offizielles Beispiel dafür gefunden, dass dies tatsächlich so ist, wie man das verwenden soll, KafkaSourceaber ich habe dieses Setup hier und hier gefunden . Für meine noch sehr neuen Java-Augen sieht dies mit den API-Dokumenten ausgerichtet aus . Aber ja, ich kann es mit keiner der Scala-Versionen zum Laufen bringen:

[error] somepathwithmyfile: type builder is not a member of object org.apache.flink.connector.kafka.source.KafkaSource
[error]     val kafkaConsumer = new KafkaSource.builder[String]
[error]                                         ^
[error] somepathwithmyfile: value fromSource is not a member of org.apache.flink.api.scala.ExecutionEnvironment
[error]     val text = env.fromSource(kafkaConsumer)
[error]                    ^
[error] two errors found

Lösung

Lassen Sie für das erste Problem Folgendes fallen new:

 val kafkaConsumer = KafkaSource.builder[String]
   ...

Für das zweite Problem fromSourcesind drei Argumente erforderlich:

  /** Create a DataStream using a [[Source]]. */
  @Experimental
  def fromSource[T: TypeInformation](
      source: Source[T, _ <: SourceSplit, _],
      watermarkStrategy: WatermarkStrategy[T],
      sourceName: String): DataStream[T] = {

    val typeInfo = implicitly[TypeInformation[T]]
    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
  }

Beachten Sie auch, dass Flink scala 2.12.15 (noch) nicht unterstützt. Siehe https://issues.apache.org/jira/browse/FLINK-20969 . Flink 1.15 kann jedoch mit neueren Versionen von Scala (einschließlich Scala 3) verwendet werden, wenn Sie die integrierte Scala-API-Unterstützung von Flink ausschließen. Siehe https://flink.apache.org/2022/02/22/scala-free.html für mehr dazu.


Beantwortet von –
David Anderson


Antwort geprüft von –
Mary Flores (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like