Kafka 0.11
1.幂等性
含义:
生产者发送数据时,难免会重复发送消息,如重试导致相同的数据发送多次,而引入幂等性后,重复发送只会生成一条有效的消息。
原理:
生产者发送到broker端的每批消息都会被赋予一个序列号用于去重,从0开始严格单调递增。
每个生产者实例都有一个producer ID,生产者在初始化时必须分配一个producer ID(每个topic每个分区都有自己的序列号)
broker会将序列号与消息一起保存在日志中。这样即使leader副本挂掉,新选择的leader也能执行消息去重
如果发送消息的序列号小于或者等于broker端保存的序列号,那么broker会拒绝这条消息的写入操作
注意点:
以上的设计确保了即使出现重试操作,每条消息也仅仅生成一条有效的。不过由于每个新的producer实例都会有不同的producer ID,
同一个分区下序列化是递增的,所以只能保证单个producer实例的exactly once语义,无法保证多个producer实例一起提供exactly once语义,只能保证单个topic 分区的幂等性
使用:
使用spaing-kafka
配置
@Bean(name = "idempotenceKafkaTemplate")
public KafkaTemplate<String, String> idempotenceKafkaTemplate() {
//省略一些配置
Map<String, Object> props = new HashMap<>();
, serverAddress);
, S);
, S);
//, "all");
, true);//开启幂等配置,需要设置为ture,此时就会默认把acks设置为all,所以不需要再设置acks属性了
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
return template;
}
使用
@Autowired(required = false)
@Qualifier("idempotenceKafkaTemplate")
private KafkaTemplate<String, String> idempotenceKafkaTemplate;
idem(new KafkaProduceListener());这里可以设置监听器,
ListenableFuture<SendResult<String, String>> future = idem(topic, partition, key, data);
2.事务
含义:
kafka事务指kafka一以写生产、消费消息等操作组成一个原子操作。生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败
原理:
有点复杂,篇幅太长,简单说是通过TransactionCoordinator 、Transaction Log等来实现,想要了解可以看看专门深入剖析kafka的文档。
注意点:
消费者的自动模式需要设置为false,且不能再手动地进行执行commitSync、commitAsyc
生产者配置属性,生产者不需要再配置enable.idempotence,因为如果配置了,则此时enable.idempotence会被默认设置为true
消费者需要配置I。在consume-》trnasform-》produce模式下使用事务时,必须设置为READ_COMMITTED。
跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-》Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。
使用:
public KafkaTemplate<String, String> transactionIdempotenceKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
, serverAddress);
, S);
, S);
, transactionalId);
//生产者不需要再配置enable.idempotence,因为如果配置了,则此时enable.idempotence会被设置为true
//, true);
//, "all");
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
return template;
}
/**
* 事务发送,能保证多个分区幂等
*/
public void sendWithTransactionIdempotence(String topic, String data) {
(kafkaOperations -> {
ListenableFuture<SendResult<String, String>> future = ka(topic, data);
return true;
});
}