2016 - 2024

感恩一路有你

kafka生产数据命令 如何删除kafka积压数据?

浏览量:1827 时间:2021-03-11 19:31:48 作者:admin

如何删除kafka积压数据?

卡夫卡可以通过两种方式删除数据

根据时间,删除一段时间后过期的消息

根据消息大小,消息数超过一定大小后删除最早的数据

卡夫卡删除数据的最小单位:segment

卡夫卡删除数据的主要逻辑:卡夫卡源代码

def cleanuplogs(){debug(”beging log cleanup。。。)var总计=0 val开始时间=时间.毫秒For(log

Kafka在一段时间内(配置文件设置)调用cleanuplogs一次,删除所有需要删除的日志数据。

Cleanupexpiredsegments负责清除超时数据

private def Cleanupexpiredsegments(log:log):int={val startms=时间.毫秒log.deleteOldSegments文件(开始时间->上次修改时间>log.config.retentions保留)}

cleanupsegmenttomaintainsize负责清理大于大小的数据私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差异-=段.尺寸真}否则{假}log.deleteOldSegments文件(shouldDelete)}

我没事,来这里玩,开始在各种网络上寻找技术信息,之后以“标题”为主。从寻找信息到交朋友。因为我觉得事情落后于时代,有人认为,是因为自己水平不高。只是在心里想,无法实现现实

JAVA面试如何保证消息不被重复消费?如何保证消息消费的幂等性?

基于接收器的实现将使用kakfa的高级消费API。与所有其他接收器一样,接收到的数据将保存到执行器,然后sparkstreaming将启动作业来处理数据。

在默认配置中,如果出现故障,此方法将丢失数据。为了确保零数据丢失,我们需要启用wal(writeaheadlogs)。它将接收到的数据同步保存到分布式文件系统,如HDFS。因此,在发生错误时可以恢复数据。

使用两个步骤:1。添加依赖项:Spark streaming Kafka 2.10-1.3.0

2导入器g.apache.spark. 卡夫卡._

kafka生产数据命令 kafka创建消费组命令 kafka发送消息命令

版权声明:本文内容由互联网用户自发贡献,本站不承担相关法律责任.如有侵权/违法内容,本站将立刻删除。