一张图说明spring-kafka的基本用法

在java中操作kafka,可以通过kafka-clients或者使用spring-kafka来实现.

本文简要说明一下spring-kafka的使用,通过下面一张图概览spring-kafka的使用

spring-kafka的使用

一、生产者 

1.kafka生产者的初始化
  • kafka生产者主要通过KafkaTemplate对象来进行消息方面的操作
    • KafkaTemplate对象又是通过ProducerFactory对象来构造的
      • ProducerFactory只有一个默认的实现类DefaultKafkaProducerFactory(ProducerFactory通过一个map型配置参数字典来构造)
        • 通过ProducerConfig可以构造生产者的配置参数
        • 也可以自定义构造生产者配置参数
  1. /**
  2. * 1.构建一个配置参数字典(kafkaProducerProperties是一个读取应用配置的类)
  3. */
  4. Map<String, Object> props = new HashMap<String, Object>(8);
  5. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBootstrapServers());
  6. props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerProperties.getAcks());
  7. props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerProperties.getBatchSize());
  8. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerProperties.getBufferMemory());
  9. props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerProperties.getClientId());
  10. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getKeySerializer());
  11. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getValueSerializer());
  12. /**
  13. * 2.构造KafkaProducerFactory
  14. */
  15. KafkaProducerFactory<String, V> producerFactory = new DefaultKafkaConsumerFactory(props);
  16. /**
  17. * 3.构造KafkaTemplate
  18. */
  19. KafkaTemplate kafkaTemplate = new KafkaTemplate<String, V>(producerFactory);
2.发送消息

通过send方法或sendDefault方法发送消息

(1)sendDefault方法

发送到默认的主题中

  • sendDefault(V data):
    • data 为消息内容
  • sendDefault(K key, V data)
    • key为消息的唯一标识
  • sendDefault(Integer partition, K key, V data)
    • partition为分区标识
  • sendDefault(Integer partiton, long timestamp, K key, V data)
    • timestamp为消息时间

其底层是调用send方法

(2)send方法
  • send(String topic, V data):
    • topic为消息主题
    • data 为消息内容
  • send(String topic, K key, V data)
    • key为消息的唯一标识
  • send(String topic, Integer partition, K key, V data)
    • partition为分区标识
  • send(String topic, Integer partiton, long timestamp, K key, V data)
    • timestamp为消息时间
  • send(ProducerRecord producerRecord)
    • producerRecord对象有以下属性:
      • topic:主题
      • partition:分区
      • headers:头信息
      • key:消息标识
      • value:消息内容
      • timestamp:时间
  1. ProducerRecord<String, V> record = new ProducerRecord<String, V>(topic, String.valueOf(key), content);
  2. kafkaProducer.send(record);
3.其他方法
  • partitionsFor : 分区某个主题的分区列表信息
  • isTransactional : 是否是事务消息
  • inTransaction : 是否还在执行事务
  • flush : 主动刷新缓冲区中的日志到磁盘中
  • sendOffsetsToTransaction : 将偏移量发送给事务管理器
  1. List<PartitionInfo> partitionInfoList = kafkaProducer.partitionsFor("user_reg");
  2. log.info("分区信息:{}", partitionInfoList);

二、消费者 

消费消息通过消息监听器来实现,而监听器通过监听器容器来管理

1.初始化消息监听器容器 

而消息监听器容器有两类:

  • 普通容器:KafkaListenerContainerFactory
  • 并发容器:ConcurrentKafkaListenerContainerFactory

其步骤如下:

  • (1)构建消费者配置参数对象
  • (2)通过配置参数对象构造消费者工厂对象
  • (3)构造消息监听容器对象,并指定其消费者工厂对象
  1. /**
  2. * 1.构建配置消费者参数对象
  3. */
  4. Map<String, Object> props = new HashMap<String, Object>(8);
  5. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  6. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
  7. props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
  8. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getEnableAutoCommit());
  9. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getAutoOffsetReset());
  10. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
  11. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDeserializer());
  12. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDeserializer());
  13. /**
  14. * 2.通过配置参数对象构造消费者工厂对象
  15. */
  16. KafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(props);
  17. /**
  18. * 3.构建监听容器对象
  19. */
  20. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
  21. factory.setConcurrency(kafkaProperties.getConcurrency());
  22. factory.setBatchListener(true);
  23. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  24. factory.setConsumerFactory(kafkaConsumerFactory);
2.监听并消费消息
  • 通过@KafkaListener来标识监听器
    • id : 监听器标识
    • topics : 监听的主题
    • topicPattern : 监听的主题(通过模式来匹配)
    • containerFactory : 指定监听器容器
    • groupId : 消费者组
    • topicPartitions : 主题分区
  • 实现监听方法:它是一个回调方法,有以下参数
    • List> records:消息列表
    • Acknowledgment ack : 消息确认对象
      • acknowledge : 消费成功,手动确认提交偏移量
      • wait : 消费失败,等待重试 
  1. @KafkaListener(id ="user_reg", topics = "user_reg", containerFactory = "userContainerFactory")
  2. public void subscribeCreateUser(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
  3. try {
  4. log.info("收到消息:{}", records);
  5. /**
  6. * 手动提交偏移量
  7. */
  8. ack.acknowledge();
  9. } catch (Exception ex) {
  10. try {
  11. ack.wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }