使用 Kafka 和 Docker 開發事件驅動的應用程式

隨著微服務的興起,事件驅動的架構變得越來越流行。Apache Kafka 是一個分散式事件流平臺,通常是這些架構的核心。然而,為開發目的設定和部署自己的 Kafka 例項通常很棘手。幸運的是,Docker 和容器讓這一切變得容易得多。

在本指南中,您將學習如何

  1. 使用 Docker 啟動 Kafka 叢集
  2. 將非容器化的應用連線到叢集
  3. 將容器化的應用連線到叢集
  4. 部署 Kafka-UI 以幫助進行故障排除和除錯

先決條件

要學習本操作指南,需要滿足以下先決條件

啟動 Kafka

Kafka 3.3 開始,Kafka 的部署得到了極大的簡化,這要歸功於 KRaft(Kafka Raft),不再需要 Zookeeper。有了 KRaft,為本地開發設定 Kafka 例項變得容易得多。隨著 Kafka 3.8 的釋出,一個新的 kafka-native Docker 映象現已可用,其啟動速度顯著加快,記憶體佔用也更低。

提示

本指南將使用 apache/kafka 映象,因為它包含了許多用於管理和操作 Kafka 的實用指令碼。不過,您可能希望使用 apache/kafka-native 映象,因為它啟動更快,所需資源更少。

啟動 Kafka

按照以下步驟啟動一個基本的 Kafka 叢集。本示例將啟動一個叢集,將埠 9092 暴露到主機上,以便本地執行的應用程式可以連線到它。

  1. 透過執行以下命令啟動 Kafka 容器

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. 映象拉取完成後,您將在幾秒鐘內擁有一個執行中的 Kafka 例項。

  3. apache/kafka 映象在 /opt/kafka/bin 目錄中附帶了幾個有用的指令碼。執行以下命令以驗證叢集是否已啟動並執行,並獲取其叢集 ID

    $ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
    

    這樣做會產生類似以下的輸出

    Cluster ID: 5L6g3nShT-eMCtK--X86sw
  4. 透過執行以下命令建立一個示例主題並生產(或釋出)幾條訊息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    執行後,您可以每行輸入一條訊息。例如,每行輸入幾條訊息。一些示例可能是

    First message

    Second message

    enter 鍵傳送最後一條訊息,完成後按 ctrl+c。訊息將被髮布到 Kafka。

  5. 透過消費訊息來確認訊息已釋出到叢集中

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
    

    然後您應該會在輸出中看到您的訊息

    First message
    Second message

    如果需要,您可以開啟另一個終端併發布更多訊息,並看到它們出現在消費者中。

    完成後,按 ctrl+c 停止消費訊息。

您現在擁有一個本地執行的 Kafka 叢集,並且已經驗證了您可以連線到它。

從未容器化的應用連線到 Kafka

現在您已經展示了可以從命令列連線到 Kafka 例項,是時候從應用程式連線到叢集了。在本例中,您將使用一個簡單的 Node 專案,該專案使用 KafkaJS 庫。

由於叢集在本地執行並暴露在埠 9092,應用程式可以連線到 localhost:9092 上的叢集(因為它目前是在本地而不是在容器中執行)。連線後,這個示例應用程式將記錄它從 demo 主題消費的訊息。此外,當它在開發模式下執行時,如果主題不存在,它也會建立該主題。

  1. 如果您沒有執行上一步的 Kafka 叢集,請執行以下命令啟動一個 Kafka 例項

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. GitHub 倉庫克隆到本地。

    $ git clone https://github.com/dockersamples/kafka-development-node.git
    
  3. 進入專案目錄。

    cd kafka-development-node/app
    
  4. 使用 yarn 安裝依賴項。

    $ yarn install
    
  5. 使用 yarn dev 啟動應用程式。這會將 NODE_ENV 環境變數設定為 development 並使用 nodemon 來監視檔案更改。

    $ yarn dev
    
  6. 應用程式現在正在執行,它會將接收到的訊息記錄到控制檯。在一個新的終端中,使用以下命令釋出幾條訊息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    然後向叢集傳送一條訊息

    Test message

    完成後,請記得按 ctrl+c 停止生產訊息。

同時從容器和本地應用連線到 Kafka

現在您有了一個透過其暴露埠連線到 Kafka 的應用程式,是時候探索從另一個容器連線到 Kafka 需要進行哪些更改了。為此,您現在將從容器中而不是本地執行該應用程式。

但在開始之前,瞭解 Kafka 監聽器的工作原理以及這些監聽器如何幫助客戶端連線是很重要的。

理解 Kafka 監聽器

當客戶端連線到 Kafka 叢集時,它實際上連線到一個“broker”。雖然 broker 有很多角色,但其中之一是支援客戶端的負載均衡。當客戶端連線時,broker 會返回一組連線 URL,客戶端應該使用這些 URL 來生產或消費訊息。這些連線 URL 是如何配置的呢?

每個 Kafka 例項都有一組監聽器和宣告監聽器。“監聽器”是 Kafka 繫結的地址,而“宣告監聽器”則配置了客戶端應該如何連線到叢集。客戶端收到的連線 URL 是基於客戶端連線到哪個監聽器。

定義監聽器

為了更好地理解,我們來看看如何配置 Kafka 以支援兩種連線方式

  1. 主機連線(透過主機的對映埠傳入的連線) - 這些需要使用 localhost 連線
  2. Docker 連線(從 Docker 網路內部傳入的連線) - 這些不能使用 localhost 連線,而應使用 Kafka 服務的網路別名(或 DNS 地址)

由於客戶端需要透過兩種不同的方式進行連線,因此需要兩個不同的監聽器 - HOSTDOCKERHOST 監聽器會告訴客戶端使用 localhost:9092 連線,而 DOCKER 監聽器會通知客戶端使用 kafka:9093 連線。請注意,這意味著 Kafka 同時在埠 9092 和 9093 上監聽。但是,只有主機監聽器需要暴露給主機。

Diagram showing the DOCKER and HOST listeners and how they are exposed to the host and Docker networks

為了進行此設定,Kafka 的 compose.yaml 檔案需要一些額外的配置。一旦您開始覆蓋某些預設設定,您還需要指定其他一些選項才能使 KRaft 模式工作。

services:
  kafka:
    image: apache/kafka-native
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

請按照以下步驟嘗試一下。

  1. 如果您正在執行上一步的 Node 應用程式,請在終端中按 ctrl+c 將其停止。

  2. 如果您正在執行上一節的 Kafka 叢集,請使用以下命令停止該容器

    $ docker rm -f kafka
    
  3. 在克隆的專案目錄的根目錄下執行以下命令來啟動 Compose 堆疊

    $ docker compose up
    

    片刻之後,應用程式將啟動並執行。

  4. 堆疊中還有另一個服務可以用來發布訊息。透過訪問 https://:3000 來開啟它。當您輸入訊息並提交表單時,您應該會看到應用程式接收到訊息的日誌記錄。

    這有助於演示容器化方法如何輕鬆地新增額外的服務來幫助測試和排查您的應用程式。

新增叢集視覺化工具

一旦您開始在開發環境中使用容器,您就會意識到新增僅專注於輔助開發的其他服務是多麼容易,例如視覺化工具和其他支援服務。既然您已經運行了 Kafka,那麼視覺化 Kafka 叢集中發生的事情可能會很有幫助。為此,您可以執行 Kafbat UI Web 應用程式

要將其新增到您自己的專案中(它已經包含在演示應用程式中),您只需將以下配置新增到您的 Compose 檔案中即可

services:
  kafka-ui:
    image: kafbat/kafka-ui:main
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka

然後,一旦 Compose 堆疊啟動,您就可以在瀏覽器中開啟 https://:8080 並四處瀏覽以檢視有關叢集的更多詳細資訊、檢查消費者、釋出測試訊息等等。

使用 Kafka進行測試

如果您有興趣瞭解如何輕鬆地將 Kafka 整合到您的整合測試中,請檢視使用 Testcontainers 測試 Spring Boot Kafka 監聽器指南。本指南將教您如何使用 Testcontainers 來管理測試中 Kafka 容器的生命週期。

結論

透過使用 Docker,您可以簡化使用 Kafka 開發和測試事件驅動應用程式的過程。容器簡化了設定和部署開發所需各種服務的過程。一旦它們在 Compose 中定義,團隊中的每個人都可以從其易用性中受益。

如果您之前錯過了,所有的示例應用程式程式碼都可以在 dockersamples/kafka-development-node 中找到。