您的位置 首页 > 数码极客

【小于或等于符号怎么打】kafka幂等、事务简单使用

Kafka 0.11

1.幂等性

含义:

生产者发送数据时,难免会重复发送消息,如重试导致相同的数据发送多次,而引入幂等性后,重复发送只会生成一条有效的消息。

原理:

生产者发送到broker端的每批消息都会被赋予一个序列号用于去重,从0开始严格单调递增。

每个生产者实例都有一个producer ID,生产者在初始化时必须分配一个producer ID(每个topic每个分区都有自己的序列号)

broker会将序列号与消息一起保存在日志中。这样即使leader副本挂掉,新选择的leader也能执行消息去重

如果发送消息的序列号小于或者等于broker端保存的序列号,那么broker会拒绝这条消息的写入操作


注意点:

以上的设计确保了即使出现重试操作,每条消息也仅仅生成一条有效的。不过由于每个新的producer实例都会有不同的producer ID,

同一个分区下序列化是递增的,所以只能保证单个producer实例的exactly once语义,无法保证多个producer实例一起提供exactly once语义,只能保证单个topic 分区的幂等性

使用:

使用spaing-kafka

配置

@Bean(name = "idempotenceKafkaTemplate")

public KafkaTemplate<String, String> idempotenceKafkaTemplate() {

//省略一些配置

Map<String, Object> props = new HashMap<>();

, serverAddress);

, S);

, S);

//, "all");

, true);//开启幂等配置,需要设置为ture,此时就会默认把acks设置为all,所以不需要再设置acks属性了

ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);

return template;

}

使用

@Autowired(required = false)

@Qualifier("idempotenceKafkaTemplate")

private KafkaTemplate<String, String> idempotenceKafkaTemplate;

idem(new KafkaProduceListener());这里可以设置监听器,

ListenableFuture<SendResult<String, String>> future = idem(topic, partition, key, data);

2.事务

含义:

kafka事务指kafka一以写生产、消费消息等操作组成一个原子操作。生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败

原理:

有点复杂,篇幅太长,简单说是通过TransactionCoordinator 、Transaction Log等来实现,想要了解可以看看专门深入剖析kafka的文档。

注意点:

消费者的自动模式需要设置为false,且不能再手动地进行执行commitSync、commitAsyc

生产者配置属性,生产者不需要再配置enable.idempotence,因为如果配置了,则此时enable.idempotence会被默认设置为true

消费者需要配置I。在consume-》trnasform-》produce模式下使用事务时,必须设置为READ_COMMITTED。

跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;

跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);

跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-》Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。

使用:

public KafkaTemplate<String, String> transactionIdempotenceKafkaTemplate() {

Map<String, Object> props = new HashMap<>();

, serverAddress);

, S);

, S);

, transactionalId);

//生产者不需要再配置enable.idempotence,因为如果配置了,则此时enable.idempotence会被设置为true

//, true);

//, "all");

DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);

return template;

}

/**

* 事务发送,能保证多个分区幂等

*/

public void sendWithTransactionIdempotence(String topic, String data) {

(kafkaOperations -> {

ListenableFuture<SendResult<String, String>> future = ka(topic, data);

return true;

});

}

关于作者: admin

无忧经验小编鲁达,内容侵删请Email至wohenlihai#qq.com(#改为@)

热门推荐