[FIXED] Akka Streams – Auffächern mit Filter

Ausgabe

Ich versuche, aufgrund eines Fanouts einen separaten Filter auf 2 Ausgabestreams anzuwenden. Ich gebe 3 Objekte der Typprüfung aus, die wie folgt definiert sind:

case class Test(test: String, tester: Double)

Das Ausführen des folgenden src führt zu keinen Ergebnissen:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

object TestFanOut extends App {
  implicit val actorSystem = ActorSystem()

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val output = Sink.foreach[(Test , Test)](println)
  val input = Source.repeat(Test("1" , 23)).take(3)

  case class Test(test: String, tester: Double)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Test](2))
      val zip = builder.add(Zip[Test , Test])

      input ~> broadcast

      broadcast.out(0) ~> filter1 ~> zip.in0
      broadcast.out(1) ~> filter2 ~> zip.in1

      zip.out ~> output

      ClosedShape

    }
  )

  graph.run()
}

Ändern Sie jedoch die Filter zu:

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

produziert:

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))

Es scheint, dass meine Filterlogik nicht korrekt angewendet wird?

Wenn ich die Filter definiere als:

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

Ich erwarte eine Ausgabe von:

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))

Da dies mit einem der Fan-Out-Streams übereinstimmt.

Lösung

Ziperfordert die Ausgabe von beiden Flows. In diesem Fall ist dies aufgrund des Filters nicht möglich.

Stattdessen könnten Sie verwenden Merge,

val output = Sink.foreach[Test](println)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Test](2))
    val merge = builder.add(Merge[Test](2))

    input ~> broadcast

    broadcast.out(0) ~> filter1 ~> merge
    broadcast.out(1) ~> filter2 ~> merge

    merge ~> output

    ClosedShape

  }
) 

Dies würde drucken:

Test(1,23.0)
Test(1,23.0)
Test(1,23.0)

Auf der anderen Seite, wenn Sie gezippte Inhalte möchten, können Sie mit Option,

val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
val output = Sink.foreach[(Option[Test] , Option[Test])](println)
val input = Source.repeat(Option(Test("1" , 23))).take(3)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Option[Test]](2))
    val zip = builder.add(Zip[Option[Test] , Option[Test]])

    input ~> broadcast

    broadcast.out(0) ~> filter1 ~> zip.in0
    broadcast.out(1) ~> filter2 ~> zip.in1

    zip.out ~> output

    ClosedShape

  }
)

Dies würde drucken:

(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))


Beantwortet von –
Johny T. Koshy


Antwort geprüft von –
Marie Seifert (FixError Admin)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like