kafka学习笔记

张彤 2021年10月20日 676次浏览

kafka 学习笔记

安装部署

本地虚拟机安装的过程在虚拟机笔记中,这里默认已经有了一台空的服务器

安装Java 1.8

教程链接

sudo yum install java-1.8.0-openjdk

java -version
javac -version


export JAVA_HOME=/opt/jdk1.8.0_201
export JRE_HOME=/opt/jdk1.8.0_201/jre
export PATH=$PATH:/opt/jdk1.8.0_201/bin:/opt/jdk1.8.0_201/jre/bin


find /usr/lib/jvm/java-1.x.x-openjdk
vim /etc/profile
source /etc/profile

# 找到java的安装位置
rpm -qa|grep java
rpm -ql java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64


#如果ls vi 等命令失效,说明Path变量被占用了,需要先恢复
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
#然后
vim /etc/profile
#在path设置最后加上 :$PATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin$PATH
#如果javac 提示命令无法找到 则
sudo yum install java-devel

安装kafka

参考链接

①建立用户名

-m 的作用是确保在/home 文件夹下有一个kafka的根目录

sudo useradd kafka -m
②设置密码

这里为了方便 统一为 123

sudo passwd kafka
③将用户kafka 加入wheel 组,这样就可以安装kafka相关的依赖了
sudo usermod -aG wheel kafka
④ 登录kafka账户
su -l kafka
⑤ 下载和解压kafka
mkdir ~/Downloads
⑥使用curl 命令下载二进制kafka
curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
⑦创建kafka安装基本目录
mkdir ~/kafka && cd ~/kafka
⑧ 使用tar命令解压缩下载的存档文件
tar -xvzf ~/Downloads/kafka.tgz --strip 1

这一步出现了问题,报错信息是
gzip: stdin: not in gzip format tar: Child returned status 1 tar: Error is not recoverable: exiting now
出现这样的问题,很可能是下载的包不是gzip文件
需要首先检查文件类型

file ~/Downloads/kafka.tgz

果然,返回的结果是
/home/kafka/Downloads/kafka.tgz: HTML document, ASCII text
那么需要删除这样的文件

rm /home/kafka/Downloads/kafka.tgz

官方网站上找到了链接,大概是66M

curl 'https://apache.website-solution.net/kafka/2.4.1/kafka_2.11-2.4.1.tgz' -o ~/Downloads/kafka.tgz

最后,还是在清华镜像站找到的资源,ftp传上去的 囧

⑨配置 kafka
vi ~/kafka/config/server.properties
#最底部加入
delete.topic.enable = true
10.创建systemctl配置文件,并且启动kafka
#add unit file for zookeeper
sudo vi /etc/systemd/system/zookeeper.service
# 输入以下文字

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target


#之后是新增kafka配置文件
sudo vi /etc/systemd/system/kafka.service

#填入以下内容

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
12.启动kafka
#启动
sudo systemctl start kafka
#检查kafka 启动日志
journalctl -u kafka
#服务开机自启动
sudo systemctl enable kafka
13.测试安装

下面我们将发布和消费一条helloworld消息,用来测试kafka运行正常

生产者,它支持向主题发布记录和数据。
消费者,从主题中读取消息和数据。

#首先,通过输入创建名为TutorialTopic的主题
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

#您可以使用kafka-console-producer.sh脚本从命令行创建一个生成器。它期望Kafka服务器的主机名、端口和主题名作为参数。
#通过输入将字符串“Hello, World”发布到TutorialTopic主题
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

#接下来,您可以使用Kafka -console-consumer.sh脚本创建一个Kafka使用者。它期望ZooKeeper服务器的主机名和端口,以及作为参数的主题名。
#下面的命令使用来自TutorialTopic的消息。请注意——from- begin标志的使用,它允许使用在使用者启动之前发布的消息


~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

#脚本将继续运行,等待向主题发布更多消息。您可以打开一个新的终端并启动一个生成器来发布更多的消息。你应该能在消费者的产出中看到它们。

#当您完成测试时,按CTRL+C停止使用者脚本。现在我们已经测试了安装,让我们继续安装KafkaT。

14.安装kafkaT

KafkaT是来自Airbnb(爱彼迎,一个民宿网站)的一个工具,它可以让您更容易地查看关于Kafka集群的详细信息,并通过命令行执行某些管理任务。因为它是Ruby项目,你将需要ruby来使用它。您还需要ruby-devel和构建相关的包,比如make和gcc,以便能够构建它所依赖的其他gem。使用yum安装它们

#安装ruby
sudo yum install ruby ruby-devel make gcc patch
#安装kafkat,可能有点慢,耐心等,我一度认为ruby装错了 :D
sudo gem install kafkat
#创建一个名为.kafkatcfg的新文件,root 下
vi ~/.kafkatcfg
#写入以下内容

{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

#启动kafkat
kafkat partitions

更多关于kafkat的内容在这里

15.设置多节点集群(可选)

如果你在集群机器上经过以上步骤部署了kafka,还需要修改每一个kafka的
server.properties

  • 修改broker.id 的值,使得他在集群中是唯一的。
    此属性唯一地标识集群中的每个服务器,并可以使用任何字符串作为其值。例如,“server1”、“server2”等。
  • 应该更改zookeeper.connect属性的值,以便所有节点都指向同一ZooKeeper实例。 此属性指定Zookeeper实例的地址,并遵循<HOSTNAME / IP_ADDRESS>:格式。 例如“ 203.0.113.0:2181”、“203.0.113.1:2181”等。
16 限制kafka用户

现在所有的安装都完成了,您可以删除kafka用户的管理特权。在此之前,作为任何其他非根sudo用户注销并重新登录。如果您仍在运行与本教程开始时相同的shell会话,只需键入exit或者ctrl + D即可。

①从sudo组中删除kafka用户

sudo gpasswd -d kafka wheel

Removing user kafka from group wheel
②为了进一步提高您的Kafka服务器的安全性,使用passwd命令锁定Kafka用户的密码。这确保了没有人可以直接使用这个帐户登录到服务器

sudo passwd kafka -l

此时,只有root用户或sudo用户可以通过输入以下命令以kafka的身份登录

su - kafka
#如果想要解锁
sudo passwd kafka -u

python-kafka

安装

安装之前的一些工作
由于有些机器默认的是python2
我这里安装的是anaconda
后期可能要配置环境变量,我的anaconda安装位置是/etc/usr

#编辑虚拟环境变量
vi /etc/profile
#写入
CONDA_HOME='/usr/anaconda3/bin'
export PATH=$CONDA_HOME:$PATH
# 安装完毕后不要忘记重新注册
source /etc/profile
#激活虚拟环境
source active
#关闭
source deactive

安装



#在安装python与postgresql数据库交互前,需要执行以下操作
sudo yum groupinstall "Development Tools"
sudo yum install python3-devel
sudo yum install postgresql-libs
sudo yum install postgresql-devel
#最后执行
pip install psycopg2

#安装
pip install kafka-python 

kafka官方文档


KafkaConsumer 消费者

KafkaConsumer是一个高吞吐的消息消费者

  • 消费者不是线程安全的,不应该跨线程共享。
class kafka.KafkaConsumer(*topics, **configs)

消费来自kafka集群的记录

消费者将透明地处理Kafka集群中服务器的故障,
并在创建主题分区或在代理之间迁移时进行调整。
它还与分配的kafka Group协调器节点交互,以允许多个使用者负载平衡主题的消耗

参数说明
topics (str)

要订阅的可选主题列表,如果没有设置,
则调用之前消费过的记录的 subscribe() 订阅主题列表或主题正则表达式模式。
assign(partitions)手动将主题分区列表分配给此使用者。

Keyword Arguments:

bootstrap_servers: 

host[:port] 或者 host[:port] 的列表,用以引导消费端链接初始化kafka集群元数据。不必是完整的节点列表。默认端口是9092。如果没有指定服务器,默认为localhost:9092。

client_id:

本客户端名称,此字符串在每个请求中传递给服务器,并可用于标识与此客户机对应的特定服务器端日志条目。也提交给组协调器,用于记录有关使用者组管理的日志。默认‘kafka-python-

group_id

用于动态分区分配(如果启用)以及用于抓取和提交偏移量的连接的消费者组的名称。
如果没有,则禁用自动分区分配(通过组协调器)和偏移量提交。默认值:无

key_deserializer

接受原始消息键并返回反序列化键的任何可调用项。

value_deserializer

接受原始消息值并返回反序列化值的任何可调用。

fetch_min_bytes

服务器应该为获取请求返回的最小数据量.否则,等待获取max等待ms为更多的数据积累。默认值:1。

fetch_max_wait_ms

如果没有足够的数据来立即满足取回最小字节的要求,服务器在回答取回请求之前将阻塞的最大时间量(以毫秒为单位)。默认值:500。

fetch_max_bytes 

服务器应该为获取请求返回的最大数据量。这不是绝对最大值,如果fetch的第一个非空分区中的第一个消息大于这个值,仍然会返回该消息,以确保消费者能够继续执行。
注:使用者并行执行对多个代理的提取,因此内存使用将取决于包含主题分区的代理的数量

max_partition_fetch_bytes

服务每个分区返回的最大数据量。
此大小必须至少与服务允许的最大消息大小相同,否则,生产者发送的消息可能大于消费者获取的消息。如果发生这种情况,使用者在试图获取某个分区上的大消息时可能会陷入困境

request_timeout_ms

毫秒级别的等待时间,默认:305000 

retry_backoff_ms

毫秒级的错误重试间隔时间,默认100

reconnect_backoff_ms

尝试重新连接到给定主机之前等待的时间(以毫秒为单位)。默认值:50。

reconnect_backoff_max_ms

当重新连接到一个重复连接失败的代理时,后退/等待的最大时间(以毫秒为单位)。

max_in_flight_requests_per_connection

每个链接到broker的pipline(通道)的最大请求数量,默认为5.

auto_offset_reset

在OffsetOutOfRange上重置偏移量的策略.'latest'表示从最近一次kafka写入开始,'earliest'表示从kafka最早的记录开始读起,除此之外的值都会引发异常。

enable_auto_commit

如果为真,消费端的偏移量将定期在后台提交,默认为真

auto_commit_interval_ms

如果启用自动提交为真,自动偏移提交之间的毫秒数。默认值:5000。

default_offset_commit_callback

以callback(offsets, response)形式回调,响应将是OffsetCommitResponse结构体或者是一个异常,
此回调可用于在提交请求时触发自定义操作 

check_crcs

自动检查CRC32的消费记录。这确保了消息不会在线性网络上或磁盘上发生损坏。这个检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它,默认为True.

metadata_max_age_ms

即使没有看到任何分区领导更改来主动发现任何新的代理或分区,我们也将强制刷新元数据后的时间(以毫秒为单位)。 默认值:300000

partition_assignment_strategy

使用组管理时,用于在使用者实例之间分配分区所有权的对象列表.
默认值[RangePartitionAssignor, RoundRobinPartitionAssignor]

max_poll_records

一次调用poll()返回的最大记录数。 默认值:500

max_poll_interval_ms

当使用消费者组管理时,调用poll()之间的最大延迟。
这就为使用者在获取更多记录之前的空闲时间设置了一个上限。
如果在超时结束之前没有调用poll(),则认为使用者失败,组将重新进行平衡,以便将分区重新分配给另一个成员。默认300000

session_timeout_ms

当使用kafka组管理设施的时候,用于探测超时错误。
消费端发送周期性的心跳检测给broker.
如果broker在session超时之前没有收到心跳,broker将这个消费者从组中移除,而且初始化再平衡。
注意!该值必须在broker配置中配置的允许范围内(
group.min.session.timeout.ms,
group.max.session.timeout.ms,)
默认值10000

heartbeat_interval_ms

在使用kafka组管理工具时,消费端协调器与心跳之间的预期时间。
心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开小组时促进重新平衡。
该值的设置必须小于session_timeout_ms,但通常不应设置为该值的1/3。它可以调整更低,以控制正常重新平衡的预期时间。默认值:3000

receive_buffer_bytes

读取数据时使用的TCP接收缓冲区(即RCVBUF)的大小,
默认None(依赖于系统默认值), java 客户端默认 32768.

send_buffer_bytes

发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小,
默认None,java客户端默认131072。

socket_options

用于代理连接套接字的socket.setsockopt元组参数列表。 默认值:[(socket.IPPROTO_TCP,socket.TCP_NODELAY,1)]

consumer_timeout_ms

在触发iterator异常之前,消息块之间的间隔时长,

security_protocol

用于与broker之间通信的协议。
有效值包括 Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
默认值 PLAINTEXT

ssl_context

预配置的SSLContext用于包装套接字连接。
如果提供,所有其他ssl * 配置将被忽略。默认值:None

ssl_check_hostname

标志,用于配置ssl握手是否应验证证书与代理主机名匹配。默认值:True。

ssl_cafile

可选的用于证书验证的ca文件的文件名
默认值None

ssl_certfile

Pem格式的文件的可选文件名,包含客户端证书以及建立证书真实性所需的所有ca证书。
默认None

ssl_keyfile

包含客户端私钥的可选文件名。默认值:None。

ssl_crlfile

可选的文件名,包含用于检查证书过期的CRL。 默认情况下,不执行CRL检查。 提供文件时,将仅对此CRL检查叶子证书。
默认值:None

ssl_password

加载证书链时使用的可选密码。默认值:None。

ssl_ciphers

(可选)设置ssl连接的可用密码。 它应该是OpenSSL密码列表格式的字符串。 如果没有选择密码(因为编译时选项或其他配置禁止使用所有指定的密码),将引发ssl.SSLError。

api_version

指定kafka api版本。如果设置为None,客户机将尝试通过探测各种api来推断代理版本
不同的版本支持不同的功能。
默认值None

api_version_auto_timeout_ms

检查代理api版本时从构造函数抛出超时异常的毫秒数。仅在api版本设置为None时应用。

connections_max_idle_ms

在此配置指定的毫秒数之后关闭空闲连接。
默认值:540000

metric_reporters

用作指标报告者的类列表。 通过实现AbstractMetricsReporter接口,可以插入将通知新度量标准创建的类。
默认值 []

metrics_num_samples

为计算指标而维护的样本数量。默认值:2

metrics_sample_window_ms

用于计算指标的样本的最长生存时间(以毫秒为单位)。 默认值:30000

selector

提供用于I/O多路复用的特定选择器实现。
默认selectors.DefaultSelector

exclude_internal_topics

内部主题的记录(例如偏移量)是否应该向消费者公开。 如果设置为True,则从内部主题接收记录的唯一方法是订阅该主题。

sasl_mechanism

为SASL_PLAINTEXT或SASL_SSL配置security_protocol时的身份验证机制。
有效值是:PLAIN, GSSAPI, oauthholder, SCRAM-SHA-256, SCRAM-SHA-512。

sasl_plain_username

用于sasl PLAIN和SCRAM身份验证的用户名。 如果sasl_mechanism是PLAIN或SCRAM机制之一,则为必需。

sasl_plain_password

sasl普通认证和SCRAM认证的密码。如果sasl机制是普通的或一个停堆机制是必需的。

sasl_kerberos_service_name

要包含在GSSAPI sasl机制握手中的服务名称。 默认值:“ kafka”

sasl_kerberos_domain_name

GSSAPI sasl机制握手中使用的kerberos域名。
默认:一个 bootstrap 服务

sasl_oauth_token_provider

OAuthBearer令牌提供者实例。 (请参阅kafka.oauth.abstract)。 默认值:None


KafkaConsumer 实例属性与方法

assign(partitions)

手动将主题分区列表分配给此使用者。
此接口不支持增量赋值,并将替换先前的赋值(如果存在)。
通过这种方法手动分配主题不会使用消费者的组管理功能。 这样,当组成员资格或群集和主题元数据发生更改时,将不会触发任何重新平衡操作。

assignment()

获取当前分配给此使用者的主题分区。
如果主题是使用subscribe()进行订阅的,则这将给出当前分配给使用者的主题分区集(如果尚未发生分配,或者正在重新分配分区,则可以为“None”)。
注意!如果分区不存在,此方法可能会无限期阻塞。

bootstrap_connected()

bootstrap 是否链接,如果链接返回True.
这里需要注意的是,如果没有特别设置,在初始化consumer一段时间后,是断开的

commit(offsets=None)

提交偏移量到kafka,阻塞直到成功或错误。

commit_async(offsets=None, callback=None)

异步地向kafka提交偏移量,可选地触发回调。
这只会向Kafka提交偏移量。 使用此API提交的偏移量将在每次重新平衡后的首次提取中使用,也将在启动时使用。 因此,如果您需要将偏移量存储在Kafka以外的任何其他位置,则不应使用此API。 为了避免重新处理使用方重新启动后读取的最后一条消息,提交的偏移量应该是应用程序应该使用的下一条消息,即:last_offset + 1。
这是一个异步调用,不会阻塞。遇到的任何错误要么传递给回调(如果提供),要么丢弃。

committed(partition, metadata=False)

获取给定分区的最后提交偏移量。

end_offsets(partitions)

获取给定分区的最后偏移量。分区的最后偏移量是即将到来的消息的偏移量,即最后可用消息的偏移量+ 1。此方法不会改变分区的当前使用者位置。

highwater(partition)

分区的上一个已知高水位偏移量
highwater偏移量是将分配给生成的下一个消息的偏移量。

metrics(raw=False)

获取消费者表现的指标

offsets_for_times(timestamps)

按时间戳查找给定分区的偏移量。每个分区的返回偏移量是其时间戳大于或等于相应分区中给定时间戳的最早偏移量。
这是一个阻塞调用。不需要为使用者分配分区。

partitions_for_topic(topic)

此方法首先检查本地元数据缓存中有关主题的信息。
如果没有找到主题(因为主题不存在,用户没有权限查看主题,
或者没有填充元数据缓存),那么它将向集群发出元数据更新调用。

pause( partitions)*

暂停从请求的分区中提取消息
在使用resume()恢复分区之前,对poll()的后续调用不会返回来自这些分区的任何记录。

paused()

获取之前使用pause()暂停的分区。

poll(timeout_ms=0, max_records=None, update_offsets=True)

从分配的主题/分区中获取数据。
按主题分区分批获取和返回记录。在每次轮询中,使用者将尝试使用最后消耗的偏移量作为开始偏移量,并按顺序获取。最后使用的偏移量可以通过seek()手动设置,也可以自动设置为已订阅分区列表的最后提交偏移量。

position(partition)

获取将要提取的下一条记录的偏移量

resume( partitions)*

继续从指定的(暂停的)分区进行提取消息

seek(partition, offset)

手动指定TopicPartition的获取偏移量。
覆盖使用者将在下一个poll()中使用的获取偏移量。如果对同一个分区多次调用此API,那么下一次poll()将使用最新的偏移量。

seek_to_beginning

寻找分区的最早可用偏移量。

seek_to_end

寻找分区中最新的可用偏移量

subscribe(topics=(), pattern=None, listener=None)

订阅主题列表或主题正则表达式模式。

subscription

获取当前主题。

topics()

获取用户被授权查看的所有主题。这将始终向集群发出远程调用,以获取最新的信息。

unsubscribe

取消订阅所有主题并清除所有分配的分区。


KafkaProducer 生产者

  • KafkaProducer是一个高吞吐的异步消息生成器。
  • 将记录发布到Kafka集群的Kafka客户端。
  • 生产者是线程安全的,跨线程共享一个生产者实例通常比拥有多个实例要快。
  • 生产者由一个缓冲空间池(用于保存尚未传输到服务器的记录)以及一个后台I / O线程组成,该线程负责将这些记录转换为请求并将它们传输到集群。
  • send() 方法是异步的。当它被调用时,将被添加到记录发送挂起缓冲区,然后立刻返回。这允许生产者批量处理单个记录以提高效率。注意~异步只解决阻塞问题,不解决处理效率问题,增加处理效率只能从增加broker上入手。
  • “ acks”配置可控制将请求视为完整请求的标准。 “ all”设置将导致记录的完全提交受阻,这是最慢但最持久的设置。
  • 如果请求失败,生产者将自动重试,除非 ‘retries’选项配置为0。启用重试,也打开了消息重复的可能性。消息传递语义的文档
  • 生产者为每个分区维护未发送记录的缓冲区。这些缓冲区的大小由“ batch_size”配置指定。增大他可以处理更多的批处理,但是需要占用更多的内存。(因为每个活动分区通常都有一个缓冲区)
  • 默认情况下,即使缓冲区中有额外的未使用空间,也可以立即发送缓冲区内容。但是,如果您想减少请求数量,可以将“ linger_ms”设置为大于0的值。这将指示生产者在发送请求之前最多等待多少毫秒数,以期会有更多记录来填充在同一批次。这类似于TCP中的Nagle算法。请注意,即使linger_ms = 0,时间上接近的记录也通常会一起批处理,因此无论负载如何配置,都会在重负载下进行批处理。 但是,如果不将其设置为大于0的值,则当不在最大负载下时,它会导致更少,更有效的请求,但以少量的延迟为代价
  • buffer_memory控制生产者可用于缓冲的内存总量。如果记录的发送到缓冲区速度超过了将缓冲区记录发送到服务器的速度,则该缓冲区空间将被耗尽。 当缓冲区空间用尽时,其他发送调用将阻塞。
  • key_serializer和value_serializer指示如何将用户提供的键和值对象转换为字节。
classkafka.KafkaProducer(**configs)

参数说明

bootstrap_servers

host:port 字符串或者该形式的列表,生产者与元数据集群的联系。
这不必是完整的节点列表。 它仅需要至少一个broker来响应元数据API请求。 默认端口为9092。如果未指定服务器,则默认为localhost:9092。 

client_id

此客户的名称。 该字符串在每个请求中传递给服务器,可用于标识与此客户端相对应的特定服务器端日志条目。 默认值:“ kafka-python-producer-#”(每个实例附加唯一编号)

key_serializer

将用户提供的key 转换为字节bytes,如果不是None,则以f(key) 形式回调,回调函数应当返回bytes。默认值None。

value_serializer

将用户提供的值values转换为字节bytes。如果不是None,则以f(value)形式回调,回调函数应当返回bytes,

acks(0, 1, 'all')

生产者要求leader在确认请求完成之前已收到的确认数。 这控制了发送记录的持久性

  • 0:生产者将不等待服务器的任何确认。
    该消息将立即添加到套接字缓冲区中并视为已发送。 在这种情况下,不能保证服务器已收到记录,并且重试配置不会生效(因为客户端通常不会知道任何故障)。 为每个记录提供的偏移量将始终设置为-1。
  • 1:等待leader将记录仅写入其本地日志。
    broker 将立刻做出响应,无需等待所有follower的响应。
    在这种情况下,如果leader在确认记录后立即失败,但是在follower复制它之前,那么记录将丢失。
  • all:等待完整的同步副本写入记录。
    这样可以确保只要至少一个同步副本仍处于活动状态,记录就不会丢失。 这是最有力的保证。

默认值为1

compression_type

生产者生成的所有数据的压缩类型,
有效值包括‘gzip’, ‘snappy’, ‘lz4’, 或 None
压缩是对全部批次的数据进行压缩,所以批处理的效率也会影响压缩率(批处理越多意味着压缩越好)
默认值None

retries

设置一个大于零的值将导致客户端重新发送其发送失败并带有潜在的瞬时错误的任何记录。
请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。
如果不将max_in_flight_requests_per_connection设置为1而允许重试,则可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一个失败并被重试,但是第二个成功,则第二个批次中的记录可能会首先出现。
默认值为0

batch_size (int)

发送给brokers的请求将包含多个批次,每个分区一个,可发送数据。
较小的批处理大小将使批处理变得不那么普遍,并且可能会降低吞吐量(批处理batch_size大小为零将完全禁用批处理)。
默认值16384

linger_ms(int)

生产者将在请求传输之间到达的所有记录归为一个批处理的请求。 通常,只有在记录到达速度快于记录发送速度时,才在负载下发生这种情况。
但是,在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。
此设置通过添加少量的人为延迟来实现。 也就是说,生产者将立即等待直到给定的延迟,以允许其他记录被发送,以便发送者可以一起批处理,而不是立即发送记录。

  • 可以认为这类似于Nagle在TCP中的算法。 此设置给出了批处理延迟的上限:一旦获得分区的记录的batch_size值,无论此设置如何,它都会立即发送,但是,如果我们为该分区积累的字节数少于这个数量,我们将“徘徊” 在指定的时间内等待更多记录显示。 此设置默认为0(即无延迟)。 设置linger_ms = 5将具有减少发送请求的数量的效果,但是在没有负载的情况下,发送记录的延迟将增加5ms。

默认值:0

partitioner (callable)

Callable用于确定每个消息分配给哪个分区。 调用(在密钥序列化之后):分区程序(key_bytes,all_partitions,available_partitions)。 默认的分区程序实现使用与Java客户端相同的murmur2算法对每个非无键进行哈希处理,以便将具有相同键的消息分配给同一分区。 当键为“无”时,消息将传递到随机分区(如果可能,将过滤到只有可用leader的分区)

buffer_memory(int)

生产者应该用于缓冲等待发送到服务器的记录的总内存字节.
如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将最多阻塞max_block_ms,从而引发超时异常。 在当前的实现中,此设置为近似值。 默认值:33554432(32MB)

connections_max_idle_ms

在此配置指定的毫秒数后关闭空闲连接。 代理在connections.max.idle.ms之后关闭空闲连接,因此可以避免在客户端上遇到意外的套接字断开连接错误。 默认值:540000

max_block_ms (int)

send()和partitions_for()期间要阻止的毫秒数。 由于缓冲区已满或元数据不可用,可以阻止这些方法。 用户提供的串行器或分区器中的阻塞将不计入此超时。 默认值:60000

max_request_size (int)

请求的最大size,际上也是最大记录大小的上限。
请注意,服务器具有自己的记录大小上限,该上限可能与此不同。
此设置将限制生产者将在单个请求中发送的记录批数,以避免发送大量请求。 默认值:1048576

metadata_max_age_ms(int)

以毫秒为单位的时间段,在此之后,即使我们没有看到任何分区leader更改也可以强制刷新元数据以主动发现任何新的broker或分区。 默认值:300000

retry_backoff_ms (int)

重试错误时要退后的毫秒数。 默认值:100。

request_timeout_ms (int)

客户端请求超时(以毫秒为单位)。 默认值:30000

receive_buffer_bytes (int)

读取数据时要使用的TCP接收缓冲区(SO_RCVBUF)的大小。 默认值:None(取决于系统默认值)。 Java客户端默认为32768。

socket_options (list)

用于代理连接套接字的socket.setsockopt元组参数列表。 默认值:[(socket.IPPROTO_TCP,socket.TCP_NODELAY,1)]

reconnect_backoff_ms (int)

尝试重新连接到给定主机之前要等待的时间(以毫秒为单位)。 默认值:50。

reconnect_backoff_max_ms (int)

当重新连接到一个连接失败的broker时,后退/等待的最大时间(以毫秒为单位)
如果提供此选项,则对于每个连续的连接失败,每台主机的回退将成倍增加,直至达到此最大值。
一旦达到最大值,重新连接尝试将以此固定速率定期进行。 为了避免出现连接风暴,将对退避值应用0.2的随机化因子,以使随机范围介于计算值以下20%到计算值20%之间。 默认值:1000。

max_in_flight_requests_per_connection (int)

请求被管道传输到kafka代理,每个代理连接的最大请求数达到此上限。 请注意,如果将此设置设置为大于1且发送失败,则由于重试(例如,如果启用重试),则存在消息重新排序的风险。
默认值:5

security_protocol (str)

与broker通信的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 默认值:PLAINTEXT。

ssl_context (ssl.SSLContext)

预配置的SSLContext用于包装套接字连接。 如果提供,所有其他ssl_ * 配置将被忽略。
默认值:无。

ssl_check_hostname (bool)

标志,用于配置ssl握手是否应验证证书是否与代理主机名匹配。 默认值:true。

ssl_cafile (str)

用于证书验证的ca文件的可选文件名。 默认值:无。ssl_certfile(str)– Pem格式的文件的可选文件名,包含客户端证书以及建立证书真实性所需的所有ca证书。
默认值:无。

ssl_ciphers (str)

(可选)设置ssl连接的可用密码。 它应该是OpenSSL密码列表格式的字符串。 如果无法选择任何密码(因为编译时选项或其他配置禁止使用所有指定的密码),将引发ssl.SSLError。 请参阅ssl.SSLContext.set_ciphers

api_version (tuple)

指定要使用的Kafka API版本。
如果设置为None,则客户端将尝试通过探查各种API来推断代理版本。
示例:(0,10,2)。
默认值:无

api_version_auto_timeout_ms(int)

检查代理api版本时从构造函数引发超时异常的毫秒数。 仅在api_version设置为None时适用。

metric_reporters (list)

用作指标报告者的类列表。 通过实现AbstractMetricsReporter接口,可以插入将通知新度量标准创建的类。 默认值:[]

metrics_num_samples (int)

维护以计算指标的样本数。
默认值:2

metrics_sample_window_ms (int)

用于计算指标的样本的最长生存时间(以毫秒为单位)。
默认值:30000

selector (selectors.BaseSelector)

提供用于I/O多路复用的特定选择器实现。默认值:selectors.DefaultSelector

sasl_mechanism (str)

为SASL_PLAINTEXT或SASL_SSL配置security_protocol时的身份验证机制。
有效值为:PLAIN,GSSAPI,OAUTHBEARER,SCRAM-SHA-256,SCRAM-SHA-512。

sasl_plain_username (str)

用于sasl PLAIN和SCRAM身份验证的用户名。
如果sasl_mechanism是PLAIN或SCRAM机制之一,则为必需。

sasl_plain_password (str)

用于sasl PLAIN和SCRAM身份验证的密码。
如果sasl_mechanism是PLAIN或SCRAM机制之一,则为必需。

sasl_kerberos_service_name (str)

要包含在GSSAPI sasl机制握手中的服务名称。 默认值:“ kafka”

sasl_oauth_token_provider (AbstractTokenProvider)

OAuthBearer令牌提供者实例。 (请参阅kafka.oauth.abstract)。 默认值:无


KafkaProducer 实例属性与方法

bootstrap_connected()

bootstrap 是否链接,如果链接,返回True

close(timeout=None)

关闭producer

flush(timeout=None)

调用此方法将使所有缓冲记录立即可用于发送(即使linger_ms大于0),并在与这些记录关联的请求完成时阻塞。 flush()的后置条件是任何先前发送的记录都将完成(例如Future.is_done()== True)。
根据生产者的“ acks”配置成功确认请求后,或者导致错误,则认为该请求已完成。
其他线程可以继续发送消息,同时一个线程被阻塞,等待刷新调用完成。 但是,不能保证刷新调用开始后发送的消息是否完整。

metrics(raw=False)

获取生产者绩效指标。

partitions_for(topic)

返回该主题的所有已知分区的集合。

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

向主题发布消息,具体的参数如下:

  • topic (str)  将要发送到的主题
  • value (optional) 消息值。 必须是字节类型,或者可以通过配置的value_serializer序列化为字节。如果value是None,则key是必需的,message充当删除操作。
  • partition (int, optional)**
    (可选)指定分区。 如果未设置,则将使用配置的“分区程序”来选择分区。
  • key (optional)**
    与消息关联的key。 可用于确定将消息发送到哪个分区。
    如果分区是None(并且producer s partitioner config保留为默认值),那么具有相同键的消息将被传递到相同的分区(但是如果键是None,分区将被随机选择)。
    类型必须为byte,或可通过配置的键序列化器序列化为字节。
  • headers (optional)
    一个键值对的列表,列表中的对必须是 以元组形式出现(key str,value byte)
  • timestamp_ms (int, optional)
    历时毫秒(从UTC 1970年1月1日开始)用作消息时间戳。 默认为当前时间。
    返回解析为RecordMetadata

Thread safety 线程安全

KafkaProducer可以毫无问题地跨线程使用,而KafkaConsumer则不能。
虽然可以以多线程本地的方式使用KafkaConsumer,但建议使用多进程处理。
线程安全相关文章

Compression 压缩

kafka-python原生支持gzip压缩/解压缩。 要产生或使用lz4压缩消息,您应该安装python-lz4(pip install lz4)。 要启用快照,请安装python-snappy(也需要快照库)。

Protocol 协议

kafka-python的第二个目标是提供一个易于使用的协议层,用于通过python repl与kafka代理交互。这对于测试、探测和一般实验是有用的