kafka生产数据命令 如何删除kafka积压数据?
如何删除kafka积压数据?
卡夫卡可以通过两种方式删除数据
根据时间,删除一段时间后过期的消息
根据消息大小,消息数超过一定大小后删除最早的数据
卡夫卡删除数据的最小单位:segment
卡夫卡删除数据的主要逻辑:卡夫卡源代码
def cleanuplogs(){debug(”beging log cleanup。。。)var总计=0 val开始时间=时间.毫秒For(log
Kafka在一段时间内(配置文件设置)调用cleanuplogs一次,删除所有需要删除的日志数据。
Cleanupexpiredsegments负责清除超时数据
private def Cleanupexpiredsegments(log:log):int={val startms=时间.毫秒log.deleteOldSegments文件(开始时间->上次修改时间>log.config.retentions保留)}
cleanupsegmenttomaintainsize负责清理大于大小的数据私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差异-=段.尺寸真}否则{假}log.deleteOldSegments文件(shouldDelete)}
我没事,来这里玩,开始在各种网络上寻找技术信息,之后以“标题”为主。从寻找信息到交朋友。因为我觉得事情落后于时代,有人认为,是因为自己水平不高。只是在心里想,无法实现现实
JAVA面试如何保证消息不被重复消费?如何保证消息消费的幂等性?
基于接收器的实现将使用kakfa的高级消费API。与所有其他接收器一样,接收到的数据将保存到执行器,然后sparkstreaming将启动作业来处理数据。
在默认配置中,如果出现故障,此方法将丢失数据。为了确保零数据丢失,我们需要启用wal(writeaheadlogs)。它将接收到的数据同步保存到分布式文件系统,如HDFS。因此,在发生错误时可以恢复数据。
使用两个步骤:1。添加依赖项:Spark streaming Kafka 2.10-1.3.0
2导入器g.apache.spark. 卡夫卡._
kafka生产数据命令 kafka创建消费组命令 kafka发送消息命令
版权声明:本文内容由互联网用户自发贡献,本站不承担相关法律责任.如有侵权/违法内容,本站将立刻删除。