ข้อความด่วนทันใจค่อนข้างแปลกใหม่ต้อง Language และ Akka อสายข้อมูลและข้อความด่วนทันใจพยายามจะ LANGUAGE ข้อความข้อความจาก websocket และผลักดันพวกเขาต้องเป็น Kafka กเรื่อง
สำหรับตอนนี้ฉันเป็นเพียงทำงานกับ"เอาจดหมายจาก ws"ส่วนหนึ่ง.
ข้อความมาจาก websocket ดูเหมือนนี้:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
ฉันต้องการที่จะแยกกันนี่ LANGUAGE ข้อความไปหลายจดหมาย:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
และจากนั้นผลักกันและของข้อความพวกนี้ต้องเป็น kafka กเรื่อง
นี่คือสิ่งที่ฉันทำจะประสบผลสำเร็จดังนั้นห่างไกล:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
มันเป็นงานข้อความด่วนทันใจได้ที่คาดหวังผลส่งออก Language ข้อความแต่ฉันสงสัยว่าถ้าฉันสามารถเขียนโปรดิวเซอร์ในอีก"Akka-ค่อนข้า"รูปแบบเหมือนใช้ GraphDSL. ดังนั้นฉันอยากจะถามนิดหน่อ:
- มันจะเป็นไปได้ continuously กิน WebSocket ใช้ GraphDSL? ถ้าใช่คุณสามารถแสดงให้ฉันเป็นตัวอย่างขอร้อง?
- มันเป็นความคิดที่ดีที่จะเพียงพอที่ WS ใช้ GraphDSL?
- ฉันควรจะเน่าไปซะก่อนครับ Language ข้อความชอบข้อความด่วนทันใจทำอะไรก่อนจะทำการส่งมันให้ kafka? หรือมันจะดีกว่าให้ส่งมันอย่างที่มันสำหรับด้านล่าง latency?
- หลังจากร่วมข้อความต้อง Kafka ฉันวางแผนจะกัดกินมันใช้ปูมบันทึกของ apache พายุคือมันเป็นความคิดที่ดีงั้นเหรอ? หรือฉันควรจะอยู่กับ Akka?
ขอบคุณสำหรับอ่านผมฝากฝัง, Arès