首页 后端开发 Python教程 Kafka协议实用指南

Kafka协议实用指南

Dec 28, 2024 pm 05:11 PM

我在低级别上使用过很多 Apache Kafka 协议。仅按照官方指南开始执行此操作并不容易,而且我阅读了很多代码。通过这篇文章,我想一步步指导您从原始值到有意义的请求,为您提供一个良好的开端。

在这篇文章中:

  1. 探索 Kafka 协议代码以及使用 Wireshark 运行的协议。
  2. 学习如何读取和写入原始值。
  3. 组合基元来执行有意义的请求。

我们将使用Python作为编程语言。但是,代码将是零依赖性的,并且可以轻松移植到您选择的语言。

简介

Apache Kafka 有一个自定义的二进制协议,该协议是有版本的,具有各种数据类型、可选字段等。不幸的是,它没有使用像 Protobuf 这样众所周知的序列化格式。协议消息架构以 JSON 格式描述。执行序列化和反序列化的实际 Java 代码是根据此描述生成的。

当你身处Java世界时,你可以使用官方的客户端库。但如果您使用其他平台,则需要依赖第三方实现。它们存在,但主要关注生产者和消费者,很少关注管理客户端的某些方面。如果您需要做其他事情,您就得靠自己了。

这篇文章将帮助您开始破解 Kafka 协议。 (如果您正在为 Kafka 协议寻找现成的 Python(反)序列化库,请查看 Kio1。对于 Rust,请查看我正在开发的库。)

您可以在 Github 上的这个存储库中找到这篇文章中的代码以及更多类似的测试。

协议概述

您可以在此页面找到官方协议说明。我鼓励您熟悉它,至少阅读“预备知识”和“协议”部分。

以下是一些亮点。 Kafka 协议是基于 TCP 的二进制请求-响应协议:

  • 基于 TCP:Kafka 代理侦听 TCP 堆栈上的端口(这提供了一些好处,例如排序保证)。
  • 二进制:消息以二进制形式编码,需要根据预定义的模式进行特殊的序列化和反序列化。
  • 请求-响应:交换由客户端发起,服务器端是被动的,只回复请求。

每种 API 消息类型都由请求和响应对组成,并由称为 API 密钥的数值进行标识。例如,Kafka 最具特色的 RPC 的 Produce 和 Fetch 对应的 API 密钥为 0 和 1。如今,API 消息类型接近 90 种(其中一些是经纪商间的消息类型,而不是客户经纪商间的消息类型)。

请求和响应由版本化模式描述。版本控制允许协议演变,例如添加或删除字段或更改其数据类型。

第一步

您可以执行以下一些操作来开始使用 Kafka 协议。

学习Kafka协议代码

Kafka 代码是该协议(实际上)的真相来源。从 Github 查看 Kafka 代码并切换到您感兴趣的版本(例如 3.8.0):

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

您可以在clients/src/main/resources/common/message中找到JSON格式的API消息定义。每个 JSON 文件包含一条消息2 类型及其所有版本的定义。 client/src/main/resources/common/message/README.md 很好地概述了模式定义格式。注意默认值、灵活版本和标记字段等内容。

除了您感兴趣的具体API消息类型之外,还可以查看clients/src/main/resources/common/message/RequestHeader.json和ResponseHeader.json,它们描述了每个请求-响应交换中使用的标头.

让我们运行代码生成器:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

现在你可以在clients/src/ generated/java/org/apache/kafka/common/message中找到生成的类。

看看clients/src/ generated/java/org/apache/kafka/common/message/ApiMessageType.java。该实用程序:

  • 描述了整套现有的 API 消息类型及其架构和版本;
  • 将API消息版本映射到requestHeaderVersion和responseHeaderVersion函数中的请求和响应标头版本。

其他文件是从相应的架构 JSON 一对一生成的(有时带有数据后缀,这是一个兼容性问题)。在这些文件中,您会发现:

  1. 版本化模式定义 SCHEMA_0、SCHEMA_1 等。有时模式在版本之间保持相同。这是正常的,意味着只有请求-响应对应部分发生了变化。
  2. 读取和写入方法,您可以在其中找到协议序列化和反序列化的基本事实。

注意内部类,它们代表了消息的复杂结构。

在 Docker 中运行 Kafka

在 Docker 中运行 Kafka 是一种让代理运行来测试协议或捕获网络交换的便捷方法。从 3.7.0 版本开始,Kafka 团队构建了官方 Docker 镜像,您可以通过以下方式运行:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

如果您对旧版本感兴趣,请在 Docker Hub 中搜索其他镜像。然而,考虑到 Kafka 协议是向后和向前兼容的,这可能是不需要的:新的代理将很好地识别旧的协议版本,并且旧的客户端可以与新的代理进行通信。

如果您阅读本文,您的计算机上可能已经安装了 Kafka 命令行工具,但为了以防万一,您也可以在 Docker 中运行它们。例如,运行此命令来创建一个主题:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

使用 Wireshark 检查协议

熟悉了 Kafka 代码后,让我们看看实际的协议。 Wireshark 是此类检查广泛使用的工具。它可以剖析 Kafka 协议(如果您的版本足够新,则支持最新版本)。

我从4.5.0版本的源代码构建了Wireshark,因为我的操作系统包很旧,无法用新版本解析Kafka协议。 Wireshark 4.5.0 应该主要支持 Kafka 3.7 协议版本。不过,您可以尝试可用的版本,看看它如何适合您。

让我们在环回接口上运行 Wireshark,使用端口 9092 捕获过滤器 (1) 和 kafka 显示过滤器 (2):

Kafka protocol practical guide

创建一个主题,看看 Wireshark 向我们展示了什么:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

Kafka protocol practical guide

显示过滤器删除所有不相关的内容,只留下 Kafka 请求和响应。由于 Wireshark 可以理解协议中的大多数消息版本(当然取决于 Wireshark 版本),因此您可以方便地查看每个消息的结构。 Wireshark 也会显示相应的字节。

Wireshark 是一个很棒的调试工具,可以帮助您了解协议在特定情况下如何工作以及您的实现存在什么问题。

读取和写入原始值

该协议定义了许多原始类型,您可以在此处找到完整的描述。让我们为它们实现读写代码。你可以在这个文件中找到所有的函数,也可以查看相应的测试文件。

固定长度整数值:INT8、INT16、INT32、INT64 和 UINT16

这些是已知固定长度的整数:1、2、4 或 8 字节。当然,您可以在整个协议中找到很多这样的字段。在本课程中,您可能会(简单地)看到他们的读写是如何在 Kafka 中实现的。

我们首先定义从缓冲区读取确切字节数的函数3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

Python 中的 BinaryIO 类型提示表示一个可以从中读取字节并可以写入字节的对象。它有 read、write、tell(用于获取当前位置)、seek(用于更改位置)等方法。

现在我们可以实现读取INT8了:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制
登录后复制

Kafka 使用 big-endian(又名网络)字节排序,因此 byteorder="big"。

现在写:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

对于 INT16、INT32 和 INT64,我不会重复这一点:唯一显着的区别是字节数(分别为 2、4 和 8)和检查范围 ([-(2**15), 2* *15 - 1]、[-(2**31)、2**31 - 1] 和 [-(2**63)、2**63 - 1]相应地)。

UINT16 与 INT16 类似:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

注意这里的signed=False。

布尔值

BOOLEAN 本质上是带有额外逻辑的 INT8:== 0 表示 false,!= 0 表示 true。

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

您可以在MetadataRequestData生成的类的allowAutoTopicCreation字段中看到BOOLEAN的示例。

浮点数64

FLOAT64 是双精度 64 位 IEEE 754 值。 Python 不像 int 那样有用于 float 的 to_bytes 和 from_bytes 。因此,我们将使用标准库中的 struct 模块。

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制
登录后复制

>d 表示“大端字节顺序中的双精度值”。

UNSIGNED_VARINT:可变长度整数值

可变长度整数是一种允许在值较小时每个值使用较少位数的方法。 Kafka 使用 Protocol Buffers 的 Varint 方法。这个想法很简单:

varint 中的每个字节都有一个连续位,指示其后面的字节是否是 varint 的一部分。这是字节的最高有效位 (MSB)(有时也称为符号位)。低7位是有效负载;生成的整数是通过将其组成字节的 7 位有效负载附加在一起而构建的。

详细信息可以查看Protobuf规范和Kafka实现(读、写)。

此类型本身不用于协议字段,但它用于下面描述的紧凑集合。

让我们实现它。为了信心起见,我们直接从事实来源,Kafka 的 ByteUtils 类中获取一些示例:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制

运行这个,我们会得到:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
登录后复制
登录后复制

让我们以可能不是最高效但最简单的方式来实现它:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
登录后复制
登录后复制

通用唯一识别码

UUID 是用于唯一标识实体的 128 位值。例如,它们用于在 CreateTopicsResponse 中传递主题 ID。

你可以看到它们在Kafka代码中是如何读写的。重现起来很简单:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
登录后复制

请注意,Kafka 将 null/None 视为零 UUID,因此我们在这里也这样做。

弦乐

Kafka协议有4种类型的字符串:

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

紧凑性指示字符串长度是使用 INT16 还是使用 UNSIGNED_VARINT 编码。这取决于消息版本(2017年左右推出)。可空性是指该值是否可以为空。这也取决于消息的目的和版本(有时字符串字段在协议演变过程中变得可选)。

字符串在协议中无处不在。例如,查看生成的类MetadataRequestData.MetadataRequestTopic中的字段名称。

字符串的编码非常简单:首先是长度,然后是 UTF-8 编码的正文。允许的最大长度为 32767 字节。空字符串的长度为 -1 并且显然没有正文。

由于紧凑型和非紧凑型之间的唯一区别在于字符串长度的编码方式,因此我们可以为两种模式使用一个函数。

让我们从读取和写入可为空字符串开始:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

不可为 null 的字符串函数可以构建在这些函数之上:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

字节数组

字节数组与字符串非常相似。它们具有相同的潜在可空性和紧凑性:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

它们也以相同的方式编码:长度主体。当然,主体不会被视为 UTF-8 字符串,而是被视为不透明的字节数组。字节数组的最大长度为 2147483647;

您可以在生成的类JoinGroupRequestData.JoinGroupRequestProtocol中找到字段元数据中字节的示例。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

如您所见,这些函数与字符串对应的函数之间的差异很小。

其他阵列

该协议支持字节以外类型的数组:字符串、数字、结构(但不包括嵌套数组):ARRAY 和 COMPACT_ARRAY。 紧凑性与字节数组和字符串相同。

出于某种原因,协议规范中没有明确提及可空性。但是,数组可以为空。这是由架构定义中的 nullableVersions 控制的,如下所示。

考虑到我们已经实现了 read_array_length 和 write_array_length,让我们实现 reader 和 writer 函数:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

记录

RECORDS 对 Kafka 记录进行编码。该结构非常复杂,我不会在本指南中描述它(但是,如果您想要它,请在评论中告诉我?️。)为了简单起见,我们可以将记录视为 NULLABLE_BYTES 或 COMPACT_NULLABLE_BYTES (取决于消息版本)。

标记字段

标记字段是 Kafka 协议的扩展,它允许将可选数据附加到消息中。这个想法是双重的:

  1. 如果服务客户端不理解标记的字段,它会将其保存为未知并忽略它。
  2. 如果某个字段很少使用,可以跳过其默认值传输。

例如,看看这个字段。它有 taggedVersions,它表示从哪个版本开始标记该字段(在大多数情况下,它与添加该字段时的版本相同)。

标记字段包含:

  1. UNSIGNED_VARINT 类型的标签。
  2. COMPACT_BYTES类型的数据。

您可以在 KIP-482 中找到有关标记字段的更多详细信息。

让我们实现:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

这里它们的标题是“未知”。已知字段需要在其结构内进行构建。

消息结构

高级消息结构非常简单。根据规范:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制
登录后复制

也就是说,它是一条消息本身,前面有其大小(以字节为单位)。请求和响应消息均由紧随其后的标头组成。由于某种原因,这没有明确记录4,但你可以相信我吗?或查看代码。

请求和响应头

请求头存在三个版本:0、1、2。它们在协议中指定为:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制

TAG_BUFFER 就是前面提到的标记字段。

让我们将它们实现为 Python 数据类:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

如您所见,版本 2 中有一些标记字段,没有预期的已知字段。如果某些标记字段被错误地发送到代理,它将被忽略。

响应头存在两个版本:0和1。它们在协议中指定为:

./gradlew processMessages
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

让我们也实现它们:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

我们没有实现请求标头的读取和响应标头的写入。这是为了简洁起见:在我们的示例中,我们不会发送响应标头并接收请求标头,因为我们不会对服务器端进行编程。但是,如果您也对服务器端感兴趣,则需要实现这两个功能(这应该很简单)。

相关ID

特别注意请求和响应标头中的correlation_id 字段。该协议支持管道:客户端每个连接可以有多个未完成的请求。相关 ID 允许其将响应与请求进行匹配。

标头版本选择

必须使用哪个版本是API密钥和消息版本的函数。目前协议指南中尚未明确记录5.
参考生成的类ApiMessageType中的requestHeaderVersion和responseHeaderVersion函数。

发送请求和接收响应

现在,掌握了所有这些知识和代码,让我们最终发送 ApiVersions 请求并接收和读取响应。 ApiVersions 通常是客户端发送的第一个请求。其目的是查找代理支持的 API 版本和功能。我们实施了最新版本 3。

在协议规范中,定义为:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制
登录后复制

让我们创建数据类:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
登录后复制
登录后复制
登录后复制

回复:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
登录后复制
登录后复制

[api_keys] 表示“api_keys 数组”,其中 api_keys 是下面两行定义的结构体。

将其转换为 Python 数据类:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
登录后复制
登录后复制

当我们谈论数组时,我们需要知道我们是否需要紧凑数组或非紧凑数组。为了找到答案,我们来看看 ApiVersionsRequest.json 中的架构定义。可以看到 "flexibleVersions": "3 ",这意味着从版本 3 开始使用紧凑数组(更多信息请参见 schema 目录中的 README.md)。由于我们在这里使用版本 3,因此我们使用紧凑数组。

实现请求和响应类后,我们可以发送和接收这些请求。对于此 ApiVersions v3,我们需要 v2 请求标头和 v0 响应标头(检查生成的 ApiMessageType.java)。您可以在 ApiVersionsRequest.json 或协议规范中找到 API 密钥 (18)。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制
登录后复制

如果运行此代码,您将看到控制台中打印的响应标头和消息。恭喜,您已经与 Kafka Broker 进行了正确的网络交换!

您会注意到 _unknownTaggedFields 中放入了三个标记字段。生成的 ApiVersionsResponseData 类的读写方法以及 ApiVersionsResponse.json 中的消息定义将帮助您解释它们。考虑一下这个作业吗?


  1. 在我的日常工作中,我们开发了一个开源库 Kio。它允许我们轻松地从 Python 进行任意 Kafka API 调用。序列化/反序列化代码,就像 Kafka 本身一样,是根据 JSON 协议定义生成的。生成的代码经过严格测试,包括针对真实 Java Kafka 代码的属性测试。 ↩

  2. 如果您愿意,也可以使用“消息”:某些模式不适用于 API,但例如对于磁盘上的数据。 ↩

  3. read_exact 函数有一个缺点,即当底层缓冲区已在内存中时,它会复制数据。然而,它对于教育目的来说更方便。 ↩

  4. 我做了一个 PR 来解决这个问题。 ↩

  5. 我再次发布了 PR 来解决这个问题。 ↩

以上是Kafka协议实用指南的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

如何解决Linux终端中查看Python版本时遇到的权限问题? 如何解决Linux终端中查看Python版本时遇到的权限问题? Apr 01, 2025 pm 05:09 PM

Linux终端中查看Python版本时遇到权限问题的解决方法当你在Linux终端中尝试查看Python的版本时,输入python...

如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到? 如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere进行中间人读取时如何避免被检测到当你使用FiddlerEverywhere...

在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中? 在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas库时,如何在两个结构不同的DataFrame之间进行整列复制是一个常见的问题。假设我们有两个Dat...

Uvicorn是如何在没有serve_forever()的情况下持续监听HTTP请求的? Uvicorn是如何在没有serve_forever()的情况下持续监听HTTP请求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持续监听HTTP请求的?Uvicorn是一个基于ASGI的轻量级Web服务器,其核心功能之一便是监听HTTP请求并进�...

如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础? 如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础? Apr 02, 2025 am 07:18 AM

如何在10小时内教计算机小白编程基础?如果你只有10个小时来教计算机小白一些编程知识,你会选择教些什么�...

在Linux终端中使用python --version命令时如何解决权限问题? 在Linux终端中使用python --version命令时如何解决权限问题? Apr 02, 2025 am 06:36 AM

Linux终端中使用python...

如何绕过Investing.com的反爬虫机制获取新闻数据? 如何绕过Investing.com的反爬虫机制获取新闻数据? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬虫策略许多人尝试爬取Investing.com(https://cn.investing.com/news/latest-news)的新闻数据时,常常�...

See all articles