Springboot整合kafka的示例代碼
1. 整合kafka
1、引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>2、設(shè)置yml文件
spring:
application:
name: demo
kafka:
bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
producer: # producer 生產(chǎn)者
retries: 0 # 重試次數(shù)
acks: 1 # 應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
batch-size: 16384 # 批量大小
buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: com.itheima.demo.config.MySerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: # consumer消費(fèi)者
group-id: javagroup # 默認(rèn)的消費(fèi)組ID
enable-auto-commit: true # 是否自動(dòng)提交offset
auto-commit-interval: 100 # 提交offset延時(shí)(接收到消息后多久提交offset)
# earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
# latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
# none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: com.itheima.demo.config.MyDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer3、啟動(dòng)項(xiàng)目

2. 消息發(fā)送
2.1 發(fā)送類型
KafkaTemplate調(diào)用send時(shí)默認(rèn)采用異步發(fā)送,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法
異步發(fā)送生產(chǎn)者:
@RestController
public class KafkaProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/test/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
Message message = new Message();
message.setMessage(msg);
kafkaTemplate.send("test", JSON.toJSONString(message));
}
}同步發(fā)送生產(chǎn)者:
//測(cè)試同步發(fā)送與監(jiān)聽(tīng)
@RestController
public class AsyncProducer {
private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
//同步發(fā)送
@GetMapping("/kafka/sync/{msg}")
public void sync(@PathVariable("msg") String msg) throws Exception {
Message message = new Message();
message.setMessage(msg);
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
//注意,可以設(shè)置等待時(shí)間,超出后,不再等候結(jié)果
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
logger.info("send result:{}",result.getProducerRecord().value());
}
}
消費(fèi)者:
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
//不指定group,默認(rèn)取yml里配置的
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("message:{}", msg);
}
}
}那么我們?cè)趺纯闯鰜?lái)同步發(fā)送和異步發(fā)送的區(qū)別呢?
①首先在服務(wù)器上,將kafka暫停服務(wù)。
②在swagger發(fā)送消息
調(diào)同步發(fā)送:請(qǐng)求被阻斷,一直等待,超時(shí)后返回錯(cuò)誤

而調(diào)異步發(fā)送的(默認(rèn)發(fā)送接口),請(qǐng)求立刻返回。

那么,異步發(fā)送的消息怎么確認(rèn)發(fā)送情況呢?
我們使用注冊(cè)監(jiān)聽(tīng)
即新建一個(gè)類:KafkaListener.java
@Configuration
public class KafkaListener {
private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);
@Autowired
KafkaTemplate kafkaTemplate;
//配置監(jiān)聽(tīng)
@PostConstruct
private void listener() {
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
logger.info("ok,message={}", producerRecord.value());
}
public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
logger.error("error!message={}", producerRecord.value());
});
}
}查看控制臺(tái),等待一段時(shí)間后,異步發(fā)送失敗的消息會(huì)被回調(diào)給注冊(cè)過(guò)的listener

如果是正常發(fā)送異步消息,則會(huì)獲得該消息。可以看到,在內(nèi)部類 KafkaListener$1 中,即注冊(cè)的Listener的消息。

2.2 序列化
消費(fèi)者使用:KafkaConsumer.java
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
//不指定group,默認(rèn)取yml里配置的
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("message:{}", msg);
}
}
}1)序列化詳解
- 前面用到的是Kafka自帶的字符串序列化器(
org.apache.kafka.common.serialization.StringSerializer) - 除此之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
- 這些序列化器都實(shí)現(xiàn)了接口(
org.apache.kafka.common.serialization.Serializer) - 基本上,可以滿足絕大多數(shù)場(chǎng)景
2)自定義序列化
自己實(shí)現(xiàn),實(shí)現(xiàn)對(duì)應(yīng)的接口即可,有以下方法:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, Boolean isKey) {
}
//理論上,只實(shí)現(xiàn)這個(gè)即可正常運(yùn)行
byte[] serialize(String var1, T var2);
//默認(rèn)調(diào)上面的方法
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}我們來(lái)自己實(shí)現(xiàn)一個(gè)序列化器:MySerializer.java
public class MySerializer implements Serializer {
@Override
public byte[] serialize(String s, Object o) {
String json = JSON.toJSONString(o);
return json.getBytes();
}
}3)解碼MyDeserializer.java,實(shí)現(xiàn)方式與編碼器幾乎一樣.
public class MyDeserializer implements Deserializer {
private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);
@Override
public Object deserialize(String s, byte[] bytes) {
try {
String json = new String(bytes,"utf-8");
return JSON.parse(json);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}4)在yaml中配置自己的編碼器、解碼器

再次收發(fā),消息正常

2.3 分區(qū)策略
分區(qū)策略決定了消息根據(jù)key投放到哪個(gè)分區(qū),也是順序消費(fèi)保障的基石。
- 給定了分區(qū)號(hào),直接將數(shù)據(jù)發(fā)送到指定的分區(qū)里面去
- 沒(méi)有給定分區(qū)號(hào),給定數(shù)據(jù)的key值,通過(guò)key取上hashCode進(jìn)行分區(qū)
- 既沒(méi)有給定分區(qū)號(hào),也沒(méi)有給定key值,直接輪循進(jìn)行分區(qū)(默認(rèn))
- 自定義分區(qū),你想怎么做就怎么做
1)驗(yàn)證默認(rèn)分區(qū)規(guī)則
發(fā)送者代碼參考:PartitionProducer.java
//測(cè)試分區(qū)發(fā)送
@RestController
public class PartitionProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
// 指定分區(qū)發(fā)送
// 不管你key是什么,到同一個(gè)分區(qū)
@GetMapping("/kafka/partitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0號(hào)分區(qū)");
}
// 指定key發(fā)送,不指定分區(qū)
// 根據(jù)key做hash,相同的key到同一個(gè)分區(qū)
@GetMapping("/kafka/keysend/{key}")
public void setKey(@PathVariable("key") String key) {
kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分區(qū)");
}消費(fèi)者代碼使用:PartitionConsumer.java
@Component
public class PartitionConsumer {
private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);
//分區(qū)消費(fèi)
@KafkaListener(topics = {"test"},topicPattern = "0")
public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=0,message:[{}]", msg);
}
}
@KafkaListener(topics = {"test"},topicPattern = "1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=1,message:[{}]", msg);
}
}
}通過(guò)swagger訪問(wèn)setKey(也就是只給了key的方法):

可以看到key相同的被hash到了同一個(gè)分區(qū)
再訪問(wèn)setPartition來(lái)設(shè)置分區(qū)號(hào)0來(lái)發(fā)送:

可以看到無(wú)論key是什么,都是分區(qū)0來(lái)消費(fèi)
2)自定義分區(qū)
參考代碼:MyPartitioner.java , MyPartitionTemplate.java。
發(fā)送使用:MyPartitionProducer.java。
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 定義自己的分區(qū)策略
// 如果key以0開(kāi)頭,發(fā)到0號(hào)分區(qū)
// 其他都扔到1號(hào)分區(qū)
String keyStr = key+"";
if (keyStr.startsWith("0")){
return 0;
}else {
return 1;
}
}
public void close() {
public void configure(Map<String, ?> map) {
}@Configuration
public class MyPartitionTemplate {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
KafkaTemplate kafkaTemplate;
@PostConstruct
public void setKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//注意分區(qū)器在這里?。?!
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
public KafkaTemplate getKafkaTemplate(){
return kafkaTemplate;
}//測(cè)試自定義分區(qū)發(fā)送
@RestController
public class MyPartitionProducer {
@Autowired
MyPartitionTemplate template;
// 使用0開(kāi)頭和其他任意字母開(kāi)頭的key發(fā)送消息
// 看控制臺(tái)的輸出,在哪個(gè)分區(qū)里?
@GetMapping("/kafka/myPartitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定義分區(qū)策略");
}
}使用swagger,發(fā)送0開(kāi)頭和非0開(kāi)頭兩種key

3. 消息消費(fèi)
3.1 消息組別
發(fā)送者使用:KafkaProducer.java
@RestController
public class KafkaProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/test/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
Message message = new Message();
message.setMessage(msg);
kafkaTemplate.send("test", JSON.toJSONString(message));
}
}1)代碼參考:GroupConsumer.java,Listener拷貝3份,分別賦予兩組group,驗(yàn)證分組消費(fèi):
//測(cè)試組消費(fèi)
@Component
public class GroupConsumer {
private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);
//組1,消費(fèi)者1
@KafkaListener(topics = {"test"},groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group1-1 , message:{}", msg);
}
}
//組1,消費(fèi)者2
public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
logger.info("group:group1-2 , message:{}", msg);
//組2,只有一個(gè)消費(fèi)者
@KafkaListener(topics = {"test"},groupId = "group2")
public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
logger.info("group:group2 , message:{}", msg);
}2)啟動(dòng)

3)通過(guò)swagger發(fā)送2條消息

- 同一group下的兩個(gè)消費(fèi)者,在group1均分消息
- group2下只有一個(gè)消費(fèi)者,得到全部消息
4)消費(fèi)端閑置
注意分區(qū)數(shù)與消費(fèi)者數(shù)的搭配,如果 ( 消費(fèi)者數(shù) > 分區(qū)數(shù)量 ),將會(huì)出現(xiàn)消費(fèi)者閑置(因?yàn)橐粋€(gè)分區(qū)只能分配給一個(gè)消費(fèi)者),浪費(fèi)資源!
驗(yàn)證方式:
停掉項(xiàng)目,刪掉test主題,重新建一個(gè) ,這次只給它分配一個(gè)分區(qū)。
重新發(fā)送兩條消息,試一試

- group2可以消費(fèi)到1、2兩條消息
- group1下有兩個(gè)消費(fèi)者,但是只分配給了 1 , 2這個(gè)進(jìn)程被閑置
3.2 位移提交
1)自動(dòng)提交
前面的案例中,我們?cè)O(shè)置了以下兩個(gè)選項(xiàng),則kafka會(huì)按延時(shí)設(shè)置自動(dòng)提交
enable-auto-commit: true # 是否自動(dòng)提交offset auto-commit-interval: 100 # 提交offset延時(shí)(接收到消息后多久提交offset,默認(rèn)單位為ms)
2)手動(dòng)提交
有些時(shí)候,我們需要手動(dòng)控制偏移量的提交時(shí)機(jī),比如確保消息嚴(yán)格消費(fèi)后再提交,以防止丟失或重復(fù)。
下面我們自己定義配置,覆蓋上面的參數(shù)
代碼參考:MyOffsetConfig.java
@Configuration
public class MyOffsetConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 注意這里?。?!設(shè)置手動(dòng)提交
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
// ack模式:
// AckMode針對(duì)ENABLE_AUTO_COMMIT_CONFIG=false時(shí)生效,有以下幾種:
//
// RECORD
// 每處理一條commit一次
// BATCH(默認(rèn))
// 每次poll的時(shí)候批量提交一次,頻率取決于每次poll的調(diào)用頻率
// TIME
// 每次間隔ackTime的時(shí)間去commit(跟auto commit interval有什么區(qū)別呢?)
// COUNT
// 累積達(dá)到ackCount次的ack去commit
// COUNT_TIME
// ackTime或ackCount哪個(gè)條件先滿足,就commit
// MANUAL
// listener負(fù)責(zé)ack,但是背后也是批量上去
// MANUAL_IMMEDIATE
// listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}然后通過(guò)在消費(fèi)端的Consumer來(lái)提交偏移量
MyOffsetConsumer:
@Component
public class MyOffsetConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")
public void manualCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("手動(dòng)提交偏移量 , partition={}, msg={}", partition, message);
// 同步提交
consumer.commitSync();
//異步提交
//consumer.commitAsync();
// ack提交也可以,會(huì)按設(shè)置的ack策略走(參考MyOffsetConfig.java里的ack模式)
// ack.acknowledge();
}
@KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")
public void noCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("忘記提交偏移量, partition={}, msg={}", partition, message);
// 不做commit!
/**
* 現(xiàn)實(shí)狀況:
* commitSync和commitAsync組合使用
* <p>
* 手工提交異步 consumer.commitAsync();
* 手工同步提交 consumer.commitSync()
* commitSync()方法提交最后一個(gè)偏移量。在成功提交或碰到無(wú)怯恢復(fù)的錯(cuò)誤之前,
* commitSync()會(huì)一直重試,但是commitAsync()不會(huì)。
* 一般情況下,針對(duì)偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會(huì)有太大問(wèn)題
* 因?yàn)槿绻峤皇∈且驗(yàn)榕R時(shí)問(wèn)題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的。
* 但如果這是發(fā)生在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保能夠提交成功。否則就會(huì)造成重復(fù)消費(fèi)
* 因此,在消費(fèi)者關(guān)閉前一般會(huì)組合使用commitAsync()和commitSync()。
*/
// @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
public void manualOffset(@Payload String message,
try {
logger.info("同步異步搭配 , partition={}, msg={}", partition, message);
//先異步提交
consumer.commitAsync();
//繼續(xù)做別的事
} catch (Exception e) {
System.out.println("commit failed");
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
* 甚至可以手動(dòng)提交,指定任意位置的偏移量
* 不推薦日常使用?。?!
// @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
public void offset(ConsumerRecord record, Consumer consumer) {
logger.info("手動(dòng)指定任意偏移量, partition={}, msg={}", record.partition(), record);
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(currentOffset);
}3)重復(fù)消費(fèi)問(wèn)題
如果手動(dòng)提交模式被打開(kāi),一定不要忘記提交偏移量。否則會(huì)造成重復(fù)消費(fèi)!
用km將test主題刪除,新建一個(gè)test空主題。方便觀察消息偏移 注釋掉其他Consumer的Component注解,只保留當(dāng)前MyOffsetConsumer.java 啟動(dòng)項(xiàng)目,使用swagger的KafkaProducer發(fā)送連續(xù)幾條消息 留心控制臺(tái),都能消費(fèi),沒(méi)問(wèn)題:

但是!重啟項(xiàng)目:

無(wú)論重啟多少次,不提交偏移量的消費(fèi)組,會(huì)重復(fù)消費(fèi)一遍!?。?/p>
再通過(guò)命令行查詢偏移量

4)經(jīng)驗(yàn)與總結(jié)
commitSync()方法,即同步提交,會(huì)提交最后一個(gè)偏移量。在成功提交或碰到無(wú)怯恢復(fù)的錯(cuò)誤之前,commitSync()會(huì)一直重試,但是commitAsync()不會(huì)。
這就造成一個(gè)陷阱:
如果異步提交,針對(duì)偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會(huì)有太大問(wèn)題,因?yàn)槿绻峤皇∈且驗(yàn)榕R時(shí)問(wèn)題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的。只要成功一次,偏移量就會(huì)提交上去。
但是!如果這是發(fā)生在關(guān)閉消費(fèi)者時(shí)的最后一次提交,就要確保能夠提交成功,如果還沒(méi)提交完就停掉了進(jìn)程。就會(huì)造成重復(fù)消費(fèi)!
因此,在消費(fèi)者關(guān)閉前一般會(huì)組合使用commitAsync()和commitSync()。
詳細(xì)代碼參考:MyOffsetConsumer.manualOffset()
到此這篇關(guān)于Springboot整合kafka的文章就介紹到這了,更多相關(guān)Springboot整合kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java項(xiàng)目依賴包選擇具體實(shí)現(xiàn)類示例介紹
這篇文章主要為大家介紹了java項(xiàng)目依賴包選擇具體實(shí)現(xiàn)類示例介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
Java 自定義Spring框架與Spring IoC相關(guān)接口分析
Spring框架是由于軟件開(kāi)發(fā)的復(fù)雜性而創(chuàng)建的。Spring使用的是基本的JavaBean來(lái)完成以前只可能由EJB完成的事情。然而,Spring的用途不僅僅限于服務(wù)器端的開(kāi)發(fā)2021-10-10
Java RMI詳細(xì)介紹及簡(jiǎn)單實(shí)例
這篇文章主要介紹了Java RMI詳細(xì)介紹及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-02-02
創(chuàng)建Spring Boot項(xiàng)目的幾種方式總結(jié)(推薦)
這篇文章主要介紹了創(chuàng)建Spring Boot項(xiàng)目的幾種方式總結(jié)(推薦),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07
@JsonFormat?和?@DateTimeFormat?時(shí)間格式化注解(場(chǎng)景示例代碼)
這篇文章主要介紹了@JsonFormat和@DateTimeFormat時(shí)間格式化注解,本文通過(guò)場(chǎng)景示例代碼詳解給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05
java?時(shí)區(qū)時(shí)間轉(zhuǎn)為UTC的實(shí)現(xiàn)
Java提供了多種方式來(lái)實(shí)現(xiàn)時(shí)區(qū)時(shí)間轉(zhuǎn)換為UTC時(shí)間,包括使用java.util.Date和java.util.Calendar以及Java?8中新增的java.time包,下面就來(lái)介紹一下2024-08-08
Java處理圖片實(shí)現(xiàn)base64編碼轉(zhuǎn)換
這篇文章主要介紹了Java處理圖片實(shí)現(xiàn)base64編碼轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
簡(jiǎn)單了解Java創(chuàng)建線程兩種方法
這篇文章主要介紹了簡(jiǎn)單了解Java創(chuàng)建線程兩種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02

