Ausgabe
Ich habe diese ähnliche, aber 7 Jahre alte Frage überprüft, sie gilt jedoch nicht für neuere Flink-Versionen.
Ich versuche, einen einfachen Flink Kafka-Job zum Laufen zu bringen, und habe verschiedene Versionen ausprobiert, die jeweils unterschiedliche Kompilierungsfehler erhalten. Ich verwende sbt, um meine Abhängigkeiten zu verwalten:
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)
Erprobte Versionen:
scala 2.11.12 und 2.12.15
flink 1.14.6
Der Code, den ich zu kompilieren versuche (relevante Bits):
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
...
val env = ExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = new KafkaSource.builder[String]
.setBootstrapservers("localhost:9092")
.setGroupId("flink")
.setTopics("test")
.build()
val text = env.fromSource(kafkaConsumer)
Ich habe kein offizielles Beispiel dafür gefunden, dass dies tatsächlich so ist, wie man das verwenden soll, KafkaSource
aber ich habe dieses Setup hier und hier gefunden . Für meine noch sehr neuen Java-Augen sieht dies mit den API-Dokumenten ausgerichtet aus . Aber ja, ich kann es mit keiner der Scala-Versionen zum Laufen bringen:
[error] somepathwithmyfile: type builder is not a member of object org.apache.flink.connector.kafka.source.KafkaSource
[error] val kafkaConsumer = new KafkaSource.builder[String]
[error] ^
[error] somepathwithmyfile: value fromSource is not a member of org.apache.flink.api.scala.ExecutionEnvironment
[error] val text = env.fromSource(kafkaConsumer)
[error] ^
[error] two errors found
Lösung
Lassen Sie für das erste Problem Folgendes fallen new
:
val kafkaConsumer = KafkaSource.builder[String]
...
Für das zweite Problem fromSource
sind drei Argumente erforderlich:
/** Create a DataStream using a [[Source]]. */
@Experimental
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
}
Beachten Sie auch, dass Flink scala 2.12.15 (noch) nicht unterstützt. Siehe https://issues.apache.org/jira/browse/FLINK-20969 . Flink 1.15 kann jedoch mit neueren Versionen von Scala (einschließlich Scala 3) verwendet werden, wenn Sie die integrierte Scala-API-Unterstützung von Flink ausschließen. Siehe https://flink.apache.org/2022/02/22/scala-free.html für mehr dazu.
Beantwortet von – David Anderson
Antwort geprüft von – Mary Flores (FixError Volunteer)