老子今天开始卷Kafka
本文翻译自《KAFKA CONNECT》
8. KAFKA CONNECT
8.1 概述
Kafka Connect是一个用于在Apache Kafka和其他系统之间可扩展和可靠的数据流的工具。它使快速定义connector变得简单,将大量的数据集合移入和移出Kafka。Kafka Connect可以摄取整个数据库或从你所有的应用服务器收集指标到Kafka topic,使数据可用于低延迟的流处理。输出作业可以将数据从Kafka主题传送到二级存储和查询系统中,或传送到批处理系统中进行离线分析。
Kafka Connect的特点包括:
- Kafkaconnector的通用框架 - Kafka Connect将其他数据系统与Kafka的集成标准化,简化了connector的开发、部署和管理
- 分布式和独立模式–可扩展为支持整个组织的大型集中管理服务,也可扩展为开发、测试和小型生产部署的模式
- REST接口–通过一个易于使用的REST API提交和管理connector到你的Kafka Connect集群
- 自动偏移管理–只需来自connector的少量信息,Kafka Connect就能自动管理偏移提交过程,因此connector开发人员不需要担心connector开发中这个容易出错的部分。
- 默认情况下是分布式和可扩展的–Kafka Connect建立在现有的组管理协议之上。可以添加更多的工作者来扩大Kafka Connect集群的规模。
- 流处理/批处理集成–利用Kafka现有的能力,Kafka Connect是桥接流流处理和批处理数据系统的理想解决方案。
8.2 用户指南
快速入门提供了一个简短的例子,说明如何运行独立版本的Kafka Connect。本节介绍了如何配置、运行和管理Kafka Connect的更多细节。
运行Kafka Connect
Kafka Connect目前支持两种执行模式:standalone(单进程)和分布式。
在standalone模式下,所有的工作都在一个单一的进程中执行。这种配置在设置和开始使用时比较简单,在只有一个工作者有意义的情况下可能很有用(例如收集日志文件),但它不能从Kafka Connect的一些功能中受益,例如容错。你可以用以下命令启动一个独立的进程。
1 | > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...] |
第一个参数是worker的配置。这包括诸如Kafka连接参数、序列化格式以及提交偏移量的频率等设置。所提供的例子应该能够很好地适用于使用config/server.properties所提供的默认配置运行的本地集群。它将需要调整以用于不同的配置或生产部署。所有工作者(包括独立的和分布式的)都需要一些配置。
- bootstrap.services - 用于引导连接到Kafka的Kafka服务器的列表
- key.converter - 转换器类,用于转换Kafka Connect格式和写入Kafka的序列化形式。这可以控制写入Kafka或从Kafka读取的消息中的键的格式,由于这与connector无关,它允许任何connector与任何序列化格式一起工作。常见格式的例子包括JSON和Avro。
- value.converter - 转换器类,用于转换Kafka Connect格式和写入Kafka的序列化形式。这可以控制写入Kafka或从Kafka读取的消息中的值的格式,由于这与connector无关,它允许任何connector与任何序列化格式一起工作。常见格式的例子包括JSON和Avro。
standalone模式特有的重要配置选项是:
- offset.storage.file.filename - 储存偏移数据的文件
这里配置的参数是为Kafka Connect使用的生产者和消费者准备的,用于访问配置、偏移和状态topic。对于Kafka source任务使用的生产者和Kafka sink任务使用的消费者的配置,可以使用相同的参数,但需要分别以producer.和consumer.为前缀。唯一从worker配置中继承的没有前缀的Kafka客户端参数是bootstrap.services,在大多数情况下这就足够了,因为同一个集群经常被用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数来允许连接。这些参数需要在worker配置中最多设置三次,一次用于管理访问,一次用于Kafka source,一次用于Kafka sink。
从2.3.0开始,客户端配置覆盖可以通过使用前缀producer.override.和consumer.override.分别针对Kafka source或Kafka sink单独配置每个connector。这些覆盖将与connector的其他配置属性一起包含。
剩下的参数是connector的配置文件。你可以包括你想要的数量,但所有的将在同一进程中执行(在不同的线程上)。
分布式模式处理工作的自动平衡,允许你动态地扩大(或缩小)规模,并在活动任务以及配置和偏移提交数据方面提供容错。执行方式与standalone模式非常相似。
1 | > bin/connect-distributed.sh config/connect-distributed.properties |
区别在于启动的类和配置参数,它们改变了Kafka Connect进程如何决定在哪里存储配置、如何分配工作以及在哪里存储偏移量和任务状态。在分布式模式下,Kafka Connect在Kafka topic中存储偏移量、配置和任务状态。建议为偏移量、配置和状态手动创建topic,以实现所需的分区数量和复制因子。如果在启动Kafka Connect时还没有创建话题,那么话题将以默认的分区数和复制系数自动创建,这可能不适合其使用。
特别是,除了上面提到的常见设置外,以下配置参数在启动集群前的设置至关重要:
- group.id (default connect-cluster) - 集群的唯一名称,用于形成连接集群组;注意,这不能与消费者组的ID冲突
- config.storage.topic (default connect-configs) - 用于存储connector和任务配置的topic;注意,这应该是一个单分区、高度复制、紧凑的topic。你可能需要手动创建topic以确保正确的配置,因为自动创建的topic可能有多个分区,或者自动配置为删除而不是压缩。
- offset.storage.topic(默认为connect-offsets) -用于存储偏移量的topic;这个topic应该有很多分区,被复制,并被配置为压实的。
- status.storage.topic (default connect-status) - 用于存储状态的topic;这个topic可以有多个分区,并且应该被复制和配置为压实。
注意,在分布式模式下,connector的配置不会在命令行上传递。相反,使用下面描述的REST API来创建、修改和销毁connector。
配置Connectors
连接器的配置是简单的键值映射。对于standalone模式,这些配置被定义在一个属性文件中,并在命令行中传递给Connect进程。在分布式模式下,它们将被包含在创建(或修改)连接器的请求的JSON有效载荷中。
大多数配置都与连接器有关,所以不能在此概述。然而,有几个常见的选项:
- name - 连接器的唯一名称。试图用相同的名字再次注册将会失败。
- connector.class - 连接器的Java类。
- tasks.max - 这个连接器应该创建的最大任务数。如果连接器不能达到这个水平的并行性,它可以创建更少的任务。
- key.converter - (可选)覆盖工作者设置的默认键转换器。
- value.converter - (可选) 覆盖工作者设置的默认值转换器。
Connector.class配置支持几种格式:这个连接器的类的全名或别名。如果连接器是org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定这个全名,或者使用FileStreamSink或FileStreamSinkConnector来使配置简短一些。
Sink连接器也有一些额外的选项来控制它们的输入。每个sink连接器必须设置以下内容之一。
- topics - 以逗号分隔的主题列表,作为该连接器的输入。
- topics.regex - 一个Java正则表达式,用来作为这个连接器的输入的主题。
对于任何其他选项,你应该查阅连接器的文档。
Transformations
连接器可以配置transformations功能,以进行轻量级的逐次信息修改。它们可以方便地用于数据处理和事件路由。
在连接器的配置中可以指定一个transformation链。
- transforms - 转换的别名列表,指定转换应用的顺序。
- transforms.$alias.type - 转换的完全合格的类名称。
- transforms.$alias.$transformationSpecificConfig 转换器的配置属性
例如,让我们使用内置的文件source Connector,并使用一个转换来添加一个静态字段。
在整个例子中,我们将使用无模式的JSON数据格式。为了使用无模式的格式,我们把connect-standalone.properties中的以下两行从true改为false:
key.converter.schemas.enable
value.converter.schemas.enable
文件source connector将每一行作为一个字符串来读取。我们将把每一行封装在一个Map中,然后添加第二个字段来识别事件的source。为了做到这一点,我们使用两个transforms。
- HoistField将输入行放在Map中。
- InsertField来添加静态字段。在这个例子中,我们将表明记录来自一个文件Connector
- 添加transformations后,connect-file-source.properties文件看起来如下:
1 | name=local-file-source |
所有以transforms开头的行都是为transform而添加的。你可以看到我们创建的两个转换。”InsertSource “和 “MakeMap “是我们选择用来给transform的别名。这些transform类型是基于你可以在下面看到的内置transform列表的。每个transform类型都有额外的配置。HoistField需要一个名为 “field “的配置,它是map中的字段名称,将包括文件中的原始String。InsertField transform让我们指定字段名和我们要添加的值。
当我们在没有transform的情况下在我的样本文件上运行文件source Connector,然后用kafka-console-consumer.sh读取它们,结果是:
1 | "foo" |
然后我们创建一个新的文件Connector,这次是在向配置文件添加transform后。这一次,结果将是:
1 | {"line":"foo","data_source":"test-file-source"} |
你可以看到,我们所读的行现在是JSON map的一部分,并且有一个额外的字段,有我们指定的静态值。这只是一个例子,说明你可以用transform做什么。
包含的transform
Kafka Connect包含了几个广泛适用的数据和路由transform。
- InsertField - 使用静态数据或记录元数据添加一个字段
- ReplaceField - 过滤或重命名字段
- MaskField - 用有效的空值类型(0、空字符串等)或自定义替换(仅非空字符串或数字值)替换字段
- ValueToKey - 用一个由记录值中的字段子集形成的新键来替换记录键
- HoistField - 将整个事件作为一个单一的字段包裹在一个Struct或Map中。
- ExtractField - 从Struct和Map中提取一个特定的字段,在结果中只包括这个字段
- SetSchemaMetadata - 修改模式名称或版本
- TimestampRouter - 基于原始主题和时间戳修改记录的主题。当使用一个需要根据时间戳写到不同的表或索引的水槽时非常有用
- RegexRouter - 根据原始主题、替换字符串和正则表达式来修改记录的主题。
- Filter - 从所有进一步的处理中移除消息。这与一个谓词一起使用,以选择性地过滤某些消息。
- InsertHeader - 使用静态数据添加一个标题
- HeadersFrom - 复制或移动键或值中的字段到记录头中
- DropHeaders - 按名称删除标头
下面列出了如何配置每个transform的细节:
org.apache.kafka.connect.transforms.InsertField
使用记录元数据的属性或配置的静态值插入字段。
使用为记录的键(org.apache.kafka.connect.transforms.InsertField$Key)或值(org.apache.kafka.connect.transforms.InsertField$Value)设计的具体转换类型。
offset.field
Kafka偏移量的字段名–只适用于sink Connector。
后缀为!以使其成为一个必需的字段,或?以保持其可选性(默认)。
Type: string
Default: null
Valid Values:
Importance: medium
partition.field
Kafka分区的字段名。后缀为!以使其成为一个必需的字段,或?以保持其可选性(默认)。
Type: string
Default: null
Valid Values:
Importance: medium
static.field
静态数据字段的名称。后缀为!以使其成为一个必需的字段,或?以保持其可选性(默认)。
Type: string
Default: null
Valid Values:
Importance: medium
static.value
静态字段值,如果配置了字段名。
Type: string
Default: null
Valid Values:
Importance: medium
timestamp.field
记录时间戳的字段名。后缀为!以使其成为必填字段,或?以保持其可选性(默认)。
Type: string
Default: null
Valid Values:
Importance: medium
topic.field
Kafka主题的字段名。后缀为!以使其成为必填字段,或?以保持其可选性(默认
Type: string
Default: null
Valid Values:
Importance: medium
org.apache.kafka.connect.transforms.ReplaceField
过滤或重命名字段。
使用为记录的键(org.apache.kafka.connect.transforms.ReplaceField$Key)或值(org.apache.kafka.connect.transforms.ReplaceField$Value)设计的具体转换类型。
exclude
要排除的字段。这优先于要包括的字段。
Type: list
Default: “”
Valid Values:
Importance: medium
include
要包括的字段。如果指定,将只使用这些字段。
Type: list
Default: “”
Valid Values:
Importance: medium
renames
字段重命名的映射。
Type: list
Default: “”
Valid Values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz
Importance: medium
blacklist
弃用。用exclude代替
Type: list
Default: null
Valid Values:
Importance: low
whitelist
弃用。用include代替
Type: list
Default: null
Valid Values:
Importance: low
org.apache.kafka.connect.transforms.MaskField
用字段类型的有效空值屏蔽指定的字段(即0、false、空字符串,等等)。
对于数字和字符串字段,可以指定一个可选的替换值,它将被转换为正确的类型。
使用为记录键(org.apache.kafka.connect.transforms.MaskField$Key)或值(org.apache.kafka.connect.transforms.MaskField$Value)设计的具体转换类型。
fields
要掩盖的字段的名称。
Type: list
Default:
Valid Values: non-empty list
Importance: high
replacement
自定义值替换,将应用于所有 “字段 “的值(仅数字或非空字符串值)。
Type: string
Default: null
Valid Values: non-empty string
Importance: low
org.apache.kafka.connect.transforms.ValueToKey
用一个由记录值中的字段子集形成的新键替换记录键。
fields
记录值上的字段名,以提取作为记录键。
Type: list
Default:
Valid Values: non-empty list
Importance: high
org.apache.kafka.connect.transforms.HoistField
当模式存在时,使用Struct中的指定字段名来包装数据,或者在无模式数据的情况下使用Map来包装。
使用为记录的键(org.apache.kafka.connect.transforms.HoistField$Key)或值(org.apache.kafka.connect.transforms.HoistField$Value)设计的具体转换类型。
field
将在结果Struct或Map中创建的单个字段的字段名。
Type: string
Default:
Valid Values:
Importance: medium
org.apache.kafka.connect.transforms.ExtractField
如果有模式,从Struct中提取指定的字段,如果没有模式,则从Map中提取。任何空值都会未经修改地通过。
使用为记录的键(org.apache.kafka.connect.transforms.ExtractField$Key)或值(org.apache.kafka.connect.transforms.ExtractField$Value)设计的具体转换类型。
field
Field name to extract.
Type: string
Default:
Valid Values:
Importance: medium
org.apache.kafka.connect.transforms.SetSchemaMetadata
在记录的键(org.apache.kafka.connect.transforms.SetSchemaMetadata$Key)或值(org.apache.kafka.connect.transforms.SetSchemaMetadata$Value)模式上设置模式名称、版本或两者。
schema.name
设置Schema的name
Type: string
Default: null
Valid Values:
Importance: high
schema.version
设置Schema的version
Type: int
Default: null
Valid Values:
Importance: high
org.apache.kafka.connect.transforms.TimestampRouter
更新记录的topic字段,作为原始topic值和记录时间戳的函数。
这主要对sink Connector有用,因为topic字段经常被用来确定目标系统中的等效实体名称(例如数据库表或搜索索引名称)。
timestamp.format
与java.text.SimpleDateFormat兼容的时间戳格式字符串。
Type: string
Default: yyyyMMdd
Valid Values:
Importance: high
topic.format
格式字符串,可以包含${topic}和${timestamp},分别作为topic和时间戳的占位符。
Type: string
Default: ${topic}-${timestamp}
Valid Values:
Importance: high
org.apache.kafka.connect.transforms.RegexRouter
使用配置的正则表达式和替换字符串更新记录topic。
在引擎盖下,正则表达式被编译成java.util.regex.Pattern。如果该模式与输入的topic相匹配,java.util.regex.Matcher#replaceFirst()就会与替换字符串一起使用,以获得新的topic。
regex
用于匹配的正则表达式。
Type: string
**Default:
Valid Values: valid regex
Importance: high
replacement
替换字符串。
Type: string
Default:
Valid Values:
Importance: high
org.apache.kafka.connect.transforms.Flatten
扁平化一个嵌套的数据结构,通过将每一层的字段名与一个可配置的分隔符连接起来,为每个字段生成名称。当模式存在时适用于Struct,或者在无模式数据的情况下适用于Map。数组字段和它们的内容不被修改。默认的分隔符是’.’。
使用为记录键(org.apache.kafka.connect.transforms.Flatten$Key)或值(org.apache.kafka.connect.transforms.Flatten$Value)设计的具体转换类型。
delimiter
在为输出记录生成字段名时,在输入记录的字段名之间插入分隔符
Type: string
Default: .
Valid Values:
Importance: medium
org.apache.kafka.connect.transforms.Cast
将字段或整个键或值转换为一个特定的类型,例如,将一个整数字段强制转换为一个较小的宽度。从整数、浮点数、布尔值和字符串转换到任何其他类型,并将二进制转换为字符串(base64编码)。
使用为记录键(org.apache.kafka.connect.transforms.Cast$Key)或值(org.apache.kafka.connect.transforms.Cast$Value)设计的具体转换类型。
spec
字段的列表,以及要将其转换为field1:type,field2:type形式的字段来转换Maps或Structs的字段。一个单一的类型来铸造整个值。有效的类型是int8, int16, int32, int64, float32, float64, boolean, 和 string。注意,二进制字段只能被转换为字符串。
Type: list
Default:
Valid Values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz
Importance: high
org.apache.kafka.connect.transforms.TimestampConverter
在不同的格式之间转换时间戳,如Unix epoch、字符串和Connect Date/Timestamp类型。适用于单个字段或整个值。
使用为记录键(org.apache.kafka.connect.transforms.TimestampConverter$Key)或值(org.apache.kafka.connect.transforms.TimestampConverter$Value)设计的具体转换类型。
target.type
所需的时间戳表示:字符串、unix、日期、时间或时间戳
Type: string
Default:
Valid Values:
Importance: high
field
包含时间戳的字段,如果整个值是时间戳,则为空。
Type: string
Default: “”
Valid Values:
Importance: high
format
一个与SimpleDateFormat兼容的时间戳格式。当type=string时用于生成输出,如果输入是一个字符串,则用于解析输入。
Type: string
Default: “”
Valid Values:
Importance: medium
org.apache.kafka.connect.transforms.Filter
丢弃所有的记录,将它们从链中的后续转换中过滤掉。这旨在有条件地用于过滤出匹配(或不匹配)某个特定Predicate的记录。
org.apache.kafka.connect.transforms.InsertHeader
为每条记录添加一个头。
header
header的名称。
Type: string
Default:
Valid Values: non-null string
Importance: high
value.literal
将被设置为所有记录的标题值的文字值。
Type: string
Default:
Valid Values: non-null string
Importance: high
org.apache.kafka.connect.transforms.DropHeaders
从每条记录中删除一个或多个标头。
headers
要删除的标头的名字
Type: list
Default:
Valid Values: non-empty list
Importance: high
org.apache.kafka.connect.transforms.HeaderFrom
将一个记录的键/值中的字段移动或复制到该记录的页眉中。字段和头文件的对应元素一起标识一个字段和它应该被移动或复制到的头文件。使用为记录的键(org.apache.kafka.connect.transforms.HeaderFrom$Key)或值(org.apache.kafka.connect.transforms.HeaderFrom$Value)设计的具体转换类型。
fields
记录中的字段名,其值将被复制或移动到标题中。
Type: list
Default:
Valid Values: non-empty list
Importance: high
headers
标题名称,顺序与字段配置属性中列出的字段名称相同。
Type: list
Default:
Valid Values: non-empty list
Importance: high
operation
如果要将字段移到页眉(从键/值中移除),则选择移动;如果要将字段复制到页眉(保留在键/值中),则选择复制。
Type: string
Default:
Valid Values: [move, copy]
Importance: high
谓语
转化可以用谓词来配置,这样转化就只适用于满足某些条件的信息。特别是,当与 “filter “转换相结合时,谓词可用于选择性地过滤掉某些消息。
谓词在Connector配置中被指定。
- predicates - 要应用于某些转换的谓词的别名集。
- predicates.$alias.type - 谓语的完全限定类名称。
- predicates.$alias.$predicateSpecificConfig - 谓词的配置属性。
所有转换都有隐含的配置属性 predicate 和 negate。通过将转换的predicate配置设置为predicate的别名,predicular predicate与转换相关联。谓词的值可以用否定的配置属性来反转。
例如,假设你有一个source Connector,它向许多不同的topic产生消息,你想。
- 完全过滤掉’foo’topic中的信息
- 将字段名为’other_field’的ExtractField转换应用于所有topic中的记录,除了topic’bar’。
要做到这一点,我们首先需要过滤掉目的地为topic “foo “的记录。过滤转换将记录从进一步的处理中移除,并且可以使用TopicNameMatches谓词,只对topic中符合特定正则表达式的记录应用转换。TopicNameMatches的唯一配置属性是pattern,它是一个Java正则表达式,用于与topic名称匹配。该配置看起来像这样。
1 | transforms=Filter |
接下来,我们需要只在记录的topic名称不是’bar’时应用ExtractField。我们不能直接使用TopicNameMatches,因为这将把转换应用于匹配的topic名称,而不是不匹配的topic名称。转换的隐式否定配置属性允许我们反转谓词匹配的记录集。把这个配置加入到前面的例子中,我们就得到了。
1 | transforms=Filter,Extract |
Kafka Connect包括以下谓词。
- TopicNameMatches - 匹配topic中名称与特定Java正则表达式匹配的记录。
- HasHeaderKey - 匹配具有给定键的头的记录。
- RecordIsTombstone - 匹配有墓碑的记录,即空值的记录。
下面列出了如何配置每个谓词的细节:
org.apache.kafka.connect.transforms.predicates.HasHeaderKey
一个谓词,对于至少有一个头有配置名称的记录来说是真的。
name
标头名称
Type: string
Default:
Valid Values: non-empty string
Importance: medium
org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
一个谓词,对属于墓碑的记录(即有空值)来说是真的。
org.apache.kafka.connect.transforms.predicates.TopicNameMatches
一个谓词,对于topic名称与配置的正则表达式相匹配的记录为真。
pattern
一个Java正则表达式,用于匹配记录的topic名称。
Type: string
Default:
Valid Values: non-empty string, valid regex
Importance: medium
REST API
由于Kafka Connect旨在作为一项服务来运行,它还提供了一个REST API来管理Connector。REST API服务器可以使用听众配置选项进行配置。这个字段应该包含一个监听器的列表,格式如下:protocol://host:port,protocol2://host2:port2。目前支持的协议是http和https。比如说。
1 | listeners=http://localhost:8080,https://localhost:8443 |
默认情况下,如果没有指定监听器,REST服务器使用HTTP协议在8083端口运行。当使用HTTPS时,配置必须包括SSL配置。默认情况下,它将使用ssl.*设置。如果需要为REST API使用与连接Kafka经纪人不同的配置,可以在字段前加上listeners.https。当使用前缀时,只有前缀的选项会被使用,没有前缀的ssl.*选项会被忽略。以下字段可用于为REST API配置HTTPS。
- ssl.keystore.location
- ssl.keystore.password
- ssl.keystore.type
- ssl.key.password
- ssl.truststore.location
- ssl.truststore.password
- ssl.truststore.type
- ssl.enabled.protocols
- ssl.provider
- ssl.protocol
- ssl.cipher.suites
- ssl.keymanager.algorithm
- ssl.secure.random.implementation
- ssl.trustmanager.algorithm
- ssl.endpoint.identification.algorithm
- ssl.client.auth
REST API不仅被用户用来监控/管理Kafka Connect。它也被用于Kafka Connect的跨集群通信。在跟随者节点的REST API上收到的请求将被转发到领导者节点的REST API。如果给定的主机可到达的URI与它监听的URI不同,可以使用配置选项rest.advertised.host.name、rest.advertised.port和rest.advertised.listener来改变URI,这将被跟随者节点用来与领导者连接。当使用HTTP和HTTPS监听器时,rest.advertised.listener选项也可以用来定义哪个监听器将被用于跨集群通信。当使用HTTPS进行节点间的通信时,同样的ssl.*或listeners.https选项将被用来配置HTTPS客户端。
以下是目前支持的REST API:
- GET /connectors - 返回活动Connector的列表
- POST /connectors - 创建一个新的Connector;请求主体应该是一个JSON对象,包含一个字符串名称字段和一个包含Connector配置参数的对象配置字段
- GET /connectors/{name} - 获取特定Connector的信息
- GET /connectors/{name}/config - 获取特定Connector的配置参数
- PUT /connectors/{name}/config - 更新某个特定Connector的配置参数
- GET /connectors/{name}/status - 获得Connector的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个工作者,如果它失败了,错误信息,以及它所有任务的状态。
- GET /connectors/{name}/tasks - 获得一个Connector当前运行的任务列表
- GET /connectors/{name}/tasks/{taskid}/status - 获得任务的当前状态,包括是否正在运行、失败、暂停等,分配给哪个工作器,以及失败后的错误信息。
- PUT /connectors/{name}/pause - 暂停Connector及其任务,停止消息处理,直到Connector恢复工作。
- PUT /connectors/{name}/resume - 恢复已暂停的Connector(如果Connector没有暂停,则不做任何事情)。
- POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false> - 重新启动一个Connector及其任务实例。
- “includeTasks” 参数指定是重启Connector实例和任务实例(”includeTasks=true”),还是只重启Connector实例(”includeTasks=false”),默认(”false”)保留了与早期版本相同的行为。
- the “onlyFailed” 参数指定是只重启状态为FAILED的实例(”onlyFailed=true”)还是重启所有实例(”onlyFailed=false”),默认(”false”)保留与早期版本相同的行为。
- POST /connectors/{name}/tasks/{taskId}/restart - 重新启动一个单独的任务(通常是因为它失败了)
- DELETE /connectors/{name} - 删除一个Connector,停止所有任务并删除其配置。
- GET /connectors/{name}/topics - 获取特定Connector自创建以来或发出重置其活动topic集的请求以来正在使用的topic集。
- PUT /connectors/{name}/topics/reset - 发送请求以清空Connector的活动topic集。
Kafka Connect还提供了一个REST API来获取Connector插件的信息。
- GET /connector-plugins- 返回Kafka Connect集群中安装的Connector插件的列表。请注意,该API只检查处理请求的工作器上的Connector,这意味着你可能会看到不一致的结果,特别是在滚动升级期间,如果你添加了新的Connectorjars
- PUT /connector-plugins/{connector-type}/config/validate - 根据配置定义验证所提供的配置值。这个API执行每个配置的验证,在验证过程中返回建议值和错误信息。
根据配置定义验证所提供的配置值。该API执行每个配置的验证,在验证过程中返回建议值和错误信息。以下是在顶级(根)端点支持的REST请求:
- GET /- 返回有关Kafka Connect集群的基本信息,例如为REST请求提供服务的Connect worker的版本(包括源代码的git commit ID)和连接到的Kafka集群ID。
Connect中的错误报告
Kafka Connect提供错误报告,以处理在处理的各个阶段遇到的错误。默认情况下,在转换过程中或在转换中遇到的任何错误都会导致Connector失败。每个Connector的配置也可以通过跳过这些错误来实现对这些错误的容忍,可以选择将每个错误和失败操作的细节以及问题记录(有不同程度的细节)写到Connect应用日志。这些机制还可以捕捉到sink Connector在处理从其Kafka topic消耗的消息时出现的错误,所有的错误都可以写到一个可配置的 “死信队列”(DLQ)Kafka topic。
要将Connector的converter、transforms或sink Connector本身的错误报告到日志中,请在Connector配置中设置errors.log.enable=true,以记录每个错误的细节和问题记录的topic、分区和偏移。为了更多的调试目的,设置errors.log.include.messages=true来记录问题记录的键、值和标题到日志中(注意这可能记录敏感信息)。
要报告Connector的converter、transforms或sink Connector本身的错误到一个死信队列topic,请设置 errors.deadletterqueue.topic.name,以及可选的 errors.deadletterqueue.context.headers.enable=true。
默认情况下,Connector在出现错误或异常时立即表现出 “快速失败 “行为。这相当于在Connector配置中添加以下配置属性及其默认值。
1 | # disable retries on failure |
这些和其他相关的Connector配置属性可以被改变以提供不同的行为。例如,以下配置属性可以被添加到Connector配置中,以设置具有多次重试的错误处理,记录到应用程序日志和my-connector-errors Kafka topic,并通过报告它们而不是失败Connector任务来容忍所有错误。
1 | # retry for at most 10 minutes times waiting up to 30 seconds between consecutive failures |
8.3 Connector 开发指南
本指南介绍了开发人员如何为Kafka Connect编写新的Connector,以便在Kafka和其他系统之间移动数据。它简要地回顾了几个关键的概念,然后描述了如何创建一个简单的Connector。
核心概念和APIs
Connectors 和 Tasks
为了在Kafka和另一个系统之间复制数据,用户为他们想要拉取数据或推送数据的系统创建一个Connector。Connector有两种类型。SourceConnectors从另一个系统导入数据(例如JDBCSourceConnector会将一个关系型数据库导入Kafka),SinkConnectors导出数据(例如HDFSSinkConnector会将一个Kafkatopic的内容导出到一个HDFS文件)。
Connector本身并不执行任何数据复制:它们的配置描述了要复制的数据,而Connector负责将该工作分解为一组可以分配给工作者的Tasks。这些任务也有两种相应的类型。SourceTask和SinkTask。
有了任务在手,每个Task必须将其数据子集复制到Kafka或从Kafka复制。在Kafka Connect中,应该总是可以将这些任务框定为一组由具有一致模式的记录组成的输入和输出流。有时,这种映射是显而易见的:一组日志文件中的每个文件都可以被视为一个流,每个被解析的行形成一个记录,使用相同的模式和偏移量作为字节偏移量存储在文件中。在其他情况下,可能需要更多的努力来映射到这个模型:JDBCConnector可以将每个表映射到一个流,但偏移量就不那么清楚了。一个可能的映射使用一个时间戳列来生成查询,递增地返回新的数据,最后查询的时间戳可以作为偏移。
Streams 和 Records
每个流应该是一个键值记录的序列。键和值都可以有复杂的结构–提供了许多原始类型,但数组、对象和嵌套数据结构也可以被表示。运行时的数据格式不承担任何特定的序列化格式;这种转换是由框架内部处理的。
除了键和值之外,记录(包括那些由source产生的记录和交付给sink的记录)还有相关的流ID和偏移。这些被框架用来定期提交已经处理过的数据的偏移量,以便在发生故障时,可以从最后提交的偏移量恢复处理,避免不必要的重新处理和事件的重复发生。
动态 Connectors
并非所有的工作都是静态的,所以 Connector 实现还负责监视外部系统的任何变化,这些变化可能需要重新配置。例如,在JDBCSourceConnector的例子中,连接器可能为每个任务分配一组表。当一个新表被创建时,它必须发现这一点,以便通过更新其配置将新表分配给一个任务。当它注意到需要重新配置的变化(或任务数量的变化)时,它会通知框架,框架会更新任何相应的任务。
开发发一个简单的Connector
开发一个Connector只需要实现两个接口,即Connector和Task。在文件包中,Kafka的源代码中包含了一个简单的例子。这个Connector是用来在standalone模式下使用的,它实现了SourceConnector/SourceTask,用来读取文件中的每一行,并将其作为记录发射出去,还实现了SinkConnector/SinkTask,将每一条记录写入文件中。
本节的其余部分将通过一些代码来演示创建Connector的关键步骤,但开发者也应该参考完整的示例源代码,因为为了简洁起见,许多细节被省略了。
Connector 示例
我们将介绍SourceConnector作为一个简单的例子。SinkConnector 的实现非常相似。首先创建继承自SourceConnector的类,并添加几个字段,以存储解析的配置信息(从文件名读取和发送数据的topic)。
1 | public class FileStreamSourceConnector extends SourceConnector { |
最容易填写的方法是taskClass(),它定义了工人进程中应该实例化的类,以实际读取数据。
1 |
|
我们将在下面定义FileStreamSourceTask类。接下来,我们添加一些标准的生命周期方法,start()和stop()。
1 |
|
最后,实现的真正核心在taskConfigs()中。在这种情况下,我们只处理一个文件,所以尽管我们可能被允许按照maxTasks参数生成更多的任务,我们还是返回一个只有一个条目的列表。
1 |
|
尽管在这个例子中没有使用,SourceTask也提供了两个API来提交source系统中的偏移量:commit和commitRecord。这些API是为那些有消息确认机制的source系统提供的。重写这些方法可以让source Connector确认source系统中的消息,无论是批量的还是单独的,一旦它们被写入Kafka。提交API在source 系统中存储偏移量,直到被poll返回的偏移量。这个API的实现应该阻塞,直到提交完成。commitRecord API在每个SourceRecord被写入Kafka后,将其偏移量保存在source 系统中。由于Kafka Connect会自动记录偏移量,所以SourceTasks不需要实现它们。在Connector确实需要确认source 系统中的消息的情况下,通常只需要其中一个API。
即使有多个任务,这个方法的实现通常也很简单。它只需要确定输入任务的数量,这可能需要联系它正在提取数据的远程服务,然后把它们分出来。由于在任务间分工的一些模式非常普遍,ConnectorUtils中提供了一些实用工具来简化这些情况。
注意,这个简单的例子不包括动态输入。关于如何触发对任务配置的更新,见下一节的讨论。
Task 示例
Source Task
接下来我们将描述相应的SourceTask的实现。这个实现很短,但太长了,无法在本指南中完全涵盖。我们将使用伪代码来描述大部分的实现,但你可以参考源代码来了解完整的例子。
就像Connector一样,我们需要创建一个继承自适当的基础任务类的类。它也有一些标准的生命周期方法。
1 | public class FileStreamSourceTask extends SourceTask { |
这些都是稍微简化的版本,但表明这些方法应该是相对简单的,它们应该执行的唯一工作是分配或释放资source 。关于这个实现,有两点需要注意。首先,start()方法还没有处理从先前的偏移量恢复的问题,这将在后面的章节中讨论。第二,stop()方法是同步的。这将是必要的,因为SourceTasks被赋予了一个专用线程,它们可以无限期地阻塞,所以它们需要通过Worker中不同线程的调用来停止。
接下来,我们实现任务的主要功能,poll()方法从输入系统获取事件并返回List<SourceRecord>。
1 |
|
同样,我们省略了一些细节,但我们可以看到重要的步骤:poll()方法将被反复调用,每次调用它都会循环尝试从文件中读取记录。对于它读取的每一行,它也会跟踪文件的偏移量。它使用这些信息来创建一个输出的SourceRecord,其中有四个信息:source分区(只有一个,就是被读取的单个文件)、source偏移量(文件中的字节偏移量)、输出topic名称和输出值(行,我们包括一个模式,表明这个值将总是一个字符串)。SourceRecord构造函数的其他变体也可以包括一个特定的输出分区、一个键和头文件。
请注意,这个实现使用正常的Java InputStream接口,如果数据不可用,可能会休眠。这是可以接受的,因为Kafka Connect为每个任务提供了一个专用线程。虽然任务的实现必须符合基本的poll()接口,但它们在实现方式上有很大的灵活性。在这种情况下,基于NIO的实现会更有效率,但这种简单的方法是可行的,可以快速实现,而且与旧版本的Java兼容。
Sink Tasks
上一节描述了如何实现一个简单的SourceTask。与SourceConnector和SinkConnector不同,SourceTask和SinkTask有非常不同的接口,因为SourceTask使用拉动接口,而SinkTask使用推动接口。两者都有共同的生命周期方法,但SinkTask的接口有很大不同。
1 | public abstract class SinkTask implements Task { |
SinkTask文档包含完整的细节,但这个接口几乎和SourceTask一样简单。put()方法应该包含大部分的实现,接受SinkRecords的集合,执行任何需要的翻译,并将它们存储在目标系统中。这个方法不需要在返回之前确保数据已经完全写到目的系统中。事实上,在许多情况下,内部缓冲将是有用的,因此可以一次性发送整批记录,减少将事件插入下游数据存储的开销。SinkRecords包含的信息与SourceRecords基本相同。Kafka topic、分区、偏移量、事件键和值,以及可选的头文件。
flush()方法是在偏移量提交过程中使用的,它允许任务从失败中恢复,并从一个安全点恢复,这样就不会错过任何事件。该方法应将任何未完成的数据推送到目标系统,然后阻塞,直到写入被确认。Offsets参数通常可以被忽略,但在某些情况下是有用的,因为实施者希望在目标存储中存储偏移信息,以提供精确的一次交付。例如,HDFSConnector可以这样做,并使用原子移动操作,以确保flush()操作将数据和偏移量原子地提交到HDFS的最终位置。
Errant Record Reporter
当Connector的错误报告被启用时,Connector可以使用ErrantRecordReporter来报告发送至sink Connector的单个记录的问题。下面的例子显示了一个Connector的SinkTask子类如何获得和使用ErrantRecordReporter,当DLQ没有启用或者Connector安装在没有这个报告功能的旧Connect运行时,安全地处理一个空报告。
1 | private ErrantRecordReporter reporter; |
从以前的偏移量恢复
SourceTask的实现包括每个记录的流ID(输入文件名)和偏移量(文件中的位置)。该框架使用它来定期提交偏移量,这样在发生故障的情况下,任务可以恢复并尽量减少重新处理和可能重复的事件的数量(或者如果Kafka Connect被优雅地停止,例如在独立模式下或由于工作的重新配置而从最近的偏移量恢复)。这个提交过程完全由框架自动完成,但只有Connector知道如何在输入流中寻找到正确的位置,以便从该位置恢复。
为了在启动时正确恢复,任务可以使用传入其initialize()方法的SourceContext来访问偏移数据。在initialize()中,我们会增加一些代码来读取偏移量(如果它存在的话),并寻求到那个位置。
1 | stream = new FileInputStream(filename); |
当然,你可能需要为每个输入流读取许多键。OffsetStorageReader接口还允许你发出批量读取,以有效地加载所有的偏移量,然后通过寻找每个输入流的适当位置来应用它们。
动态输入/输出流
Kafka Connect旨在定义批量数据复制作业,例如复制整个数据库,而不是创建许多作业来单独复制每个表。这种设计的一个后果是,Connector的输入或输出流集可能会随时间变化。
source Connector需要监控source 系统的变化,例如数据库中表的添加/删除。当他们发现变化时,他们应该通过ConnectorContext对象通知框架有必要进行重新配置。例如,在SourceConnector中。
1 | if (inputsChanged()) |
该框架将及时请求新的配置信息并更新任务,允许它们在重新配置之前优雅地提交其进度。请注意,在SourceConnector中,这种监控目前是由Connector的实现决定的。如果需要一个额外的线程来执行这种监控,Connector必须自己分配它。
理想情况下,这种监控变化的代码会被隔离到Connector中,任务不需要担心它们。然而,变化也会影响到任务,最常见的是他们的一个输入流在输入系统中被破坏,例如,如果一个表从数据库中被删除。如果任务在Connector之前遇到这个问题(如果Connector需要轮询变化,这很常见),任务将需要处理随后的错误。值得庆幸的是,这通常可以通过捕捉和处理适当的异常来简单地处理。
SinkConnectors通常只需要处理流的增加,这可能转化为其输出中的新条目(例如,一个新的数据库表)。该框架管理Kafka输入的任何变化,例如,当输入topic集因为重构订阅而改变时。SinkTasks应该期待新的输入流,这可能需要在下游系统中创建新的资源,比如数据库中的一个新表。在这些情况下,要处理的最棘手的情况可能是多个SinkTasks第一次看到一个新的输入流并同时试图创建新的资源之间的冲突。另一方面,SinkConnectors通常不需要特别的代码来处理一组动态的流。
Connect 配置验证
Kafka Connect允许你在提交Connector执行之前验证Connector的配置,并可以提供关于错误和推荐值的反馈。为了利用这一点,Connector开发者需要提供config()的实现,将配置定义暴露给框架。
下面的代码在FileStreamSourceConnector中定义了配置并将其暴露给框架。
1 | private static final ConfigDef CONFIG_DEF = new ConfigDef() |
ConfigDef类用于指定预期配置的集合。对于每个配置,你可以指定名称、类型、默认值、文档、组信息、组中的顺序、配置值的宽度和适合在用户界面中显示的名称。另外,你可以通过重写Validator类来提供用于单一配置验证的特殊验证逻辑。此外,由于配置之间可能存在依赖关系,例如,一个配置的有效值和可见性可能会根据其他配置的值而改变。为了处理这个问题,ConfigDef 允许你指定配置的依赖关系,并提供 Recommender 的实现,以便在给定当前配置值的情况下获取配置的有效值并设置可见性。
此外,Connector中的validate()方法提供了一个默认的验证实现,它返回一个允许的配置列表,以及每个配置的配置错误和推荐值。但是,它不使用推荐值进行配置验证。您可以为定制的配置验证提供默认实现的覆盖,它可以使用推荐值。
使用Schemas工作
FileStreamConnector是很好的例子,因为它们很简单,但它们也有微不足道的结构化数据–每一行只是一个字符串。几乎所有实用的Connector都需要具有更复杂数据格式的模式。
要创建更复杂的数据,你需要与Kafka Connect数据API合作。除了原始类型外,大多数结构化记录将需要与两个类互动。Schema和Struct。
API文档提供了完整的参考,但这里有一个简单的例子,创建一个Schema和Struct。
1 | Schema schema = SchemaBuilder.struct().name(NAME) |
如果你正在实施一个source Connector,你需要决定何时以及如何创建模式。在可能的情况下,你应该尽可能地避免重新计算它们。例如,如果你的Connector被保证有一个固定的模式,那么就静态地创建它并重复使用一个实例。
然而,许多Connector会有动态模式。这方面的一个简单例子是数据库Connector。即使只是考虑到一个单一的表,模式也不会为整个Connector预定义(因为它在不同的表之间变化)。但是,在Connector的生命周期中,它也可能不会为单个表固定,因为用户可能执行ALTER TABLE命令。Connector必须能够检测到这些变化并作出适当的反应。
水槽Connector通常比较简单,因为它们消耗的是数据,因此不需要创建模式。然而,他们应该同样小心地验证他们收到的模式是否具有预期的格式。当模式不匹配时–通常表明上游生产者正在生成无效的数据,无法正确地翻译到目标系统–sink Connector应该抛出一个异常,向系统表明这个错误。
Kafka Connect 管理
Kafka Connect的REST层提供了一套API来实现集群的管理。这包括查看Connector的配置及其任务状态的API,以及改变其当前行为(例如改变配置和重启任务)。
当一个Connector第一次被提交到集群中时,为了分配由新Connector的任务组成的负载,会在连接工作者之间触发一个再平衡。当Connector增加或减少它们所需的任务数量时,当Connector的配置被改变时,或者当作为Connect集群有意升级的一部分或由于故障而从组中添加或删除一个工作者时,也会使用这个相同的重新平衡程序。
在2.3.0之前的版本中,Connect工作者会重新平衡集群中的全部Connector及其任务,以此作为一种简单的方式来确保每个工作者的工作量大致相同。这种行为仍然可以通过设置connect.protocol=eager来启用。
从2.3.0开始,Kafka Connect默认使用的协议是执行增量合作式再平衡,在Connect工作器之间逐步平衡Connector和任务,只影响那些新的、要移除的或需要从一个工作器移到另一个工作器的任务。在重新平衡期间,其他任务不会像旧协议那样被停止和重新启动。
如果 Connect 工作者有意或因故障而离开组,Connect 会等待 scheduled.rebalance.max.delay.ms,然后触发重新平衡。这个延迟的默认值是5分钟(300000ms),以容忍工人的故障或升级,而不立即重新分配离开的工人的负载。如果该工作者在配置的延迟内返回,它就会完全获得其先前分配的任务。但是,这意味着在 scheduled.rebalance.max.delay.ms 所指定的时间过去之前,这些任务将保持未分配状态。如果一个工作者没有在该时间限制内返回,Connect将在Connect集群中的其余工作者中重新分配这些任务。
当构成Connect集群的所有工作器都被配置为connect.protocol=compatible时,新的Connect协议就会被启用,这也是缺少该属性时的默认值。因此,当所有的工作器升级到2.3.0时,升级到新的Connect协议会自动发生。当最后一个工作者加入到2.3.0版本时,Connect集群的滚动升级将激活增量合作再平衡。
你可以使用REST API来查看Connector及其任务的当前状态,包括每个被分配到的工作者的ID。例如,GET /connectors/file-source/status请求显示了一个名为file-source的Connector的状态。
1 | { |
Connectors和它们的tasks将状态更新发布到一个共享topic(用 status.storage.topic 配置),集群中的所有workers都会监控该topic。因为工作者异步地使用这个topic,所以在状态API中可以看到状态变化之前,通常会有一个(短暂的)延迟。对于一个Connector或其任务之一,可能有以下状态。
- UNASSIGNED:Connector/Task还没有被分配给工作者。
- RUNNING: Connector/Task正在运行。
- PAUSED: Connector/Task已在管理上暂停。
- FAILED: Connector/Task失败了(通常是通过引发一个异常,在状态输出中报告)。
- DESTROYED: 该Connector/Task已被管理性地移除,并将停止出现在Connect集群中。
在大多数情况下,Connector和任务的状态是一致的,尽管当变化发生时或任务失败时,它们可能在短时间内有所不同。例如,当一个Connector第一次启动时,在Connector和它的任务全部过渡到运行状态之前,可能会有一个明显的延迟。当任务失败时,状态也会发生变化,因为Connect不会自动重新启动失败的任务。要手动重启一个Connector/任务,你可以使用上面列出的重启API。注意,如果你试图在重新平衡的时候重启一个任务,Connect会返回409(冲突)状态代码。你可以在重新平衡完成后重试,但可能没有必要,因为重新平衡有效地重新启动集群中的所有Connector和任务。
从2.5.0开始,Kafka Connect使用status.storage.topic来存储与每个Connector正在使用的topic有关的信息。Connect Workers使用这些每个Connector的topic状态更新来响应对REST端点GET /connectors/{name}/topics的请求,返回Connector正在使用的一组topic名称。对REST端点PUT /connectors/{name}/topics/reset的请求会重置Connector的活动topic集,并允许根据Connector的最新topic使用模式填充新的topic集。当Connector被删除时,该Connector的活动topic集也会被删除。topic追踪默认是启用的,但可以通过设置topic.tracking.enable=false来禁用。如果你想在运行期间不允许请求重置Connector的活动topic,可以设置Worker属性 topic.tracking.allow.reset=false。
有时,暂时停止Connector的消息处理是很有用的。例如,如果远程系统正在进行维护,最好让source Connector停止轮询它的新数据,而不是用异常垃圾信息填充日志。对于这种用例,Connect提供了一个暂停/恢复的API。当一个source Connector被暂停时,Connect将停止轮询它的额外记录。当一个sin kConnector暂停的时候,Connect将停止向它推送新的消息。暂停状态是持久的,所以即使你重新启动集群,Connector也不会再次开始消息处理,直到任务被恢复。请注意,在Connector的所有任务过渡到暂停状态之前,可能会有延迟,因为它们可能需要时间来完成被暂停时正在进行的任何处理。此外,失败的任务将不会过渡到暂停状态,直到它们被重新启动。