Integrating Kafka with Spring Boot: delaying consumer startup
When integrating Kafka with Spring Boot, consumers are usually declared with @KafkaListener, but listeners registered this way start as soon as the Spring context starts. If you need to delay starting specific consumers based on configuration, that approach will not work.
Referring to the class KafkaListenerAnnotationBeanPostProcessor, I extracted part of its code. With it you can start consumers dynamically whenever needed, and it is also easy to start multiple consumers.
For convenience, I defined a custom annotation:
import org.springframework.kafka.annotation.TopicPartition;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayKafkaConsumer {
String id() default "";
String[] topics() default {};
String errorHandler() default "";
String groupId() default "";
TopicPartition[] topicPartitions() default {};
String beanRef() default "__listener";
}
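As a usage sketch (the bean name, topic, and group are hypothetical placeholders, not from the original post), a listener method would be annotated like this:

```java
import org.springframework.stereotype.Component;

// Hypothetical listener bean; topic and group names are made up.
@Component
public class DeviceMessageListener {

    // Unlike @KafkaListener, this method does NOT start consuming at context
    // startup; it only becomes an active consumer after
    // MyKafkaConsumerFactory.startConsumer(this) is called.
    @DelayKafkaConsumer(id = "device-consumer",
            topics = "${device.topic:device-data}",
            groupId = "device-group")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}
```

Placeholders such as `${device.topic:device-data}` are resolved through the bean factory's embedded-value resolver, just as with @KafkaListener.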
The factory that works with the annotation:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.*;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.*;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware {
private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class);
private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar;
private final AtomicInteger counter = new AtomicInteger();
private BeanFactory beanFactory;
private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
private BeanExpressionContext expressionContext;
private final ListenerScope listenerScope = new ListenerScope();
private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new KafkaHandlerMethodFactoryAdapter();
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
this.kafkaListenerEndpointRegistrar = registrar;
addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);
}
public void startConsumer(KafkaListenerEndpoint endpoint){
kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
}
public void startConsumer(Object target){
logger.info("start consumer {} ...",target.getClass());
Class<?> targetClass = AopUtils.getTargetClass(target);
Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {
@Override
public Set<DelayKafkaConsumer> inspect(Method method) {
Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
}
});
if (annotatedMethods.isEmpty()) {
throw new IllegalArgumentException(target.getClass() + " must have at least one method annotated with @DelayKafkaConsumer");
}
for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
logger.info("find message listen handler method : {} , object : {}",method.getName(),target.getClass());
for (DelayKafkaConsumer listener : entry.getValue()) {
if(listener.topics().length==0) {
logger.info("topics value is empty , will skip it , method : {} , target object : {}",method.getName(),target.getClass());
continue;
}
processKafkaListener(listener,method,target);
logger.info("register method {} success , target object : {}",method.getName(),target.getClass());
}
}
logger.info("{} consumer start complete .",target.getClass());
}
protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
processListener(endpoint, kafkaListener, bean, methodToUse);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,
Object adminTarget) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}
private String getEndpointId(DelayKafkaConsumer kafkaListener) {
if (StringUtils.hasText(kafkaListener.id())) {
return resolve(kafkaListener.id());
}
else {
return "Custom-Consumer" + this.counter.getAndIncrement();
}
}
private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
}
if (groupId == null && StringUtils.hasText(kafkaListener.id())) {
groupId = id;
}
return groupId;
}
private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {
String[] topics = kafkaListener.topics();
List<String> result = new ArrayList<>();
if (topics.length > 0) {
for (int i = 0; i < topics.length; i++) {
Object topic = resolveExpression(topics[i]);
resolveAsString(topic, result);
}
}
return result.toArray(new String[result.size()]);
}
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolveAsString(object, result);
}
}
else if (resolvedValue instanceof String) {
result.add((String) resolvedValue);
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolveAsString(object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue));
}
}
private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
List<TopicPartitionInitialOffset> result = new ArrayList<>();
if (topicPartitions.length > 0) {
for (TopicPartition topicPartition : topicPartitions) {
result.addAll(resolveTopicPartitionsList(topicPartition));
}
}
return result.toArray(new TopicPartitionInitialOffset[result.size()]);
}
private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
Object topic = resolveExpression(topicPartition.topic());
Assert.state(topic instanceof String,
"topic in @TopicPartition must resolve to a String, not " + topic.getClass());
Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
String[] partitions = topicPartition.partitions();
PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionInitialOffset> result = new ArrayList<>();
for (int i = 0; i < partitions.length; i++) {
resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
}
for (PartitionOffset partitionOffset : partitionOffsets) {
Object partitionValue = resolveExpression(partitionOffset.partition());
Integer partition;
if (partitionValue instanceof String) {
Assert.state(StringUtils.hasText((String) partitionValue),
"partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
partition = Integer.valueOf((String) partitionValue);
}
else if (partitionValue instanceof Integer) {
partition = (Integer) partitionValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
topic, partitionOffset.partition(), partitionValue.getClass()));
}
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
Long initialOffset;
if (initialOffsetValue instanceof String) {
Assert.state(StringUtils.hasText((String) initialOffsetValue),
"'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
initialOffset = Long.valueOf((String) initialOffsetValue);
}
else if (initialOffsetValue instanceof Long) {
initialOffset = (Long) initialOffsetValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
}
Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
Boolean relativeToCurrent;
if (relativeToCurrentValue instanceof String) {
relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
}
else if (relativeToCurrentValue instanceof Boolean) {
relativeToCurrent = (Boolean) relativeToCurrentValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
}
TopicPartitionInitialOffset topicPartitionOffset =
new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
}
return result;
}
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionInitialOffset> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else if (resolvedValue instanceof String) {
Assert.state(StringUtils.hasText((String) resolvedValue),
"partition in @TopicPartition for topic '" + topic + "' cannot be empty");
result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
}
else if (resolvedValue instanceof Integer[]) {
for (Integer partition : (Integer[]) resolvedValue) {
result.add(new TopicPartitionInitialOffset(topic, partition));
}
}
else if (resolvedValue instanceof Integer) {
result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
}
}
private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {
Set<DelayKafkaConsumer> listeners = new HashSet<>();
DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);
if (ann != null) {
listeners.add(ann);
}
return listeners;
}
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
try {
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<?> iface : proxiedInterfaces) {
try {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
catch (NoSuchMethodException noMethod) {
// ignore; try the next proxied interface
}
}
}
catch (SecurityException ex) {
ReflectionUtils.handleReflectionException(ex);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException(String.format(
"target method '%s' found on bean target class '%s', " +
"but not found in any interface(s) for bean JDK proxy. Either " +
"pull the method up to an interface or switch to subclass (CGLIB) " +
"proxies by setting proxy-target-class/proxyTargetClass " +
"attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
}
}
return method;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
}
}
private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
}
else {
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
}
private Object resolveExpression(String value) {
String resolvedValue = resolve(value);
return this.resolver.evaluate(resolvedValue, this.expressionContext);
}
/**
* Resolve the specified value if possible.
* @param value the value to resolve
* @return the resolved value
* @see ConfigurableBeanFactory#resolveEmbeddedValue
*/
private String resolve(String value) {
if (this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
}
return value;
}
private void addFormatters(FormatterRegistry registry) {
for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
registry.addConverter(converter);
}
for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
registry.addConverter(converter);
}
for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) {
registry.addFormatter(formatter);
}
}
private <T> Collection<T> getBeansOfType(Class<T> type) {
if (this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();
} else {
return Collections.emptySet();
}
}
private static class ListenerScope implements Scope {
private final Map<String, Object> listeners = new HashMap<>();
ListenerScope() {
super();
}
public void addListener(String key, Object bean) {
this.listeners.put(key, bean);
}
public void removeListener(String key) {
this.listeners.remove(key);
}
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
return this.listeners.get(name);
}
@Override
public Object remove(String name) {
return null;
}
@Override
public void registerDestructionCallback(String name, Runnable callback) {
}
@Override
public Object resolveContextualObject(String key) {
return this.listeners.get(key);
}
@Override
public String getConversationId() {
return null;
}
}
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
private MessageHandlerMethodFactory messageHandlerMethodFactory;
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
this.messageHandlerMethodFactory = messageHandlerMethodFactory;
}
@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
}
private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
if (this.messageHandlerMethodFactory == null) {
this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
}
return this.messageHandlerMethodFactory;
}
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory);
ConfigurableBeanFactory cbf =
(MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ?
(ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null);
defaultFactory.setConversionService(this.defaultFormattingConversionService);
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution
argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
argumentResolvers.add(new HeadersMethodArgumentResolver());
// Type-based argument resolution
final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
@Override
protected boolean isEmptyPayload(Object payload) {
return payload == null || payload instanceof KafkaNull;
}
});
defaultFactory.setArgumentResolvers(argumentResolvers);
defaultFactory.afterPropertiesSet();
return defaultFactory;
}
}
}
Call startConsumer to start a consumer (calling it multiple times starts multiple consumers). The target must contain at least one method annotated with @DelayKafkaConsumer; it works much like @KafkaListener. I removed some features and kept the most commonly used parts.
This provides an annotation-based way to control consumers dynamically in a Spring Boot project. There are other ways to achieve the same effect, but I find this one convenient.
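To make the trigger side concrete, here is a hedged sketch (class and topic names are assumptions, not from the original post) of a bean that registers itself as a consumer only when some external condition fires:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Hypothetical self-starting listener: it stays idle until enableConsumer()
// is invoked (e.g. from a scheduler, admin endpoint, or config watcher).
@Component
public class OrderConsumerManager {

    @Autowired
    private MyKafkaConsumerFactory kafkaConsumerFactory;

    private final AtomicBoolean started = new AtomicBoolean();

    public void enableConsumer() {
        // Guard against double registration: startConsumer would otherwise
        // register a second consumer for the same method.
        if (started.compareAndSet(false, true)) {
            kafkaConsumerFactory.startConsumer(this);
        }
    }

    @DelayKafkaConsumer(id = "order-consumer", topics = "orders", groupId = "order-group")
    public void onMessage(String message) {
        // handle the message
    }
}
```

The guard is optional; dropping it and calling startConsumer repeatedly is exactly how you would spin up several consumers for the same handler.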
Kafka consumer in a Spring Boot project fails to start with "Failed to construct kafka consumer"
As mentioned in an earlier post, our company builds IoT projects. The project uses MQTT + Kafka to communicate with devices; the protocol format used to be JSON and has now been changed to byte[] (byte-array) messages.
There are plenty of Spring Boot integration demos online; I will publish a Kafka demo when I find time.
The error message is as follows:
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Cause analysis:
When we still communicated in JSON, the consumer factory behind the ConcurrentMessageListenerContainer used String for both key and value. The value type is now byte[], so the consumer factory must be built with the matching value type (and deserializer).
The code is as follows:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // consumerByteFactory() must supply a ConsumerFactory<String, byte[]>
    // whose value deserializer matches the byte[] payload.
    factory.setConsumerFactory(consumerByteFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
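The consumerByteFactory() method referenced above is not shown in the original post. A minimal sketch, assuming it lives in the same configuration class and assuming the broker address and group id (both placeholders), pairs a String key deserializer with a byte[] value deserializer:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public ConsumerFactory<String, byte[]> consumerByteFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-group");            // assumption
    // Key stays String; the value deserializer must produce byte[] to match
    // the <String, byte[]> container factory, otherwise consumer construction fails.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
```

The key point is the value deserializer: a mismatch between the configured deserializer and the declared generic value type is a common cause of "Failed to construct kafka consumer".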
A complete Kafka producer + consumer demo will be put together in upcoming posts.
The above is my personal experience; I hope it gives you a useful reference, and I hope you will continue to support 腳本之家.