Concatinating สองคน Flows ใน Akka อสายข้อมูล

0

คำถาม

ฉันกำลังพยายาม concat สองคน Flows และฉันไม่สามารถที่จะอธิบายเรื่องผลส่งออกของฉัน implementation.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

ฉันคาดหวังต่อไปนี้ส่งออกจากนี้เป็นรหัสมอส

2
3
4
.
.
.
11
10
20
.
.
.
100

แทนที่จะเป็นฉันเห็นเท่านั้น"2"การถูกตีพิมพ์ คุณสามารถช่วยอธิบายอะไรผิดของฉันอยู่ใน implmentation แล้วฉันควรจะเปลี่ยนโปรแกรมต้องไปที่ต้องการแสดงผลหน้าจอ

akka akka-stream scala
2021-10-21 17:29:00
2

คำตอบที่ดีที่สุด

3

จาก Akka อสายข้อมูลเป็นรูปแบบ api docs:

Concat:

มันปล่อตอนที่อสายข้อมูลปัจจุบันมีธาตุที่มีอยู่หากปัจจุบันข้อมูล completes มันพยายามต่อไป

Broadcast:

มันปล่อเมื่อเรื่องทั้งหมดที่แสดงผลหยุด backpressuring และนั่นคือการนำเข้าอีลีเมนต์ที่มีอยู่

สองคน operators จะไม่ทำงานใน the grammatical type of a word ที่มีความขัดแย้งในวิธีพวกเขาทำงาน-- Concat พยายามจะดึงทั้งส่วนประกอบจากหนึ่ง Broadcast's แสดงผลก่อนที่เปลี่ยนกลับอีกคนหนึ่งแต่ว่า Broadcast จะไม่ส่งเสีเว้นแต่ว่ามันต้องการสำหรับทั้งหมดของมันแสดงผล.

สำหรับสิ่งที่คุณต้องการคุณสามารถใช้ concatenate concat ที่แนะนำโดย commenters:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

หรือ equivalently ใช้ Source.combine เหมือนด้านล่างนี้:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

ใช้ GraphDSLซึ่งเป็นรุ่นพื้นฐานของ implementation ของ แหล่งข้อมูลรวม:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

ในภาษาอื่นๆ

หน้านี้อยู่ในภาษาอื่นๆ

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................