Ausgabe
Ich führe einen Spark-Scala-Job auf dem GCP DataProc-Cluster aus. Nach der Verarbeitung von Daten muss ich Nachrichten im PubSub-Thema veröffentlichen, aber ich erhalte eine Fehlermeldung, wie unten erwähnt.
Kein funktionaler Kanaldienstanbieter gefunden. Versuchen Sie, eine Abhängigkeit vom grpc-okhttp-, grpc-netty- oder grpc-netty-shaded-Artefakt hinzuzufügen
Alles funktioniert gut bis zur Funkenverarbeitung. Sobald ich eine Nachricht an PubSub veröffentliche, erhalte ich diesen Fehler. Schau dir den Code an…
Try {
val topicName = TopicName.of(projectName, pubSubTopicName)
val scope = new ArrayList[String]()
scope.add("https://www.googleapis.com/auth/pubsub")
val googleCredentials = GoogleCredentials
.fromStream(getClass.getResourceAsStream("file path")
.createScoped(scope)
val batchingSettings = BatchingSettings
.newBuilder()
.setElementCountThreshold(elementCountThreshold)
.setRequestByteThreshold(requestByteThreshold)
.setDelayThreshold(delayDuration)
.build()
val publisher = getPublisher(
topicName,
batchingSettings,
googleCredentials
)
val publishedData: MutableList[String] = MutableList()
for (pubMessage <- dataToBePublished) {
val pubSubMessage =
getPubSubMessage(
ByteString.copyFromUtf8(pubMessage)
)
val messageIdFuture = publisher.publish(pubSubMessage)
publishedData.+=(messageIdFuture.get)
}
}
def getPublisher(
topicName: TopicName,
batchingSettings: BatchingSettings,
googleCredentials: GoogleCredentials
): Publisher = {
Publisher
.newBuilder(topicName)
.setCredentialsProvider(
FixedCredentialsProvider.create(googleCredentials)
)
.setBatchingSettings(batchingSettings)
.build()
}
def getPubSubMessage (Daten: ByteString): PubsubMessage = {
PubsubMessage
.newBuilder()
.setData(data)
.build()
}
Da es einen Kanalfehler zeigt, habe ich die folgende Änderung in Publisher versucht, aber derselbe Fehler
Publisher
.newBuilder(topicName)
.setCredentialsProvider(
FixedCredentialsProvider.create(googleCredentials)
)
.setChannelProvider(
TopicAdminSettings
.defaultGrpcTransportProviderBuilder()
.build()
)
.build()
Ich habe auch versucht, Abhängigkeiten in sbt hinzuzufügen, aber immer noch derselbe Fehler
"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"
Alle drei vorgeschlagenen Abhängigkeiten sind in Bibliotheken vorhanden, immer noch Fehler.
Bitte helfen Sie bei diesem Problem, danke im Voraus.
Lösung
Das Problem besteht also darin, Fat Jar aufgrund der Pubsub-Bibliothek zusammenzustellen.
Hier sind die Änderungen, die in build.sbt erforderlich sind
- Add dependency of grpc-netty only.
“io.grpc” % “grpc-netty” % “1.49.2”
- change merge strategy in jar assembling.
assemblyShadeRules in assembly := Seq( ShadeRule
.rename(“com.google.common.” -> “repackaged.com.google.common.@1”)
.inAll, ShadeRule
.rename(“com.google.protobuf.” -> “repackaged.com.google.protobuf.@1”)
.inAll )
- Also, shade (repackage) some of com.google’s libraries which cause issue at runtime in publisher creation.
assemblyMergeStrategy in assembly := { case x if
Assembly.isConfigFile(x) =>
MergeStrategy.concat case PathList(ps @ _)
if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
MergeStrategy.rename case PathList(“META-INF”, xs @ _) =>
(xs map { _.toLowerCase }) match {
case (“manifest.mf” :: Nil) | (“index.list” :: Nil) |
(“dependencies” :: Nil) =>
MergeStrategy.discard
case ps @ (x :: xs)
if ps.last.endsWith(“.sf”) || ps.last.endsWith(“.dsa”) =>
MergeStrategy.discard
case “plexus” :: xs =>
MergeStrategy.discard
case “services” :: xs =>
MergeStrategy.filterDistinctLines
case (“spring.schemas” :: Nil) | (“spring.handlers” :: Nil) =>
MergeStrategy.filterDistinctLines
case _ => MergeStrategy.first
} case _ => MergeStrategy.first }
Dies funktioniert ohne Laufzeitfehler.
Beantwortet von – Arjun V
Antwort geprüft von – Jay B. (FixError Admin)