Akka อสายข้อมูล continuously กิ websocket

0

คำถาม

ข้อความด่วนทันใจค่อนข้างแปลกใหม่ต้อง Language และ Akka อสายข้อมูลและข้อความด่วนทันใจพยายามจะ LANGUAGE ข้อความข้อความจาก websocket และผลักดันพวกเขาต้องเป็น Kafka กเรื่อง

สำหรับตอนนี้ฉันเป็นเพียงทำงานกับ"เอาจดหมายจาก ws"ส่วนหนึ่ง.

ข้อความมาจาก websocket ดูเหมือนนี้:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

ฉันต้องการที่จะแยกกันนี่ LANGUAGE ข้อความไปหลายจดหมาย:

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

และจากนั้นผลักกันและของข้อความพวกนี้ต้องเป็น kafka กเรื่อง

นี่คือสิ่งที่ฉันทำจะประสบผลสำเร็จดังนั้นห่างไกล:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

มันเป็นงานข้อความด่วนทันใจได้ที่คาดหวังผลส่งออก Language ข้อความแต่ฉันสงสัยว่าถ้าฉันสามารถเขียนโปรดิวเซอร์ในอีก"Akka-ค่อนข้า"รูปแบบเหมือนใช้ GraphDSL. ดังนั้นฉันอยากจะถามนิดหน่อ:

  • มันจะเป็นไปได้ continuously กิน WebSocket ใช้ GraphDSL? ถ้าใช่คุณสามารถแสดงให้ฉันเป็นตัวอย่างขอร้อง?
  • มันเป็นความคิดที่ดีที่จะเพียงพอที่ WS ใช้ GraphDSL?
  • ฉันควรจะเน่าไปซะก่อนครับ Language ข้อความชอบข้อความด่วนทันใจทำอะไรก่อนจะทำการส่งมันให้ kafka? หรือมันจะดีกว่าให้ส่งมันอย่างที่มันสำหรับด้านล่าง latency?
  • หลังจากร่วมข้อความต้อง Kafka ฉันวางแผนจะกัดกินมันใช้ปูมบันทึกของ apache พายุคือมันเป็นความคิดที่ดีงั้นเหรอ? หรือฉันควรจะอยู่กับ Akka?

ขอบคุณสำหรับอ่านผมฝากฝัง, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

คำตอบที่ดีที่สุด

1

โค้ดนั่นคือเหลือเฟือ Akka-ค่อนข้า: scaladsl เป็นเพียงเป็น Akka เหมือ GraphDSL หรือ implementing องการกำหนด GraphStage. เหตุผลเดียวที่,IMO/ส่งอีเมล์เพื่อไป GraphDSL คือถ้าคนรูปร่างแท้จริงของกราฟไม่ถึงวาระแห่งความ expressible ใน scaladsl.

ฉันจะส่วนตัวแล้วไปที่เส้นทางของ defining น CoinPrice ชั้นเรียนเพื่อทำให้คนรุ่น explicit

case class CoinPrice(coin: String, price: BigDecimal)

และจากนั้นมี Flow[Message, CoinPrice, NotUsed] ซึ่ง parses 1 คอยดูเอาไว้เจ็ตไฟร์บข้อความเข้าไปในศูนย์หรือมากกว่า CoinPrices. บางอย่าง(โดยการใช้เล่น LANGUAGE อยู่ที่นี่เหมือน:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

คุณอาจจะขึ้นอยู่กับว่าขนาดของ JSONs ในช่องเนื้อหาจดหมายเป็นต้องการที่จะทำลายมันเข้าไปในแตกต่างข้อมูลเสียงขั้นเพื่ออนุญาตให้สำหรับการ async ขอบเขตระหว่าง LANGUAGE การวิเคราะห์และคลายแฟ้มไป CoinPrices. สำหรับตัวอย่างเช่น

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

ในชั้นสู่ขั้องอีกด้านหนึ่งของ async ขอบเขตจะประมวลผลในแยกออกนักแสดงและดังนั้น,อาจจะ concurrently(ถ้ามีเพียงพอตัวประมวลผลหลักแกนที่มีอยู่เป็นต้น) ที่ต้องเสียของพิเศษอยู่เหนือศีรษะที่นักแสดงประสานงานและการแลกเปลี่ยนข้อความ มันพิเศษความร่วมมือกัน/การสื่อสารอยู่เหนือศีรษะ(ซีเอฟแอน. Gunther เป็นรูปแบบสากล Scalability กฏหมาย)นั้นจะไม่คุ้มค่ามันถ้า LANGUAGE วัตถุเป็นที่พัฒนาอย่างเหมาะสขนาดใหญ่และอลังอยู่ในที่พัฒนาอย่างเหมาะสองรี(อย่างต่อเนื่องมาก่อนก่อนหน้ามีเสร็จสิ้นการประมวลผล).

ถ้าความตั้งใจของคุณคือต้องกินเวลาที่ websocket จนกว่าโปรแกรมหยุดคุณอาจจะเจอมันชัดเจนที่เพิ่งใช้ Source.never[Message].

2021-11-21 12:42:30

ขอบคุณสำหรับคุณตอบมันชัดเจนมากฉันมีแค่คำถามเดี à.. จะให้ฉันพักของฉันตอบสนองไปในแตกต่างข้อมูลเสียงขั้น? คุณก็แค่แสดงให้ฉันหน่อยตัวอย่างขอร้อง? หรือ orient ฉันไปที่ที่เหมาะสมส่วนหนึ่งของเอกสาร?
Arès

ในภาษาอื่นๆ

หน้านี้อยู่ในภาษาอื่นๆ

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................