Ausgabe
Ich versuche, aufgrund eines Fanouts einen separaten Filter auf 2 Ausgabestreams anzuwenden. Ich gebe 3 Objekte der Typprüfung aus, die wie folgt definiert sind:
case class Test(test: String, tester: Double)
Das Ausführen des folgenden src führt zu keinen Ergebnissen:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
object TestFanOut extends App {
implicit val actorSystem = ActorSystem()
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
val output = Sink.foreach[(Test , Test)](println)
val input = Source.repeat(Test("1" , 23)).take(3)
case class Test(test: String, tester: Double)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Test](2))
val zip = builder.add(Zip[Test , Test])
input ~> broadcast
broadcast.out(0) ~> filter1 ~> zip.in0
broadcast.out(1) ~> filter2 ~> zip.in1
zip.out ~> output
ClosedShape
}
)
graph.run()
}
Ändern Sie jedoch die Filter zu:
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
produziert:
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
Es scheint, dass meine Filterlogik nicht korrekt angewendet wird?
Wenn ich die Filter definiere als:
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
Ich erwarte eine Ausgabe von:
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))
Da dies mit einem der Fan-Out-Streams übereinstimmt.
Lösung
Zip
erfordert die Ausgabe von beiden Flow
s. In diesem Fall ist dies aufgrund des Filters nicht möglich.
Stattdessen könnten Sie verwenden Merge
,
val output = Sink.foreach[Test](println)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Test](2))
val merge = builder.add(Merge[Test](2))
input ~> broadcast
broadcast.out(0) ~> filter1 ~> merge
broadcast.out(1) ~> filter2 ~> merge
merge ~> output
ClosedShape
}
)
Dies würde drucken:
Test(1,23.0)
Test(1,23.0)
Test(1,23.0)
Auf der anderen Seite, wenn Sie gezippte Inhalte möchten, können Sie mit Option
,
val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
val output = Sink.foreach[(Option[Test] , Option[Test])](println)
val input = Source.repeat(Option(Test("1" , 23))).take(3)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Option[Test]](2))
val zip = builder.add(Zip[Option[Test] , Option[Test]])
input ~> broadcast
broadcast.out(0) ~> filter1 ~> zip.in0
broadcast.out(1) ~> filter2 ~> zip.in1
zip.out ~> output
ClosedShape
}
)
Dies würde drucken:
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
Beantwortet von – Johny T. Koshy
Antwort geprüft von – Marie Seifert (FixError Admin)