ฉันกำลังพยายามเขียนเป็นชิ้นส่วนของโค้ดซึ่งทำต่อไปนี้:-
- อ่านหนังสือขนาดใหญ่แฟ้ม csv จากแฟ้มระยะไกลแหล่งข่าวเหมือน s3.
- ประมวลผลแฟ้มค้องค้นบันทึกจากบันทึก
- ส่งการแจ้งให้ทราบต้องของผู้ใช้
- เขียนส่งผลไปยังเครือข่ายระยะไกล
ตัวอย่างอยู่ในบันทึกข้อมูลแฟ้ม csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
นำเข้าข้อมูลของฉันคดีชั้นเรียนซึ่งเป็นตัวแทนขอบันทึกอยู่ในนำเข้าแฟ้ม csv:
case class InputRecord(recordId: String, name: String, salary: Long)
ตัวอย่างอยู่ในบันทึกผลส่งออกแฟ้ม csv(นั่นต้องการที่จะถูกเขียน):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
ของฉันออกกรณีคลาสซึ่งเป็นตัวแทนขอบันทึกอยู่ในนำเข้าแฟ้ม csv:
case class OutputRecord(recordId: String, name: String, designation: String)
อ่านบันทึกการใช้ akka อสายข้อมูลแฟ้ม csv(ใช้ Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
ตอนนี้ฉันมีงานต้องการบันทึก:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
ฟังก์ชันจะเขียน OutputRecord เป็นแฟ้ม csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
ฟังก์ชันส่งอีเมลการแจ้งให้ทราบ:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
ตะเข็บมันทั้งหมดด้วยกัน
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
บนเส้นเวลา 15&16 ฉันได้ข้อผิดพลาดเกิดขึ้นระหว่างฉันเหมือนกัสามารถเพิ่มเส้นเวลา 15 หรือเส้นอายุ 16 แต่ไม่ใช่ทั้งสองตั้งแต่ทั้งสอง notify
& writeOutput
ต้องการ outputRecord
. แจ้งให้ทราบเมื่อคืนโทรหาฉันหลุดของฉัน outputRecord
.
มีทางที่ฉันสามารถเพิ่มทั้งสอง notify
แล้ว writeOutput
ที่เดียวกับกราฟ?
ฉันไม่ได้กำลังมองหาการประมวลผลคู่ขนานเหมือนกับที่ผมอยากเรียนแรก notify
และจากนั้นเท่านั้น writeOutput
. งั้นนี่ก็ไม่ช่วย: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
การใช้กรณีดูเหมือนธรรมดาสำหรับฉันหรอกแต่ยังไงฉันไม่สามารถค้นหาปลอดทางออกหรอกนะ