spring-kafka by Example: Integrating Spring with Kafka

I. Message System Modules

The message system is split into three modules:

  • common: shared module (typically published to a private Maven repository and pulled in by both the producer and the consumer)
  • producer: producer service (typically deployed on its own; depends on the common module)
  • consumer: consumer service (typically deployed on its own; depends on the common module)

This article walks through a user-registration scenario to show how to integrate Spring with the Kafka client:

  • User registration:
    • Insert the new user into the database and perform related operations
    • Publish a "user registered" message (producer); downstream tasks, such as sending a welcome email, then react to that message (consumer)

II. The Common Module

1. Contents of the common module
  • The message payload entity (shared by producer and consumer)
  • Serialization and deserialization utilities for the message payload
    • Serialization: the producer serializes the payload before sending it to the broker
    • Deserialization: the consumer pulls the message from the broker and deserializes the payload

Note: why do messages need to be serialized and deserialized at all?

Because the payload we send is rarely a plain string. It is usually structured data, such as a list, a data entity, or a Map. Kafka transports raw bytes, so structured data must be converted to a byte stream on the way in and reconstructed on the way out, which is exactly what the serializer and deserializer do.
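To make this concrete, here is a minimal, self-contained sketch of the round trip a structured payload goes through (plain JDK, no Kafka involved; the class and method names are invented for illustration): object → bytes on the wire → object again.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class RoundTrip {

    // producer side: structured object -> bytes
    public static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // consumer side: bytes -> structured object
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("userId", 3000001L);
        payload.put("email", "ishixinke@qq.com");

        byte[] wire = toBytes(payload);      // what would travel to the broker
        Object restored = fromBytes(wire);   // what the consumer reconstructs

        System.out.println(payload.equals(restored)); // prints true
    }
}
```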

2. Import the Kafka client dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
3. The message payload entity

Define the payload entity according to your business needs; it often mirrors a database table. For the simple user-registration example here, we define a User class:

@Data
public class User implements Serializable {

    private static final long serialVersionUID = -3247930189609406661L;

    private long userId;
    private String nickname;
    private String email;
    private String account;
    private String password;
    private int status;
    private long createTime;

    public User() {
    }

    public User(String nickname, String email, String account, String password, int status, long createTime) {
        this.nickname = nickname;
        this.email = email;
        this.account = account;
        this.password = password;
        this.status = status;
        this.createTime = createTime;
    }

    public User(long userId, String nickname, String email, String account, String password, int status, long createTime) {
        this(nickname, email, account, password, status, createTime);
        this.userId = userId;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", nickname='" + nickname + '\'' +
                ", email='" + email + '\'' +
                ", account='" + account + '\'' +
                ", password='" + password + '\'' +
                ", status=" + status +
                ", createTime=" + createTime +
                '}';
    }
}

Note:

  • The message entity class must implement the Serializable interface (the payload is going to be serialized, so the class has to declare explicitly that it is serializable).
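To see why this requirement matters: java.io refuses to serialize any object whose class does not implement Serializable, so a forgotten `implements Serializable` fails at send time inside the producer. A small sketch (hypothetical class names, plain JDK) of the failure mode:

```java
import java.io.*;

public class SerializableCheck {

    // a payload class that forgot to implement Serializable
    static class PlainUser {
        String nickname = "shixinke";
    }

    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException ex) {   // NotSerializableException extends IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new PlainUser()));   // prints false
        System.out.println(canSerialize("a plain String"));  // prints true: String is Serializable
    }
}
```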
4. Define the serializer and deserializer
(1) A conversion utility for translating between objects and byte arrays
import java.io.*;

/**
 * Converts between Java beans and byte arrays.
 *
 * @author shixinke
 * @version 1.0
 */
public class Converter {

    /**
     * Serialize a bean into a byte array.
     */
    public static byte[] bean2ByteArray(Object object) {
        // 1. Open a byte-array output stream wrapped in an object output stream;
        //    try-with-resources closes and releases both streams automatically
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            // 2. Write the object into the stream and flush it
            outputStream.writeObject(object);
            outputStream.flush();
            // 3. Dump the byte-array stream into a byte array
            return byteArrayOutputStream.toByteArray();
        } catch (IOException ex) {
            // propagate instead of returning null, so a broken payload fails loudly
            throw new IllegalStateException("failed to serialize message payload", ex);
        }
    }

    /**
     * Deserialize a byte array back into a bean.
     */
    public static Object byteArray2Bean(byte[] bytes) {
        // 1. Open a byte-array input stream wrapped in an object input stream
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             ObjectInputStream inputStream = new ObjectInputStream(byteArrayInputStream)) {
            // 2. Read the object back from the stream
            return inputStream.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("failed to deserialize message payload", ex);
        }
    }
}
(2) The serializer

import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 */
public class KafkaObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
    }

    /**
     * Serialization hook invoked by the Kafka producer
     * @param topic topic name
     * @param data message payload
     * @return serialized bytes
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return Converter.bean2ByteArray(data);
    }

    @Override
    public void close() {
    }
}

Note:

  • A serializer implements the org.apache.kafka.common.serialization.Serializer interface
    • serialize: the method that performs the serialization
  • The serializer is used on the producer side: the payload is serialized just before the message is sent
(3) The deserializer

import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 */
public class KafkaObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
    }

    /**
     * Deserialization hook invoked by the Kafka consumer
     * @param topic topic name
     * @param data message payload bytes
     * @return the reconstructed object
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        return Converter.byteArray2Bean(data);
    }

    @Override
    public void close() {
    }
}

Note:

  • A deserializer implements the org.apache.kafka.common.serialization.Deserializer interface
    • deserialize: the method that performs the deserialization
  • The deserializer is used on the consumer side: before a received message is processed, the payload the producer serialized is deserialized back into its original form
  • Serialization and deserialization must mirror each other exactly, so agree on the message structure up front and keep both classes in the shared common module
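The "must mirror each other" point can be demonstrated without Kafka: bytes written by ObjectOutputStream carry a stream header, so a deserializer expecting that format fails fast on bytes produced any other way. A small illustrative sketch (class and method names are invented):

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class PairingDemo {

    // serialize with ObjectOutputStream, the format our object serializer uses
    public static byte[] serialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // try to deserialize; return null when the bytes are not in the expected format
    public static Object deserializeOrNull(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            return null;   // e.g. StreamCorruptedException: invalid stream header
        }
    }

    public static void main(String[] args) {
        // matching pair: round-trips cleanly
        System.out.println(deserializeOrNull(serialize("hello")));               // prints hello
        // mismatched pair: raw UTF-8 bytes are not a valid object stream
        System.out.println(deserializeOrNull("hello".getBytes(StandardCharsets.UTF_8))); // prints null
    }
}
```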

III. The Message Producer

1. Import the dependencies

Add the following to the project's pom.xml:

<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<!-- our shared common module -->
<dependency>
    <groupId>com.shixinke.github.kafka.practise.common</groupId>
    <artifactId>kafka-practise-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
2. Kafka-related application configuration

spring.application.name=kafka-practise-producer
spring.kafka.producer.batch-size=16384
spring.kafka.producer.acks=all
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.client-id=${spring.application.name}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.shixinke.github.kafka.practise.common.util.KafkaObjectSerializer

  • spring.kafka.producer.batch-size: maximum size, in bytes, of a batch of messages sent together
  • spring.kafka.producer.acks: acknowledgment mode (all waits for all in-sync replicas to confirm)
  • spring.kafka.producer.bootstrap-servers: broker addresses to connect to
  • spring.kafka.producer.buffer-memory: size of the send buffer
  • spring.kafka.producer.client-id: identifier of this application
  • spring.kafka.producer.key-serializer: serializer class for message keys
  • spring.kafka.producer.value-serializer: serializer class for message payloads (points at the serializer in the common module)
3. Kafka configuration classes
(1) The configuration properties class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author shixinke
 * @version 1.0
 */
@Component
@ConfigurationProperties(prefix = "spring.kafka.producer")
@Data
public class KafkaProducerProperties {
    private String batchSize;
    private String acks;
    private String bootstrapServers;
    private Long bufferMemory;
    private String clientId;
    private String keySerializer;
    private String valueSerializer;
}

  • Each field maps to one of the configuration entries above
(2) The Kafka bean configuration class

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 */
@Configuration
public class KafkaProducerConfiguration {

    @Resource
    private KafkaProducerProperties kafkaProducerProperties;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<String, Object>(8);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerProperties.getAcks());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerProperties.getBatchSize());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerProperties.getBufferMemory());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerProperties.getClientId());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getValueSerializer());
        return props;
    }

    @Bean
    public <V> KafkaTemplate<String, V> kafkaTemplate() {
        return new KafkaTemplate<String, V>(new DefaultKafkaProducerFactory<String, V>(producerConfig()));
    }
}

  • The configuration properties class is used to instantiate the KafkaTemplate bean, the object through which the producer is actually invoked
4. The producer service
(1) The producer service interface

import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;

/**
 * @author shixinke
 * @version 1.0
 */
public interface KafkaMQProducerService {
    <K, V> Future<RecordMetadata> send(String topic, K key, V content);
}
(2) The producer service implementation

import com.shixinke.github.kafka.practise.producer.service.KafkaMQProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.Future;

/**
 * @author shixinke
 * @version 1.0
 */
@Service
public class KafkaMQProducerServiceImpl implements KafkaMQProducerService {

    /**
     * The KafkaTemplate bean declared in the producer configuration class
     */
    @Resource
    private KafkaTemplate kafkaProducer;

    @Override
    public <K, V> Future<RecordMetadata> send(String topic, K key, V content) {
        ProducerRecord<String, V> record = new ProducerRecord<String, V>(topic, String.valueOf(key), content);
        return kafkaProducer.send(record);
    }
}
5. The business service (the caller of the producer)
(1) The business service interface

import com.shixinke.github.kafka.practise.common.data.Result;
import com.shixinke.github.kafka.practise.producer.dto.UserDTO;

public interface UserService {
    Result create(UserDTO userDTO);
}
(2) The business service implementation

import com.shixinke.github.kafka.practise.common.data.Result;
import com.shixinke.github.kafka.practise.common.data.User;
import com.shixinke.github.kafka.practise.producer.dto.UserDTO;
import com.shixinke.github.kafka.practise.producer.service.KafkaMQProducerService;
import com.shixinke.github.kafka.practise.producer.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Instant;

/**
 * @author shixinke
 * @version 1.0
 */
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private KafkaMQProducerService producerService;

    @Override
    public Result create(UserDTO userDTO) {
        User user = new User(userDTO.getNickname(), userDTO.getEmail(), userDTO.getAccount(), userDTO.getPassword(), 0, Instant.now().getEpochSecond());
        // The data operations (database insert, cache update) are omitted here
        user.setUserId(3000001);
        // Once the above succeeds, publish a message to the queue,
        // e.g. to tell the mail service to send a welcome email
        producerService.send("user_reg", user.getUserId(), user);
        return Result.success(user.getUserId());
    }
}
6. Triggering message production
(1) The request DTO for client parameters

@Data
public class UserDTO {
    private String nickname;
    private String account;
    private String email;
    private String password;

    @Override
    public String toString() {
        return "UserDTO{" +
                "nickname='" + nickname + '\'' +
                ", account='" + account + '\'' +
                ", email='" + email + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
(2) The controller that receives and handles the request

@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {

    @Resource
    private UserService userService;

    @PostMapping("register")
    public Result register(UserDTO userDTO) {
        Result result = null;
        try {
            result = userService.create(userDTO);
        } catch (Exception ex) {
            log.error("registration failed:", ex);
            result = Result.error(ex.getMessage());
        }
        return result;
    }
}
(3) Triggering the event

Simulate a user registration from a browser or another HTTP tool:

Call the registration endpoint from Postman.

IV. The Message Consumer

1. Import the dependencies

Add the following to the project's pom.xml:

<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<!-- our shared common module -->
<dependency>
    <groupId>com.shixinke.github.kafka.practise.common</groupId>
    <artifactId>kafka-practise-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
2. Consumer-side application configuration

spring.application.name=kafka-practise-consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=user
spring.kafka.consumer.client-id=${spring.application.name}
spring.kafka.consumer.subscribe-topics=user_reg,user_update
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval-ms=1000
spring.kafka.consumer.concurrency=2
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.shixinke.github.kafka.practise.common.util.KafkaObjectDeserializer

  • spring.kafka.consumer.bootstrap-servers: broker addresses to connect to
  • spring.kafka.consumer.group-id: consumer group name
  • spring.kafka.consumer.client-id: identifier of this consumer
  • spring.kafka.consumer.subscribe-topics: topics to consume (a custom property of this project)
  • spring.kafka.consumer.enable-auto-commit: whether offsets are committed automatically
  • spring.kafka.consumer.auto-commit-interval-ms: how often offsets are auto-committed
  • spring.kafka.consumer.concurrency: number of concurrent listener threads (also a custom property)
  • spring.kafka.consumer.auto-offset-reset: where to start consuming when no committed offset exists for the group
  • spring.kafka.consumer.key-deserializer: deserializer class for message keys
  • spring.kafka.consumer.value-deserializer: deserializer class for message payloads (the deserializer from the common module)
3. Configuration classes
(1) The consumer configuration properties class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "spring.kafka.consumer")
@Data
public class KafkaConsumerProperties {
    private String bootstrapServers;
    private String groupId;
    private String clientId;
    private String enableAutoCommit;
    private String autoCommitIntervalMs;
    private String autoOffsetReset;
    private String keyDeserializer;
    private String valueDeserializer;
    private String subscribeTopics;
    private int concurrency;
}

  • Each field maps to one of the Kafka entries in the application configuration
(2) The consumer bean configuration class

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 */
@Configuration
public class KafkaConsumerConfiguration {

    @Resource
    private KafkaConsumerProperties kafkaProperties;

    /**
     * The generic listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(kafkaProperties.getConcurrency());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * The listener container factory for the user module
     */
    @Bean("userContainerFactory")
    public ConcurrentKafkaListenerContainerFactory userContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConcurrency(kafkaProperties.getConcurrency());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * The consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getProperties());
    }

    /**
     * Consumer configuration map used to initialize the factory
     */
    public Map<String, Object> getProperties() {
        Map<String, Object> props = new HashMap<String, Object>(8);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getEnableAutoCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getAutoOffsetReset());
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDeserializer());
        return props;
    }
}
4. The message listener (where consumption happens)

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
@Slf4j
public class UserMessageListener {

    @KafkaListener(id = "user_reg", topics = "user_reg", containerFactory = "userContainerFactory")
    public void subscribeCreateUser(List<ConsumerRecord<String, Object>> records, Acknowledgment ack) {
        try {
            log.info("received messages: {}", records);
            // commit the offsets manually once the batch has been processed
            ack.acknowledge();
        } catch (Exception ex) {
            // do not acknowledge on failure: the offsets stay uncommitted,
            // so the batch will be redelivered and can be retried
            log.error("failed to process user_reg batch", ex);
        }
    }
}
  • The @KafkaListener annotation marks the method as a message listener
    • List<ConsumerRecord<String, Object>> records: the batch of received records
    • Acknowledgment ack: the handle used to acknowledge (commit) the batch manually

Sample log output:

received messages: [ConsumerRecord(topic = user_reg, partition = 0, offset = 1, CreateTime = 1549155259062, serialized key size = 7, serialized value size = 250, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3000001, value = User(userId=3000001, nickname=诗心客, email=ishixinke@qq.com, account=shixinke, password=123456, status=0, createTime=1549155258))]