首頁 後端開發 Python教學 Kafka協議實用指南

Kafka協議實用指南

Dec 28, 2024 pm 05:11 PM

我在低階使用過很多 Apache Kafka 協定。僅按照官方指南開始執行此操作並不容易,而且我閱讀了很多程式碼。透過這篇文章,我想一步步指導您從原始值到有意義的請求,為您提供一個良好的開端。

在這篇文章中:

  1. 探索 Kafka 協定程式碼以及使用 Wireshark 運行的協定。
  2. 學習如何讀取和寫入原始值。
  3. 組合基元來執行有意義的請求。

我們將使用Python作為程式語言。但是,程式碼將是零依賴性的,並且可以輕鬆移植到您選擇的語言。

簡介

Apache Kafka 有一個自訂的二進位協議,該協議是有版本的,具有各種資料類型、可選欄位等。不幸的是,它沒有使用像 Protobuf 這樣眾所周知的序列化格式。協定訊息架構以 JSON 格式描述。執行序列化和反序列化的實際 Java 程式碼是根據此描述產生的。

當你身處Java世界時,你可以使用官方的客戶端函式庫。但如果您使用其他平台,則需要依賴第三方實作。它們存在,但主要關註生產者和消費者,很少關注管理客戶端的某些方面。如果您需要做其他事情,您就得靠自己了。

這篇文章將幫助您開始破解 Kafka 協定。 (如果您正在為 Kafka 協定尋找現成的 Python(反)序列化函式庫,請查看 Kio1。對於 Rust,請查看我正在開發的程式庫。)

您可以在 Github 上的這個儲存庫中找到這篇文章中的程式碼以及更多類似的測試。

協議概述

您可以在此頁面找到官方協議說明。我鼓勵您熟悉它,至少閱讀“預備知識”和“協議”部分。

以下是一些亮點。 Kafka 協定是基於 TCP 的二進位請求-回應協定:

  • 基於 TCP:Kafka 代理程式偵聽 TCP 堆疊上的連接埠(這提供了一些好處,例如排序保證)。
  • 二進位:訊息以二進位形式編碼,需要根據預先定義的模式進行特殊的序列化和反序列化。
  • 請求-回應:交換由客戶端發起,伺服器端是被動的,只回覆請求。

每種 API 訊息類型都由請求和回應對組成,並由稱為 API 金鑰的數值進行識別。例如,Kafka 最具特色的 RPC 的 Produce 和 Fetch 對應的 API 金鑰為 0 和 1。如今,API 訊息類型接近 90 種(其中一些是經紀商間的訊息類型,而不是客戶經紀商間的訊息類型)。

請求和回應由版本化模式描述。版本控制允許協定演變,例如新增或刪除欄位或變更其資料類型。

第一步

您可以執行以下一些操作來開始使用 Kafka 協定。

學習Kafka協議代碼

Kafka 程式碼是該協定(實際上)的真相來源。從 Github 查看 Kafka 程式碼並切換到您感興趣的版本(例如 3.8.0):

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

您可以在clients/src/main/resources/common/message中找到JSON格式的API訊息定義。每個 JSON 檔案包含一條訊息2 類型及其所有版本的定義。 client/src/main/resources/common/message/README.md 很好地概述了模式定義格式。注意預設值、靈活版本和標記欄位等內容。

除了您感興趣的特定API訊息類型之外,還可以查看clients/src/main/resources/common/message/RequestHeader.json和ResponseHeader.json,它們描述了每個請求-回應交換中使用的標頭.

讓我們運行程式碼產生器:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

現在你可以在clients/src/ generated/java/org/apache/kafka/common/message中找到產生的類別。

看看clients/src/ generated/java/org/apache/kafka/common/message/ApiMessageType.java。該實用程式:

  • 描述了整套現有的 API 訊息類型及其架構和版本;
  • 將API訊息版本對應到requestHeaderVersion和responseHeaderVersion函數中的請求和回應標頭版本。

其他檔案是從對應的架構 JSON 一對一產生的(有時帶有資料後綴,這是相容性問題)。在這些文件中,您會發現:

  1. 版本化模式定義 SCHEMA_0、SCHEMA_1 等。有時模式在版本之間保持相同。這是正常的,這意味著只有請求-回應對應部分發生了變化。
  2. 讀取和寫入方法,您可以在其中找到協議序列化和反序列化的基本事實。

注意內部類,它們代表了訊息的複雜結構。

在 Docker 中運行 Kafka

在 Docker 中運行 Kafka 是一種讓代理程式運行來測試協定或捕獲網路交換的便捷方法。從 3.7.0 版本開始,Kafka 團隊建立了官方 Docker 映像,您可以透過以下方式運行:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

如果您對舊版本感興趣,請在 Docker Hub 中搜尋其他映像。然而,考慮到 Kafka 協定是向後和向前相容的,這可能是不需要的:新的代理將很好地識別舊的協定版本,並且舊的客戶端可以與新的代理進行通訊。

如果您閱讀本文,您的電腦上可能已經安裝了 Kafka 命令列工具,但為了以防萬一,您也可以在 Docker 中執行它們。例如,執行此命令來建立主題:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

使用 Wireshark 檢查協議

熟悉了 Kafka 程式碼後,讓我們看看實際的協定。 Wireshark 是此類檢查廣泛使用的工具。它可以剖析 Kafka 協定(如果您的版本足夠新,則支援最新版本)。

我從4.5.0版本的原始碼建置了Wireshark,因為我的作業系統套件很舊,無法用新版本解析Kafka協定。 Wireshark 4.5.0 應該主要支援 Kafka 3.7 協定版本。不過,您可以嘗試可用的版本,看看它如何適合您。

讓我們在環回介面上執行 Wireshark,使用連接埠 9092 擷取過濾器 (1) 和 kafka 顯示過濾器 (2):

Kafka protocol practical guide

建立一個主題,看看 Wireshark 向我們展示了什麼:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

Kafka protocol practical guide

顯示過濾器刪除所有不相關的內容,只留下 Kafka 請求和回應。由於 Wireshark 可以理解協定中的大多數訊息版本(當然取決於 Wireshark 版本),因此您可以輕鬆查看每個訊息的結構。 Wireshark 也會顯示對應的位元組。

Wireshark 是一個很棒的偵錯工具,可以幫助您了解協定在特定情況下如何運作以及您的實作有什麼問題。

讀取和寫入原始值

該協定定義了許多原始類型,您可以在此處找到完整的描述。讓我們為它們實作讀寫程式碼。你可以在這個檔案中找到所有的函數,也可以查看對應的測試檔案。

固定長度整數值:INT8、INT16、INT32、INT64 和 UINT16

這些是已知固定長度的整數:1、2、4 或 8 個位元組。當然,您可以在整個協議中找到很多這樣的欄位。在本課程中,您可能會(簡單地)看到他們的讀寫是如何在 Kafka 中實現的。

我們先定義從緩衝區讀取確切位元組數的函數3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

Python 中的 BinaryIO 類型提示表示一個可以從中讀取位元組並可以寫入位元組的物件。它有 read、write、tell(用於獲取當前位置)、seek(用於更改位置)等方法。

現在我們可以實作讀取INT8了:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製
登入後複製

Kafka 使用 big-endian(又稱網路)位元組排序,因此 byteorder="big"。

現在寫:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

對於INT16、INT32 和INT64,我不會重複這一點:唯一顯著的區別是位元組數(分別為2、4 和8)和檢查範圍([-(2**15), 2* *15 - 1]、[-(2**31)、2**31 - 1] 和[-(2**63)、2**63 - 1]相應地)。

UINT16 與 INT16 類似:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

注意這裡的signed=False。

布林值

BOOLEAN 本質上是帶有額外邏輯的 INT8:== 0 表示 false,!= 0 表示 true。

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

您可以在MetadataRequestData產生的類別的allowAutoTopicCreation欄位中看到BOOLEAN的範例。

浮點數64

FLOAT64 是雙精確度 64 位元 IEEE 754 值。 Python 不像 int 有用於 float 的 to_bytes 和 from_bytes 。因此,我們將使用標準庫中的 struct 模組。

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製
登入後複製

>d 表示「大端位元組順序中的雙精確值」。

UNSIGNED_VARINT:可變長度整數值

可變長度整數是一種允許在數值較小時每個值使用較少位數的方法。 Kafka 使用 Protocol Buffers 的 Varint 方法。這個想法很簡單:

varint 中的每個位元組都有一個連續位,指示其後面的位元組是否是 varint 的一部分。這是位元組的最高有效位元 (MSB)(有時也稱為符號位元)。低7位元是有效負載;產生的整數是透過將其組成位元組的 7 位元有效負載附加在一起而建構的。

詳細資訊可以查看Protobuf規格和Kafka實作(讀、寫)。

此類型本身不用於協定字段,但它用於下面描述的緊湊集合。

讓我們實現它。為了信心起見,我們直接從事實來源,Kafka 的 ByteUtils 類別中取得一些範例:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製

運行這個,我們會得到:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
登入後複製
登入後複製

讓我們以可能不是最高效但最簡單的方式來實現它:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
登入後複製
登入後複製

通用唯一識別碼

UUID 是用於唯一識別實體的 128 位元值。例如,它們用於在 CreateTopicsResponse 中傳遞主題 ID。

你可以看到它們在Kafka程式碼中是如何讀寫的。重現很簡單:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
登入後複製

請注意,Kafka 將 null/None 視為零 UUID,因此我們在這裡也這樣做。

弦樂

Kafka協定有4種類型的字串:

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

緊湊性指示字串長度是使用 INT16 還是使用 UNSIGNED_VARINT 編碼。這取決於訊息版本(2017年左右推出)。可空性是指該值是否可以為空。這也取決於訊息的目的和版本(有時字串欄位在協定演變過程中變得可選)。

字串在協定中無所不在。例如,查看產生的類別MetadataRequestData.MetadataRequestTopic中的欄位名稱。

字串的編碼非常簡單:首先是長度,然後是 UTF-8 編碼的正文。允許的最大長度為 32767 位元組。空字串的長度為 -1 並且顯然沒有正文。

由於緊湊型和非緊湊型之間的唯一區別在於字串長度的編碼方式,因此我們可以為兩種模式使用一個函數。

讓我們從讀取和寫入可為空字串開始:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

不可為 null 的字串函數可以建構在這些函數之上:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

位元組數組

位元組數組與字串非常相似。它們具有相同的潛在可空性和緊湊性:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

它們也以相同的方式編碼:長度主體。當然,主體不會被視為 UTF-8 字串,而是被視為不透明的位元組數組。位元組數組的最大長度為 2147483647;

您可以在產生的類別JoinGroupRequestData.JoinGroupRequestProtocol中找到字段元資料中位元組的範例。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

如您所見,這些函數與字串對應的函數之間的差異很小。

其他陣列

此協定支援位元組以外類型的陣列:字串、數字、結構(但不包括巢狀陣列):ARRAY 和 COMPACT_ARRAY。 緊湊性與位元組數組和字串相同。

出於某種原因,協議規範中沒有明確提及可空性。但是,數組可以為空。這是由架構定義中的 nullableVersions 控制的,如下所示。

考慮到我們已經實作了 read_array_length 和 write_array_length,讓我們實作 reader 和 writer 函數:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

記錄

RECORDS 對 Kafka 記錄進行編碼。這個結構非常複雜,我不會在本指南中描述它(但是,如果您想要它,請在評論中告訴我?️。)為了簡單起見,我們可以將記錄視為NULLABLE_BYTES 或COMPACT_NULLABLE_BYTES (取決於訊息版本)。

標記字段

標記欄位是 Kafka 協定的擴展,它允許將可選資料附加到訊息中。這個想法是雙重的:

  1. 如果服務客戶端不理解標記的字段,它會將其保存為未知並忽略它。
  2. 如果某個欄位很少使用,可以跳過其預設值傳輸。

例如,看看這個欄位。它有 taggedVersions,它表示從哪個版本開始標記該欄位(在大多數情況下,它與新增該欄位時的版本相同)。

標記欄位包含:

  1. UNSIGNED_VARINT 類型的標籤。
  2. COMPACT_BYTES類型的資料。

您可以在 KIP-482 中找到有關標記欄位的更多詳細資訊。

讓我們實現:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

這裡它們的標題是「未知」。已知字段需要在其結構內進行建構。

訊息結構

進階訊息結構非常簡單。依規範:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製
登入後複製

也就是說,它是一條訊息本身,前面有其大小(以位元組為單位)。請求和回應訊息均由緊跟其後的標頭組成。由於某種原因,這沒有明確記錄4,但你可以相信我嗎?或查看代碼。

請求和回應頭

請求頭存在三個版本:0、1、2。它們在協定中指定為:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製

TAG_BUFFER 就是前面提到的標記欄位。

讓我們將它們實作為 Python 資料類別:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

如您所見,版本 2 中有一些標記字段,沒有預期的已知字段。如果某些標記欄位被錯誤地傳送到代理,它將被忽略。

回應頭存在兩個版本:0和1。它們在協定中指定為:

./gradlew processMessages
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

讓我們也實現它們:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

我們沒有實作請求標頭的讀取和回應標頭的寫入。這是為了簡潔起見:在我們的範例中,我們不會發送回應標頭並接收請求標頭,因為我們不會對伺服器端進行程式設計。但是,如果您也對伺服器端感興趣,則需要實現這兩個功能(這應該很簡單)。

相關ID

特別注意請求和回應標頭中的correlation_id 欄位。此協定支援管道:客戶端每個連線可以有多個未完成的請求。相關 ID 允許其將回應與請求進行配對。

標頭版本選擇

必須使用哪個版本是API金鑰和訊息版本的函數。目前協議指南中尚未明確記錄5.
參考生成的類別ApiMessageType中的requestHeaderVersion和responseHeaderVersion函數。

發送請求和接收回應

現在,掌握了所有這些知識和程式碼,讓我們最終發送 ApiVersions 請求並接收和讀取回應。 ApiVersions 通常是客戶端發送的第一個請求。其目的是查找代理程式支援的 API 版本和功能。我們實作了最新版本 3。

在協定規範中,定義為:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製
登入後複製

讓我們建立資料類別:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登入後複製
登入後複製
登入後複製

回覆:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
登入後複製
登入後複製

[api_keys] 表示“api_keys 陣列”,其中 api_keys 是下面兩行定義的結構體。

將其轉換為 Python 資料類別:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
登入後複製
登入後複製

當我們談論數組時,我們需要知道我們是否需要緊湊數組或非緊湊數組。為了找到答案,讓我們來看看 ApiVersionsRequest.json 中的架構定義。可以看到 "flexibleVersions": "3 ",這意味著從版本 3 開始使用緊湊數組(更多資訊請參閱 schema 目錄中的 README.md)。由於我們在這裡使用版本 3,因此我們使用緊湊數組。

實現請求和回應類別後,我們可以發送和接收這些請求。對於此 ApiVersions v3,我們需要 v2 請求標頭和 v0 回應標頭(檢查產生的 ApiMessageType.java)。您可以在 ApiVersionsRequest.json 或協定規格中找到 API 金鑰 (18)。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

如果執行此程式碼,您將看到控制台中列印的回應標頭和訊息。恭喜,您已經與 Kafka Broker 進行了正確的網路交換!

您會注意到 _unknownTaggedFields 中放入了三個標記欄位。產生的 ApiVersionsResponseData 類別的讀寫方法以及 ApiVersionsResponse.json 中的訊息定義將幫助您解釋它們。考慮一下這個作業嗎?


  1. 在我的日常工作中,我們開發了一個開源程式庫 Kio。它允許我們輕鬆地從 Python 進行任意 Kafka API 呼叫。序列化/反序列化程式碼,就像 Kafka 本身一樣,是根據 JSON 協定定義產生的。產生的程式碼經過嚴格測試,包括針對真實 Java Kafka 程式碼的屬性測試。 ↩

  2. 如果您願意,也可以使用「訊息」:某些模式不適用於 API,但例如磁碟上的資料。 ↩

  3. read_exact 函數有一個缺點,即當底層緩衝區已在記憶體中時,它會複製資料。然而,它對於教育目的來說更方便。 ↩

  4. 我做了一個 PR 來解決這個問題。 ↩

  5. 我再次發布了 PR 來解決這個問題。 ↩

以上是Kafka協議實用指南的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? 如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

在Linux終端中使用python --version命令時如何解決權限問題? 在Linux終端中使用python --version命令時如何解決權限問題? Apr 02, 2025 am 06:36 AM

Linux終端中使用python...

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? 如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

如何繞過Investing.com的反爬蟲機制獲取新聞數據? 如何繞過Investing.com的反爬蟲機制獲取新聞數據? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬蟲策略許多人嘗試爬取Investing.com(https://cn.investing.com/news/latest-news)的新聞數據時,常常�...

Python 3.6加載pickle文件報錯ModuleNotFoundError: No module named '__builtin__'怎麼辦? Python 3.6加載pickle文件報錯ModuleNotFoundError: No module named '__builtin__'怎麼辦? Apr 02, 2025 am 06:27 AM

Python3.6環境下加載pickle文件報錯:ModuleNotFoundError:Nomodulenamed...

使用Scapy爬蟲時,管道文件無法寫入的原因是什麼? 使用Scapy爬蟲時,管道文件無法寫入的原因是什麼? Apr 02, 2025 am 06:45 AM

使用Scapy爬蟲時管道文件無法寫入的原因探討在學習和使用Scapy爬蟲進行數據持久化存儲時,可能會遇到管道文�...

See all articles