Akka ไหลเวียนข้อมูลเข้า(`ไว้ใน`)เป็นผลส่งออก(`ออก`)

0

คำถาม

ฉันกำลังพยายามเขียนเป็นชิ้นส่วนของโค้ดซึ่งทำต่อไปนี้:-

  1. อ่านหนังสือขนาดใหญ่แฟ้ม csv จากแฟ้มระยะไกลแหล่งข่าวเหมือน s3.
  2. ประมวลผลแฟ้มค้องค้นบันทึกจากบันทึก
  3. ส่งการแจ้งให้ทราบต้องของผู้ใช้
  4. เขียนส่งผลไปยังเครือข่ายระยะไกล

ตัวอย่างอยู่ในบันทึกข้อมูลแฟ้ม csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

นำเข้าข้อมูลของฉันคดีชั้นเรียนซึ่งเป็นตัวแทนขอบันทึกอยู่ในนำเข้าแฟ้ม csv:

case class InputRecord(recordId: String, name: String, salary: Long)

ตัวอย่างอยู่ในบันทึกผลส่งออกแฟ้ม csv(นั่นต้องการที่จะถูกเขียน):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

ของฉันออกกรณีคลาสซึ่งเป็นตัวแทนขอบันทึกอยู่ในนำเข้าแฟ้ม csv:

case class OutputRecord(recordId: String, name: String, designation: String)

อ่านบันทึกการใช้ akka อสายข้อมูลแฟ้ม csv(ใช้ Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

ตอนนี้ฉันมีงานต้องการบันทึก:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

ฟังก์ชันจะเขียน OutputRecord เป็นแฟ้ม csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

ฟังก์ชันส่งอีเมลการแจ้งให้ทราบ:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

ตะเข็บมันทั้งหมดด้วยกัน

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

บนเส้นเวลา 15&16 ฉันได้ข้อผิดพลาดเกิดขึ้นระหว่างฉันเหมือนกัสามารถเพิ่มเส้นเวลา 15 หรือเส้นอายุ 16 แต่ไม่ใช่ทั้งสองตั้งแต่ทั้งสอง notify & writeOutput ต้องการ outputRecord. แจ้งให้ทราบเมื่อคืนโทรหาฉันหลุดของฉัน outputRecord.

มีทางที่ฉันสามารถเพิ่มทั้งสอง notify แล้ว writeOutput ที่เดียวกับกราฟ?

ฉันไม่ได้กำลังมองหาการประมวลผลคู่ขนานเหมือนกับที่ผมอยากเรียนแรก notify และจากนั้นเท่านั้น writeOutput. งั้นนี่ก็ไม่ช่วย: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

การใช้กรณีดูเหมือนธรรมดาสำหรับฉันหรอกแต่ยังไงฉันไม่สามารถค้นหาปลอดทางออกหรอกนะ

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

คำตอบที่ดีที่สุด

1

การส่งออกของ notify เป็ PushResultแต่ข้อมูลของ writeOutput นี่ ByteString. เมื่อคุณเปลี่ยนแปลงมันจะ compile. ในกรณีที่คุณต้องการ ByteStringไปจากเดียวกัน OutputRecord.

BTW,อยู่ตัวอย่างรหัสนั่นคุณต้องเตรียมไว้ให้เป็นที่คล้ายกันเกิดข้อผิดพลาดอยู่ readCSV แล้ว process.

2021-11-24 03:36:16

ในภาษาอื่นๆ

หน้านี้อยู่ในภาษาอื่นๆ

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................