Flink(บ docker)ที่กินดอกข้อมูลจาก Kafka(บ docker)

0

คำถาม

ฉันต้อง Flink(งานตัวจัดการแล้วงานตัวจัดการ)และ Kafka ลังวิ่งอย่างที่ docker ภาพของฉันแม็ค.
ฉันต้องสร้าง Flink งานแล้วย้ายไปมัน ทำงานใช้ FlinkKafkaConsumer แล้ว FlinkKafkaProducer และควรจะกัดกินจาก kafka และผักลับไป kafka.

ดูเหมือนว่า "bootstrap.servers" ฉันใช้(kafka:9092)ไม่มีความหมายสำหรับ Flink ซึ่งล้มเหลวด้วย:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

docker ps ผล

CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
b0cb56cb1941   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 8081/tcp                                   taskmanager
0c29ca57a5bb   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 0.0.0.0:8081->8081/tcp                     jobmanager
c0e2a3ffc8e0   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   29 hours ago   Up 29 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
b9ae03f8f026   wurstmeister/kafka       "start-kafka.sh"         29 hours ago   Up 29 hours   0.0.0.0:9092->9092/tcp                               kafka

docker เครือข่ายคือ ผลลัพธ์

NETWORK ID     NAME            DRIVER    SCOPE
1c09bfc0a2e9   bridge          bridge    local
268c4521f1de   flink-network   bridge    local
2479a63017ab   host            host      local
bb11245fba78   kafka_default   bridge    local
245dcc1e0d76   none            null      local

แล้วฉันก็วิ่งหนี:

docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka 

และใช้ IP ของฉันมีที่บู๊ทสแตร็พเครื่องแม่ข่าย(172.18.0.2:9092)

ปรับปรุง#1

ฉันใช้ subset ของ docker-compose.yml(โดย Martijn Visser)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

docker ps

CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1

มันคืออะไรที่ถูกต้องบู๊ทสแตร็พเครื่องแม่ข่ายค่าที่จะใช้?
ฉันจะทำให้ Flink"เห็น"Kafka?

apache-flink apache-kafka docker
2021-11-23 17:16:10
1

คำตอบที่ดีที่สุด

1

ส่วนใหญ่คุณจะต้องปรับแต่ง KAFKA_ADVERTISED_LISTENERS และประเด็น Flink ต้องปรับแต่งค่า. ตัวอย่างเช่นในกของฉัน Docker ตั้งค่าที่ https://github.com/MartijnVisser/flink-only-sql ฉันต้องต่อไปนี้ในการปรับแต่งของฉัน Docker compose แฟ้ม:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  jobmanager:
    image: flink:1.14.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

ฉันสามารถใช้แล้ว broker:29092 เหมือ Kafka bootstrap เซิร์ฟเวอร์.

2021-11-23 17:42:17

มาร์ติน ฉันเอา subset ของ docker compose และใช้มัน (ดูมอดูลปรับปรุง#1). ตอนนี้ฉันอล็อกอินเข้าไปในโบรกเกอร์ตู้คอนเทนเนอร์(ในการสร้างหัวข้อคุเป็นต้น)และฉันอย่าหา kafka*.ขอโทษที่มาสคริปต์. ได้ที่ไหนฉันเจอพวกเขาเหรอ?
balderman

สวัสดี Martijn Visser. ฉันจัดการทำงานเริ่ม(ใช้ broker:29092). ประเด็นคือของลูกค้าไม่มีข้อความ ฉันรู้ว่าฉันส่งจดหมายไปเรื่อยตั้งแต่ฉันมีโปรดิวเซอร์/ของลูกค้าสคริปต์นั่นได้ผล ยังไง-ฉันยอมรับคำตอบของคุณ
balderman

สวัสดี@balderman-คุณยังคงได้รับข้อผิดพลาดเกิดขึ้นระหว่างข้อความหรือเปล่าตอบสน? คุณสามารถใช้ต่อไปนี้จะเหมือนกัรายการเรื่องหรือเห็นข้อมูลอยู่บนหัวข้อ: docker-compose exec broker kafka-topics \ --list \ --bootstrap-server broker:29092 docker-compose exec broker kafka-console-consumer \ --bootstrap-server broker:29092 \ --topic notifications \ --from-beginning
Martijn Visser

ฉันต้องสร้างข้อความใหม่ที่พออธิบายสถานการณ์ปัจจุบัน. เจอ stackoverflow.com/questions/70100813/... -ขอบคุณ
balderman

ในภาษาอื่นๆ

หน้านี้อยู่ในภาษาอื่นๆ

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................

ดังอยู่ในนี้หมวดหมู่

ดังคำถามอยู่ในนี้หมวดหมู่