[FIXED] Fehler beim Veröffentlichen von Daten in pubSub aus dem dataProc-Spark-Job: Kein funktionaler Kanaldienstanbieter gefunden

Ausgabe

Ich führe einen Spark-Scala-Job auf dem GCP DataProc-Cluster aus. Nach der Verarbeitung von Daten muss ich Nachrichten im PubSub-Thema veröffentlichen, aber ich erhalte eine Fehlermeldung, wie unten erwähnt.

Kein funktionaler Kanaldienstanbieter gefunden. Versuchen Sie, eine Abhängigkeit vom grpc-okhttp-, grpc-netty- oder grpc-netty-shaded-Artefakt hinzuzufügen

Alles funktioniert gut bis zur Funkenverarbeitung. Sobald ich eine Nachricht an PubSub veröffentliche, erhalte ich diesen Fehler. Schau dir den Code an…

Try {

  val topicName = TopicName.of(projectName, pubSubTopicName)

  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")

  val googleCredentials = GoogleCredentials
        .fromStream(getClass.getResourceAsStream("file path")
        .createScoped(scope)

  val batchingSettings = BatchingSettings
  .newBuilder()
  .setElementCountThreshold(elementCountThreshold)
  .setRequestByteThreshold(requestByteThreshold)
  .setDelayThreshold(delayDuration)
  .build()

  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )

  val publishedData: MutableList[String] = MutableList()

  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )

    val messageIdFuture = publisher.publish(pubSubMessage)

    publishedData.+=(messageIdFuture.get)
  }
}

def getPublisher(
      topicName: TopicName,
      batchingSettings: BatchingSettings,
      googleCredentials: GoogleCredentials
  ): Publisher = {

Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setBatchingSettings(batchingSettings)
  .build()

}

def getPubSubMessage (Daten: ByteString): PubsubMessage = {

PubsubMessage
  .newBuilder()
  .setData(data)
  .build()

}

Da es einen Kanalfehler zeigt, habe ich die folgende Änderung in Publisher versucht, aber derselbe Fehler

    Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()

Ich habe auch versucht, Abhängigkeiten in sbt hinzuzufügen, aber immer noch derselbe Fehler

"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"

Alle drei vorgeschlagenen Abhängigkeiten sind in Bibliotheken vorhanden, immer noch Fehler.

Bitte helfen Sie bei diesem Problem, danke im Voraus.

Lösung

Das Problem besteht also darin, Fat Jar aufgrund der Pubsub-Bibliothek zusammenzustellen.

Hier sind die Änderungen, die in build.sbt erforderlich sind

  • Add dependency of grpc-netty only.

“io.grpc” % “grpc-netty” % “1.49.2”

  • change merge strategy in jar assembling.

assemblyShadeRules in assembly := Seq( ShadeRule
.rename(“com.google.common.” -> “repackaged.com.google.common.@1”)
.inAll, ShadeRule
.rename(“com.google.protobuf.
” -> “repackaged.com.google.protobuf.@1”)
.inAll )

  • Also, shade (repackage) some of com.google’s libraries which cause issue at runtime in publisher creation.

assemblyMergeStrategy in assembly := { case x if
Assembly.isConfigFile(x) =>
MergeStrategy.concat case PathList(ps @ _)
if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
MergeStrategy.rename case PathList(“META-INF”, xs @ _
) =>
(xs map { _.toLowerCase }) match {
case (“manifest.mf” :: Nil) | (“index.list” :: Nil) |
(“dependencies” :: Nil) =>
MergeStrategy.discard
case ps @ (x :: xs)
if ps.last.endsWith(“.sf”) || ps.last.endsWith(“.dsa”) =>
MergeStrategy.discard
case “plexus” :: xs =>
MergeStrategy.discard
case “services” :: xs =>
MergeStrategy.filterDistinctLines
case (“spring.schemas” :: Nil) | (“spring.handlers” :: Nil) =>
MergeStrategy.filterDistinctLines
case _ => MergeStrategy.first
} case _ => MergeStrategy.first }

Dies funktioniert ohne Laufzeitfehler.


Beantwortet von –
Arjun V


Antwort geprüft von –
Jay B. (FixError Admin)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like

[FIXED] Funkenfassade für CaseWhen

Ausgabe Ich versuche, eine Funktion zu erstellen, die wie eine Fassade für die CaseWhenSpark-Funktion wirkt. CaseWhen( branches: Seq[(Expression,…