SpringBoot中使用線程池控制主線程與子線程事務(wù)的全過程
一、引言:事務(wù)管理在多線程環(huán)境下的挑戰(zhàn)
1.1 事務(wù)的本質(zhì)與線程安全
在Spring框架中,事務(wù)管理是基于ThreadLocal實(shí)現(xiàn)的。ThreadLocal為每個線程提供了獨(dú)立的變量副本,確保每個線程都能獨(dú)立地操作自己的事務(wù)資源,而不會相互干擾。這種設(shè)計(jì)在單線程環(huán)境下工作得很好,但在多線程環(huán)境下卻帶來了挑戰(zhàn)。
// ThreadLocal在Spring事務(wù)管理中的應(yīng)用示例
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
}1.2 多線程事務(wù)管理的核心問題
在多線程環(huán)境下,事務(wù)管理面臨以下主要挑戰(zhàn):
- 事務(wù)上下文隔離:主線程的事務(wù)上下文無法自動傳播到子線程
- 資源競爭與死鎖:多個線程同時訪問共享數(shù)據(jù)資源
- 事務(wù)一致性保證:如何確保所有線程的操作要么全部成功,要么全部回滾
- 異常處理復(fù)雜度:一個線程的異常如何影響其他線程的事務(wù)狀態(tài)
1.3 SpringBoot事務(wù)管理架構(gòu)概覽
─────────────────────────────────────────────────────────────┐ │ Spring Transaction Architecture │ ├─────────────────────────────────────────────────────────────┤ │ @Transactional │ │ │ │ │ ▼ │ │ TransactionInterceptor │ │ │ │ │ ▼ │ │ PlatformTransactionManager │ │ │ │ │ ▼ │ │ DataSourceTransactionManager / JpaTransactionManager / etc │ │ │ │ │ ▼ │ │ JDBC Connection / JPA EntityManager │ └─────────────────────────────────────────────────────────────┘
二、Spring事務(wù)管理基礎(chǔ)回顧
2.1 聲明式事務(wù)管理
Spring通過@Transactional注解提供聲明式事務(wù)管理,這是最常用的方式:
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@Transactional
public User createUser(User user) {
// 業(yè)務(wù)邏輯
return userRepository.save(user);
}
@Transactional(readOnly = true)
public User findUserById(Long id) {
return userRepository.findById(id).orElse(null);
}
}2.2 編程式事務(wù)管理
對于更復(fù)雜的事務(wù)控制,Spring提供了編程式事務(wù)管理:
@Service
public class OrderService {
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private TransactionTemplate transactionTemplate;
public void processOrder(Long orderId) {
// 方式1:使用TransactionTemplate
transactionTemplate.execute(status -> {
// 業(yè)務(wù)邏輯
return null;
});
// 方式2:使用PlatformTransactionManager
TransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(def);
try {
// 業(yè)務(wù)邏輯
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
}2.3 事務(wù)傳播行為詳解
Spring定義了7種事務(wù)傳播行為,理解這些行為對于多線程事務(wù)管理至關(guān)重要:
| 傳播行為 | 說明 | 適用場景 |
|---|---|---|
| REQUIRED | 支持當(dāng)前事務(wù),如果不存在則創(chuàng)建新事務(wù) | 默認(rèn)設(shè)置,最常用 |
| SUPPORTS | 支持當(dāng)前事務(wù),如果不存在則以非事務(wù)方式執(zhí)行 | 查詢方法 |
| MANDATORY | 支持當(dāng)前事務(wù),如果不存在則拋出異常 | 必須存在事務(wù)的方法 |
| REQUIRES_NEW | 創(chuàng)建新事務(wù),暫停當(dāng)前事務(wù) | 獨(dú)立事務(wù)操作 |
| NOT_SUPPORTED | 以非事務(wù)方式執(zhí)行,暫停當(dāng)前事務(wù) | 不需要事務(wù)支持的操作 |
| NEVER | 以非事務(wù)方式執(zhí)行,如果存在事務(wù)則拋出異常 | 禁止事務(wù)的方法 |
| NESTED | 如果存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行 | 需要部分回滾的場景 |
@Service
public class ComplexService {
@Transactional(propagation = Propagation.REQUIRED)
public void methodA() {
// 方法A的業(yè)務(wù)邏輯
methodB(); // 調(diào)用方法B
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void methodB() {
// 方法B將在獨(dú)立的事務(wù)中執(zhí)行
// 即使methodA回滾,methodB的提交也不會被影響
}
}三、SpringBoot線程池配置與使用
3.1 線程池基礎(chǔ)配置
SpringBoot提供了靈活的線程池配置選項(xiàng):
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
* 核心業(yè)務(wù)線程池
*/
@Bean("businessExecutor")
public ThreadPoolTaskExecutor businessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數(shù):線程池維護(hù)的最小線程數(shù)量
executor.setCorePoolSize(10);
// 最大線程數(shù):線程池允許的最大線程數(shù)量
executor.setMaxPoolSize(50);
// 隊(duì)列容量:當(dāng)線程數(shù)達(dá)到核心線程數(shù)時,新任務(wù)會進(jìn)入隊(duì)列等待
executor.setQueueCapacity(100);
// 線程名前綴:方便日志追蹤
executor.setThreadNamePrefix("business-thread-");
// 拒絕策略:當(dāng)線程池和隊(duì)列都滿了時的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 線程空閑時間:非核心線程空閑存活時間
executor.setKeepAliveSeconds(60);
// 等待所有任務(wù)完成后關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待任務(wù)完成的超時時間
executor.setAwaitTerminationSeconds(60);
// 初始化線程池
executor.initialize();
return executor;
}
/**
* 事務(wù)處理專用線程池
*/
@Bean("transactionExecutor")
public ThreadPoolTaskExecutor transactionExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("transaction-thread-");
// 使用自定義拒絕策略
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 記錄日志
log.warn("Transaction task rejected: {}", r.toString());
// 嘗試重新執(zhí)行
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
});
executor.initialize();
return executor;
}
}3.2 異步任務(wù)執(zhí)行
@Service
public class AsyncService {
@Async("businessExecutor")
public CompletableFuture<String> asyncMethodWithReturn(String param) {
log.info("Async method started with param: {}", param);
try {
Thread.sleep(1000); // 模擬耗時操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return CompletableFuture.completedFuture("Result for " + param);
}
@Async("transactionExecutor")
public void asyncMethodWithoutReturn() {
log.info("Async method without return value started");
// 執(zhí)行業(yè)務(wù)邏輯
}
}3.3 線程池監(jiān)控與管理
@Component
public class ThreadPoolMonitor {
@Autowired
@Qualifier("businessExecutor")
private ThreadPoolTaskExecutor businessExecutor;
@Scheduled(fixedRate = 30000) // 每30秒監(jiān)控一次
public void monitorThreadPool() {
ThreadPoolExecutor executor = businessExecutor.getThreadPoolExecutor();
log.info("====== Thread Pool Monitor ======");
log.info("Active Threads: {}", executor.getActiveCount());
log.info("Pool Size: {}", executor.getPoolSize());
log.info("Core Pool Size: {}", executor.getCorePoolSize());
log.info("Maximum Pool Size: {}", executor.getMaximumPoolSize());
log.info("Queue Size: {}", executor.getQueue().size());
log.info("Completed Tasks: {}", executor.getCompletedTaskCount());
log.info("Total Tasks: {}", executor.getTaskCount());
log.info("================================");
// 如果隊(duì)列使用率過高,可以動態(tài)調(diào)整
double queueUsage = (double) executor.getQueue().size() / executor.getQueue().remainingCapacity();
if (queueUsage > 0.8) {
log.warn("Thread pool queue usage is high: {}%", queueUsage * 100);
}
}
}四、多線程環(huán)境下的事務(wù)挑戰(zhàn)與解決方案
4.1 問題分析:為什么事務(wù)不能跨線程傳播
@Service
public class ProblematicService {
@Transactional
public void mainMethod() {
// 主線程事務(wù)開始
log.info("Main thread transaction active: {}",
TransactionSynchronizationManager.isActualTransactionActive());
// 創(chuàng)建子線程
new Thread(() -> {
// 子線程中無法訪問主線程的事務(wù)上下文
log.info("Child thread transaction active: {}",
TransactionSynchronizationManager.isActualTransactionActive());
// 這里會拋出異常:沒有活動的事務(wù)
// 嘗試數(shù)據(jù)庫操作會失敗
}).start();
}
}4.2 解決方案1:事務(wù)上下文傳遞
4.2.1 手動傳遞事務(wù)屬性
@Service
public class TransactionPropagationService {
@Transactional
public void processWithContextPropagation() {
// 獲取當(dāng)前事務(wù)的屬性
TransactionAttribute transactionAttribute =
TransactionAspectSupport.currentTransactionStatus().getTransactionAttribute();
// 獲取當(dāng)前事務(wù)的隔離級別、超時時間等屬性
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
Integer timeout = transactionAttribute.getTimeout();
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
// 將事務(wù)屬性傳遞給子線程
TransactionContext context = new TransactionContext();
context.setIsolationLevel(isolationLevel);
context.setTimeout(timeout);
context.setReadOnly(readOnly);
// 在子線程中重新創(chuàng)建事務(wù)
CompletableFuture.runAsync(() -> {
executeInNewTransaction(context, () -> {
// 子線程的業(yè)務(wù)邏輯
log.info("Child thread executing with transaction");
});
});
}
@Data
private static class TransactionContext {
private Integer isolationLevel;
private Integer timeout;
private Boolean readOnly;
}
}4.2.2 使用InheritableThreadLocal(謹(jǐn)慎使用)
@Component
public class TransactionContextHolder {
// 注意:InheritableThreadLocal有內(nèi)存泄漏風(fēng)險(xiǎn),需要謹(jǐn)慎使用
private static final InheritableThreadLocal<Map<String, Object>> context =
new InheritableThreadLocal<>() {
@Override
protected Map<String, Object> childValue(Map<String, Object> parentValue) {
// 深度拷貝,避免父子線程共享同一對象
return parentValue != null ? new HashMap<>(parentValue) : null;
}
};
public static void set(String key, Object value) {
Map<String, Object> map = context.get();
if (map == null) {
map = new HashMap<>();
context.set(map);
}
map.put(key, value);
}
public static Object get(String key) {
Map<String, Object> map = context.get();
return map != null ? map.get(key) : null;
}
public static void clear() {
context.remove();
}
}4.3 解決方案2:使用編程式事務(wù)管理
@Service
public class ProgrammaticTransactionService {
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private JdbcTemplate jdbcTemplate;
public void processWithMultipleThreads(List<Task> tasks) {
// 主線程事務(wù)
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionStatus mainStatus = transactionManager.getTransaction(definition);
try {
// 主線程業(yè)務(wù)邏輯
executeMainLogic();
// 創(chuàng)建子線程任務(wù)
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Task task : tasks) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 每個子線程有自己的事務(wù)
DefaultTransactionDefinition childDefinition = new DefaultTransactionDefinition();
childDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
childDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionStatus childStatus = transactionManager.getTransaction(childDefinition);
try {
executeChildLogic(task);
transactionManager.commit(childStatus);
} catch (Exception e) {
transactionManager.rollback(childStatus);
throw new RuntimeException("Child thread transaction failed", e);
}
});
futures.add(future);
}
// 等待所有子線程完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 提交主事務(wù)
transactionManager.commit(mainStatus);
} catch (Exception e) {
// 回滾主事務(wù)
transactionManager.rollback(mainStatus);
throw new RuntimeException("Main transaction failed", e);
}
}
private void executeMainLogic() {
// 主線程業(yè)務(wù)邏輯實(shí)現(xiàn)
jdbcTemplate.update("INSERT INTO main_table (data) VALUES (?)", "main data");
}
private void executeChildLogic(Task task) {
// 子線程業(yè)務(wù)邏輯實(shí)現(xiàn)
jdbcTemplate.update("INSERT INTO child_table (task_id, data) VALUES (?, ?)",
task.getId(), task.getData());
}
}五、主線程與子線程事務(wù)協(xié)調(diào)策略
5.1 策略一:主線程等待所有子線程提交
@Service
public class CoordinatedTransactionService {
@Autowired
private DataSource dataSource;
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 策略:主線程等待所有子線程事務(wù)成功后才提交
*/
public void coordinatedStrategy1(List<BusinessTask> tasks) {
// 用于收集子線程執(zhí)行結(jié)果
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
// 創(chuàng)建CountDownLatch用于等待所有子線程完成
CountDownLatch latch = new CountDownLatch(tasks.size());
// 創(chuàng)建共享異常收集器
AtomicReference<Exception> sharedException = new AtomicReference<>();
// 啟動主事務(wù)
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus mainStatus = transactionManager.getTransaction(def);
try {
// 執(zhí)行主線程邏輯
executeMainBusiness();
// 啟動子線程
for (BusinessTask task : tasks) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
// 每個子線程使用獨(dú)立的事務(wù)
return executeChildTransaction(task);
} catch (Exception e) {
sharedException.set(e);
return false;
} finally {
latch.countDown();
}
});
futures.add(future);
}
// 等待所有子線程完成
boolean completed = latch.await(30, TimeUnit.SECONDS);
if (!completed) {
throw new TimeoutException("Child threads timeout");
}
// 檢查是否有子線程失敗
if (sharedException.get() != null) {
throw new RuntimeException("Child thread failed", sharedException.get());
}
// 檢查所有子線程結(jié)果
for (CompletableFuture<Boolean> future : futures) {
if (!future.get()) {
throw new RuntimeException("At least one child thread failed");
}
}
// 提交主事務(wù)
transactionManager.commit(mainStatus);
} catch (Exception e) {
// 回滾主事務(wù)
transactionManager.rollback(mainStatus);
throw new RuntimeException("Coordinated transaction failed", e);
}
}
private boolean executeChildTransaction(BusinessTask task) {
// 子線程使用獨(dú)立的事務(wù)
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = transactionManager.getTransaction(def);
try {
// 執(zhí)行子線程業(yè)務(wù)邏輯
processTask(task);
transactionManager.commit(status);
return true;
} catch (Exception e) {
transactionManager.rollback(status);
log.error("Child transaction failed for task: {}", task.getId(), e);
return false;
}
}
}5.2 策略二:兩階段提交模式
@Service
public class TwoPhaseCommitService {
@Autowired
private DataSource dataSource;
/**
* 簡化的兩階段提交實(shí)現(xiàn)
*/
public void twoPhaseCommitStrategy(List<Runnable> tasks) {
// 第一階段:準(zhǔn)備階段
List<CompletableFuture<Boolean>> prepareFutures = new ArrayList<>();
List<TransactionStatus> childStatuses = Collections.synchronizedList(new ArrayList<>());
try {
// 主事務(wù)開始
Connection mainConn = dataSource.getConnection();
mainConn.setAutoCommit(false);
try {
// 主線程準(zhǔn)備
prepareMainPhase(mainConn);
// 子線程準(zhǔn)備
for (Runnable task : tasks) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
Connection childConn = dataSource.getConnection();
childConn.setAutoCommit(false);
// 執(zhí)行準(zhǔn)備操作
boolean prepared = prepareChildPhase(childConn, task);
if (prepared) {
// 保存連接和狀態(tài),用于第二階段
childStatuses.add(new TransactionStatus(childConn));
}
return prepared;
} catch (SQLException e) {
log.error("Child prepare phase failed", e);
return false;
}
});
prepareFutures.add(future);
}
// 等待所有準(zhǔn)備階段完成
CompletableFuture<Void> allPrepare = CompletableFuture.allOf(
prepareFutures.toArray(new CompletableFuture[0])
);
allPrepare.get(10, TimeUnit.SECONDS);
// 檢查所有子線程是否準(zhǔn)備成功
boolean allPrepared = prepareFutures.stream()
.allMatch(f -> {
try {
return f.get();
} catch (Exception e) {
return false;
}
});
if (!allPrepared) {
// 有任何失敗,執(zhí)行回滾
rollbackAll(mainConn, childStatuses);
throw new RuntimeException("Prepare phase failed");
}
// 第二階段:提交階段
commitAll(mainConn, childStatuses);
} catch (Exception e) {
mainConn.rollback();
throw new RuntimeException("Two-phase commit failed", e);
} finally {
mainConn.close();
}
} catch (SQLException e) {
throw new RuntimeException("Database connection error", e);
}
}
private void commitAll(Connection mainConn, List<TransactionStatus> childStatuses) throws SQLException {
try {
// 先提交所有子事務(wù)
for (TransactionStatus status : childStatuses) {
status.getConnection().commit();
status.getConnection().close();
}
// 最后提交主事務(wù)
mainConn.commit();
} catch (SQLException e) {
// 提交失敗,嘗試回滾所有
try {
mainConn.rollback();
} catch (SQLException ex) {
log.error("Failed to rollback main connection", ex);
}
throw e;
}
}
@Data
private static class TransactionStatus {
private final Connection connection;
private final long threadId = Thread.currentThread().getId();
private final LocalDateTime createTime = LocalDateTime.now();
}
}5.3 策略三:補(bǔ)償事務(wù)模式
@Service
@Slf4j
public class CompensationTransactionService {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 補(bǔ)償事務(wù)模式:記錄所有操作,失敗時執(zhí)行補(bǔ)償
*/
public void compensationStrategy(List<BusinessOperation> operations) {
// 用于記錄需要補(bǔ)償?shù)牟僮?
List<CompensationAction> compensationActions = Collections.synchronizedList(new ArrayList<>());
// 主事務(wù)開始
DefaultTransactionDefinition mainDef = new DefaultTransactionDefinition();
PlatformTransactionManager transactionManager =
new DataSourceTransactionManager(Objects.requireNonNull(jdbcTemplate.getDataSource()));
TransactionStatus mainStatus = transactionManager.getTransaction(mainDef);
try {
// 執(zhí)行主操作
CompensationAction mainAction = executeMainOperation();
compensationActions.add(mainAction);
// 并行執(zhí)行子操作
List<CompletableFuture<CompensationAction>> futures = operations.stream()
.map(op -> CompletableFuture.supplyAsync(() -> {
try {
return executeChildOperation(op);
} catch (Exception e) {
throw new CompletionException(e);
}
}))
.collect(Collectors.toList());
// 等待所有子操作完成
List<CompensationAction> childActions = futures.stream()
.map(f -> {
try {
return f.get();
} catch (Exception e) {
throw new RuntimeException("Child operation failed", e);
}
})
.collect(Collectors.toList());
compensationActions.addAll(childActions);
// 所有操作成功,提交主事務(wù)
transactionManager.commit(mainStatus);
// 記錄成功日志
logCompensationSuccess(compensationActions);
} catch (Exception e) {
// 回滾主事務(wù)
transactionManager.rollback(mainStatus);
// 執(zhí)行補(bǔ)償操作
executeCompensations(compensationActions);
throw new RuntimeException("Transaction failed, compensation executed", e);
}
}
private CompensationAction executeMainOperation() {
// 執(zhí)行業(yè)務(wù)操作,并返回補(bǔ)償動作
String operationId = UUID.randomUUID().toString();
try {
// 業(yè)務(wù)邏輯
jdbcTemplate.update("INSERT INTO main_operations (id, data) VALUES (?, ?)",
operationId, "main data");
// 返回補(bǔ)償動作
return CompensationAction.builder()
.operationId(operationId)
.operationType("INSERT_MAIN")
.compensationSql("DELETE FROM main_operations WHERE id = ?")
.compensationParams(new Object[]{operationId})
.build();
} catch (Exception e) {
throw new RuntimeException("Main operation failed", e);
}
}
private void executeCompensations(List<CompensationAction> actions) {
// 按照操作的反向順序執(zhí)行補(bǔ)償
Collections.reverse(actions);
for (CompensationAction action : actions) {
try {
jdbcTemplate.update(action.getCompensationSql(), action.getCompensationParams());
log.info("Compensation executed for operation: {}", action.getOperationId());
} catch (Exception e) {
log.error("Failed to execute compensation for operation: {}",
action.getOperationId(), e);
// 繼續(xù)執(zhí)行其他補(bǔ)償,不中斷
}
}
}
@Data
@Builder
private static class CompensationAction {
private String operationId;
private String operationType;
private String compensationSql;
private Object[] compensationParams;
private LocalDateTime operationTime;
}
}六、Spring事務(wù)同步機(jī)制在多線程中的應(yīng)用
6.1 使用TransactionSynchronization
@Service
public class TransactionSynchronizationService {
@Autowired
private DataSource dataSource;
/**
* 使用TransactionSynchronization協(xié)調(diào)多線程事務(wù)
*/
@Transactional
public void processWithSynchronization(List<SubTask> subTasks) {
// 注冊事務(wù)同步器
TransactionSynchronizationManager.registerSynchronization(
new CustomTransactionSynchronization(subTasks)
);
// 主線程業(yè)務(wù)邏輯
executeMainBusiness();
// 注意:子線程操作將在事務(wù)提交前執(zhí)行
// TransactionSynchronization.beforeCommit()中啟動子線程
}
private class CustomTransactionSynchronization implements TransactionSynchronization {
private final List<SubTask> subTasks;
private final ExecutorService executorService;
private final List<Future<?>> futures;
public CustomTransactionSynchronization(List<SubTask> subTasks) {
this.subTasks = subTasks;
this.executorService = Executors.newFixedThreadPool(subTasks.size());
this.futures = new ArrayList<>();
}
@Override
public void beforeCommit(boolean readOnly) {
log.info("TransactionSynchronization.beforeCommit called");
// 在事務(wù)提交前啟動子線程
for (SubTask task : subTasks) {
Future<?> future = executorService.submit(() -> {
try {
// 每個子線程使用獨(dú)立連接和事務(wù)
executeSubTaskInNewTransaction(task);
} catch (Exception e) {
log.error("Subtask execution failed", e);
throw new RuntimeException(e);
}
});
futures.add(future);
}
// 等待所有子線程完成
for (Future<?> future : futures) {
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to complete subtasks", e);
}
}
executorService.shutdown();
}
@Override
public void afterCompletion(int status) {
log.info("TransactionSynchronization.afterCompletion called with status: {}",
status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
if (status == STATUS_ROLLED_BACK) {
// 事務(wù)回滾,需要清理子線程可能已經(jīng)提交的操作
log.warn("Main transaction rolled back, but child transactions may have been committed");
// 這里可以實(shí)現(xiàn)補(bǔ)償邏輯
}
cleanup();
}
private void cleanup() {
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
}
}
}6.2 事務(wù)事件監(jiān)聽機(jī)制
@Component
@Slf4j
public class TransactionEventListenerService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handleBeforeCommit(TransactionEvent event) {
log.info("Before commit event received");
// 在事務(wù)提交前執(zhí)行操作
prepareForCommit();
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void handleAfterCompletion(TransactionCompletionEvent event) {
log.info("Transaction completed with status: {}",
event.getTransactionResult() == TransactionResult.COMMITTED ? "COMMITTED" : "ROLLED_BACK");
if (event.getTransactionResult() == TransactionResult.COMMITTED) {
// 事務(wù)提交后執(zhí)行異步操作
executePostCommitOperations(event.getBusinessData());
} else {
// 事務(wù)回滾后的清理操作
executeRollbackCleanup(event.getBusinessData());
}
}
@Async
public void executePostCommitOperations(BusinessData data) {
// 異步執(zhí)行提交后的操作
log.info("Executing post-commit operations asynchronously");
// 這里可以啟動子線程進(jìn)行后續(xù)處理
}
@Data
public static class TransactionEvent {
private final String transactionId;
private final LocalDateTime eventTime;
private final BusinessData businessData;
}
@Data
public static class TransactionCompletionEvent extends TransactionEvent {
private final TransactionResult transactionResult;
}
public enum TransactionResult {
COMMITTED,
ROLLED_BACK
}
}七、分布式事務(wù)在多線程場景下的應(yīng)用
7.1 基于Seata的分布式事務(wù)解決方案
@Service
@Slf4j
public class SeataDistributedTransactionService {
@Autowired
private UserService userService;
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
/**
* 使用Seata AT模式處理多線程分布式事務(wù)
* 注意:Seata默認(rèn)不支持多線程,需要特殊處理
*/
@GlobalTransactional(timeoutMills = 300000, name = "multi-thread-purchase")
public void purchaseWithMultipleThreads(PurchaseRequest request) {
// 獲取全局事務(wù)ID
String xid = RootContext.getXID();
log.info("Global transaction started, xid: {}", xid);
// 用于收集子線程執(zhí)行結(jié)果
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
try {
// 任務(wù)1:扣減庫存(異步執(zhí)行)
CompletableFuture<Boolean> inventoryFuture = CompletableFuture.supplyAsync(() -> {
// 傳播全局事務(wù)ID到子線程
RootContext.bind(xid);
try {
return inventoryService.deduct(request.getProductId(), request.getQuantity());
} finally {
RootContext.unbind();
}
});
futures.add(inventoryFuture);
// 任務(wù)2:創(chuàng)建訂單(異步執(zhí)行)
CompletableFuture<Boolean> orderFuture = CompletableFuture.supplyAsync(() -> {
RootContext.bind(xid);
try {
return orderService.createOrder(request);
} finally {
RootContext.unbind();
}
});
futures.add(orderFuture);
// 任務(wù)3:更新用戶信息(主線程執(zhí)行)
boolean userUpdated = userService.updatePurchaseInfo(request.getUserId(), request.getAmount());
if (!userUpdated) {
throw new RuntimeException("Failed to update user info");
}
// 等待所有異步任務(wù)完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allFutures.get(30, TimeUnit.SECONDS);
// 檢查所有任務(wù)結(jié)果
for (CompletableFuture<Boolean> future : futures) {
if (!future.get()) {
throw new RuntimeException("One of the async tasks failed");
}
}
log.info("All distributed transactions completed successfully");
} catch (Exception e) {
log.error("Distributed transaction failed", e);
// Seata會自動回滾所有分支事務(wù)
throw new RuntimeException("Purchase failed", e);
} finally {
// 清理上下文
RootContext.unbind();
}
}
}
// Seata配置類
@Configuration
public class SeataConfig {
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("multi-thread-app", "my_test_tx_group");
}
/**
* 自定義DataSourceProxy以支持多線程
*/
@Bean
public DataSource dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}7.2 基于消息隊(duì)列的最終一致性方案
@Service
@Slf4j
public class MQBasedTransactionService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 基于消息隊(duì)列的最終一致性方案
*/
@Transactional
public void processWithMQ(List<SubTask> tasks) {
// 1. 主事務(wù)操作
executeMainTransaction();
// 2. 發(fā)送準(zhǔn)備消息(不投遞)
List<String> messageIds = new ArrayList<>();
for (SubTask task : tasks) {
String messageId = sendPrepareMessage(task);
messageIds.add(messageId);
}
// 3. 本地記錄消息狀態(tài)
saveMessageStatus(messageIds, MessageStatus.PREPARED);
// 4. 提交主事務(wù)(消息仍未投遞)
// 事務(wù)提交后,下面的代碼才會執(zhí)行
// 5. 事務(wù)提交后,確認(rèn)投遞消息
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 確認(rèn)投遞所有消息
for (String messageId : messageIds) {
confirmMessageDelivery(messageId);
updateMessageStatus(messageId, MessageStatus.CONFIRMED);
}
// 異步執(zhí)行子任務(wù)
executeSubTasksAsync(tasks);
}
@Override
public void afterCompletion(int status) {
if (status == STATUS_ROLLED_BACK) {
// 取消所有消息
for (String messageId : messageIds) {
cancelMessage(messageId);
updateMessageStatus(messageId, MessageStatus.CANCELLED);
}
}
}
}
);
}
private void executeSubTasksAsync(List<SubTask> tasks) {
ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(() -> {
try {
// 每個子線程處理自己的任務(wù)
processSubTask(task);
} catch (Exception e) {
log.error("Subtask processing failed", e);
// 發(fā)送補(bǔ)償消息
sendCompensationMessage(task);
}
}, executor))
.collect(Collectors.toList());
// 等待所有任務(wù)完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
executor.shutdown();
log.info("All subtasks completed");
})
.exceptionally(ex -> {
log.error("Failed to complete all subtasks", ex);
executor.shutdownNow();
return null;
});
}
private enum MessageStatus {
PREPARED,
CONFIRMED,
CANCELLED,
COMPLETED
}
}八、性能優(yōu)化與最佳實(shí)踐
8.1 線程池優(yōu)化配置
# application.yml 線程池配置
spring:
task:
execution:
pool:
# 公共線程池配置
common:
core-size: 10
max-size: 50
queue-capacity: 1000
keep-alive: 60s
thread-name-prefix: "common-task-"
# 事務(wù)處理專用線程池
transaction:
core-size: 5
max-size: 20
queue-capacity: 500
keep-alive: 30s
thread-name-prefix: "tx-task-"
allow-core-thread-timeout: true
# IO密集型任務(wù)線程池
io-intensive:
core-size: 20
max-size: 100
queue-capacity: 2000
keep-alive: 120s
thread-name-prefix: "io-task-"
# 線程池監(jiān)控配置
management:
endpoints:
web:
exposure:
include: "health,info,metrics,threadpool"
metrics:
export:
prometheus:
enabled: true8.2 事務(wù)優(yōu)化策略
@Configuration
@EnableTransactionManagement
public class TransactionOptimizationConfig {
/**
* 事務(wù)管理器配置優(yōu)化
*/
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
// 優(yōu)化配置
transactionManager.setNestedTransactionAllowed(true); // 允許嵌套事務(wù)
transactionManager.setValidateExistingTransaction(true); // 驗(yàn)證已有事務(wù)
transactionManager.setGlobalRollbackOnParticipationFailure(false); // 優(yōu)化參與失敗時的回滾行為
return transactionManager;
}
/**
* 事務(wù)模板配置
*/
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
TransactionTemplate template = new TransactionTemplate(transactionManager);
// 設(shè)置默認(rèn)事務(wù)屬性
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
template.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
template.setTimeout(30); // 30秒超時
// 只讀事務(wù)優(yōu)化
template.setReadOnly(false);
return template;
}
/**
* 事務(wù)攔截器優(yōu)化
*/
@Bean
public TransactionInterceptor transactionInterceptor(PlatformTransactionManager transactionManager) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionManager(transactionManager);
// 配置事務(wù)屬性源
NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
// 查詢方法使用只讀事務(wù)
RuleBasedTransactionAttribute readOnlyAttr = new RuleBasedTransactionAttribute();
readOnlyAttr.setReadOnly(true);
readOnlyAttr.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
// 寫操作使用讀寫事務(wù)
RuleBasedTransactionAttribute writeAttr = new RuleBasedTransactionAttribute();
writeAttr.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
writeAttr.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
writeAttr.setTimeout(30);
// 方法名模式匹配
source.addTransactionalMethod("get*", readOnlyAttr);
source.addTransactionalMethod("find*", readOnlyAttr);
source.addTransactionalMethod("query*", readOnlyAttr);
source.addTransactionalMethod("save*", writeAttr);
source.addTransactionalMethod("update*", writeAttr);
source.addTransactionalMethod("delete*", writeAttr);
source.addTransactionalMethod("process*", writeAttr);
interceptor.setTransactionAttributeSource(source);
return interceptor;
}
}8.3 監(jiān)控與告警
@Component
@Slf4j
public class TransactionMonitor {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private PlatformTransactionManager transactionManager;
private final Map<String, AtomicInteger> transactionCounters = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> transactionDurations = new ConcurrentHashMap<>();
/**
* 事務(wù)監(jiān)控AOP
*/
@Aspect
@Component
public static class TransactionMonitoringAspect {
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final TransactionMonitor monitor;
public TransactionMonitoringAspect(TransactionMonitor monitor) {
this.monitor = monitor;
}
@Around("@annotation(org.springframework.transaction.annotation.Transactional)")
public Object monitorTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
String methodName = joinPoint.getSignature().toShortString();
String transactionName = extractTransactionName(methodName);
// 記錄開始時間
startTime.set(System.currentTimeMillis());
try {
// 增加事務(wù)計(jì)數(shù)器
monitor.incrementTransactionCounter(transactionName);
// 執(zhí)行原方法
Object result = joinPoint.proceed();
// 記錄成功
monitor.recordTransactionSuccess(transactionName,
System.currentTimeMillis() - startTime.get());
return result;
} catch (Exception e) {
// 記錄失敗
monitor.recordTransactionFailure(transactionName,
System.currentTimeMillis() - startTime.get(), e);
throw e;
} finally {
startTime.remove();
}
}
private String extractTransactionName(String methodName) {
// 簡化的方法名提取邏輯
return methodName.replaceAll(".*\\.", "").replaceAll("\\(.*\\)", "");
}
}
public void incrementTransactionCounter(String transactionName) {
transactionCounters
.computeIfAbsent(transactionName, k -> new AtomicInteger(0))
.incrementAndGet();
// 發(fā)布到監(jiān)控系統(tǒng)
meterRegistry.counter("transactions.total", "name", transactionName).increment();
}
public void recordTransactionSuccess(String transactionName, long duration) {
transactionDurations
.computeIfAbsent(transactionName, k -> new AtomicLong(0))
.addAndGet(duration);
// 發(fā)布到監(jiān)控系統(tǒng)
meterRegistry.timer("transactions.duration", "name", transactionName, "status", "success")
.record(duration, TimeUnit.MILLISECONDS);
// 檢查性能閾值
if (duration > 1000) { // 超過1秒告警
log.warn("Slow transaction detected: {} took {}ms", transactionName, duration);
}
}
/**
* 生成監(jiān)控報(bào)告
*/
@Scheduled(fixedDelay = 60000) // 每分鐘生成一次報(bào)告
public void generateMonitoringReport() {
Map<String, Object> report = new HashMap<>();
transactionCounters.forEach((name, counter) -> {
long count = counter.getAndSet(0);
long totalDuration = transactionDurations.getOrDefault(name, new AtomicLong(0))
.getAndSet(0);
long avgDuration = count > 0 ? totalDuration / count : 0;
report.put(name, Map.of(
"count", count,
"avgDuration", avgDuration,
"tps", count / 60.0
));
});
log.info("Transaction monitoring report: {}", report);
// 發(fā)送到監(jiān)控系統(tǒng)
sendToMonitoringSystem(report);
}
}九、常見問題與解決方案
9.1 問題一:事務(wù)不回滾
問題現(xiàn)象:子線程拋出異常,但主線程事務(wù)沒有回滾。
原因分析:
- 子線程異常沒有傳播到主線程
- 事務(wù)傳播行為配置不當(dāng)
- 異常類型沒有被Spring事務(wù)管理器識別
解決方案:
@Service
public class TransactionRollbackSolution {
@Transactional(rollbackFor = Exception.class)
public void processWithRollbackControl(List<Task> tasks) {
// 使用CompletableFuture收集異常
List<CompletableFuture<Void>> futures = new ArrayList<>();
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
for (Task task : tasks) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
executeTask(task);
} catch (Exception e) {
// 將異常傳遞給錯誤Future
errorFuture.complete(e);
throw new CompletionException(e);
}
});
futures.add(future);
}
try {
// 等待所有任務(wù)完成或發(fā)生錯誤
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 設(shè)置超時時間
allFutures.get(30, TimeUnit.SECONDS);
// 檢查是否有錯誤發(fā)生
if (errorFuture.isDone()) {
throw new RuntimeException("Child thread failed", errorFuture.get());
}
} catch (TimeoutException e) {
throw new RuntimeException("Operation timeout", e);
} catch (Exception e) {
// 確保事務(wù)回滾
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
throw new RuntimeException("Process failed", e);
}
}
/**
* 另一種解決方案:使用TransactionCallback
*/
public void processWithTransactionCallback(List<Task> tasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
// 并行執(zhí)行任務(wù)
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(() -> {
// 每個子線程使用獨(dú)立事務(wù)
executeInNewTransaction(task);
}))
.collect(Collectors.toList());
// 等待所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
// 標(biāo)記事務(wù)為回滾
status.setRollbackOnly();
return null;
})
.join();
} catch (Exception e) {
status.setRollbackOnly();
throw e;
}
}
});
}
}9.2 問題二:連接泄漏
問題現(xiàn)象:數(shù)據(jù)庫連接數(shù)持續(xù)增長,最終耗盡連接池。
原因分析:
- 子線程沒有正確關(guān)閉數(shù)據(jù)庫連接
- 事務(wù)管理不當(dāng)導(dǎo)致連接未釋放
- 線程池配置不合理
解決方案:
@Service
public class ConnectionLeakSolution {
@Autowired
private DataSource dataSource;
/**
* 使用Connection包裝器確保資源釋放
*/
public void processWithConnectionManagement(List<Task> tasks) {
// 使用try-with-resources確保連接關(guān)閉
try (ConnectionHolder connectionHolder = new ConnectionHolder(dataSource)) {
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(() -> {
// 每個線程使用獨(dú)立的連接
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
try {
executeTaskWithConnection(task, connection);
connection.commit();
} catch (Exception e) {
connection.rollback();
throw new RuntimeException("Task failed", e);
}
} catch (SQLException e) {
throw new RuntimeException("Connection error", e);
}
}))
.collect(Collectors.toList());
// 等待所有任務(wù)完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (Exception e) {
throw new RuntimeException("Process failed", e);
}
}
/**
* 連接持有器,確保連接正確關(guān)閉
*/
private static class ConnectionHolder implements AutoCloseable {
private final List<Connection> connections = new ArrayList<>();
private final DataSource dataSource;
public ConnectionHolder(DataSource dataSource) {
this.dataSource = dataSource;
}
public Connection getConnection() throws SQLException {
Connection connection = dataSource.getConnection();
connections.add(connection);
return connection;
}
@Override
public void close() {
for (Connection connection : connections) {
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
log.error("Failed to close connection", e);
}
}
}
}
/**
* 連接池監(jiān)控
*/
@Component
@Slf4j
public static class ConnectionPoolMonitor {
@Autowired
private DataSource dataSource;
@Scheduled(fixedRate = 30000)
public void monitorConnectionPool() {
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
log.info("Connection pool status: " +
"Active: {}, " +
"Idle: {}, " +
"Total: {}, " +
"Waiting: {}",
hikariDataSource.getHikariPoolMXBean().getActiveConnections(),
hikariDataSource.getHikariPoolMXBean().getIdleConnections(),
hikariDataSource.getHikariPoolMXBean().getTotalConnections(),
hikariDataSource.getHikariPoolMXBean().getThreadsAwaitingConnection());
// 連接泄漏檢測
if (hikariDataSource.getHikariPoolMXBean().getActiveConnections() >
hikariDataSource.getMaximumPoolSize() * 0.8) {
log.warn("Connection pool usage is high, possible connection leak");
}
}
}
}
}十、總結(jié)與最佳實(shí)踐建議
10.1 核心原則總結(jié)
- 事務(wù)邊界清晰:明確每個事務(wù)的邊界,避免事務(wù)過長或過短
- 線程隔離:確保每個線程使用獨(dú)立的事務(wù)上下文
- 資源管理:嚴(yán)格管理數(shù)據(jù)庫連接等資源,避免泄漏
- 異常處理:設(shè)計(jì)完善的異常處理機(jī)制,確保事務(wù)一致性
- 監(jiān)控告警:建立全面的監(jiān)控體系,及時發(fā)現(xiàn)和處理問題
10.2 最佳實(shí)踐建議
10.2.1 架構(gòu)設(shè)計(jì)層面
/**
* 推薦的架構(gòu)模式
*/
@Component
public class TransactionArchitecturePattern {
/**
* 模式1:主從事務(wù)模式
* 主線程負(fù)責(zé)協(xié)調(diào),子線程執(zhí)行具體任務(wù)
*/
public void masterSlavePattern(List<Task> tasks) {
// 1. 主線程開啟事務(wù),記錄任務(wù)狀態(tài)
recordTaskStart(tasks);
// 2. 子線程并行處理(各自獨(dú)立事務(wù))
List<CompletableFuture<Result>> futures = processTasksInParallel(tasks);
// 3. 收集結(jié)果,更新狀態(tài)
processResults(futures);
// 4. 主線程提交事務(wù)
}
/**
* 模式2:補(bǔ)償事務(wù)模式
* 適用于需要最終一致性的場景
*/
public void compensationPattern(BusinessOperation operation) {
// 1. 執(zhí)行主操作
OperationResult result = executeMainOperation(operation);
// 2. 記錄操作日志(用于補(bǔ)償)
recordOperationLog(operation, result);
// 3. 異步執(zhí)行后續(xù)操作
executeAsyncFollowUp(operation, result);
// 4. 提供補(bǔ)償接口
registerCompensationCallback(operation);
}
/**
* 模式3:批量處理模式
* 適用于大批量數(shù)據(jù)處理
*/
public void batchProcessingPattern(List<DataItem> items) {
// 1. 分批處理
List<List<DataItem>> batches = partitionItems(items, 100);
// 2. 并行處理每個批次
batches.parallelStream().forEach(batch -> {
// 每個批次獨(dú)立事務(wù)
processBatchInTransaction(batch);
});
// 3. 匯總結(jié)果
summarizeResults();
}
}10.2.2 代碼實(shí)現(xiàn)層面
- 使用模板方法減少重復(fù)代碼:
public abstract class TransactionTemplatePattern {
@Autowired
protected PlatformTransactionManager transactionManager;
/**
* 執(zhí)行帶事務(wù)的異步任務(wù)
*/
protected <T> CompletableFuture<T> executeAsyncInTransaction(
Supplier<T> task,
TransactionDefinition definition) {
return CompletableFuture.supplyAsync(() -> {
TransactionStatus status = transactionManager.getTransaction(definition);
try {
T result = task.get();
transactionManager.commit(status);
return result;
} catch (Exception e) {
transactionManager.rollback(status);
throw new CompletionException(e);
}
});
}
/**
* 執(zhí)行帶重試的事務(wù)
*/
protected <T> T executeWithRetry(
Callable<T> task,
int maxRetries,
long backoffDelay) {
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
return transactionTemplate.execute(status -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
retryCount++;
if (retryCount > maxRetries) {
throw e;
}
try {
Thread.sleep(backoffDelay * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
throw new IllegalStateException("Should not reach here");
}
}10.2.3 配置管理層面
- 環(huán)境特定的線程池配置:
@Configuration
@Profile({"dev", "test"})
public class DevThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
return executor;
}
}
@Configuration
@Profile("prod")
public class ProdThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(120);
return executor;
}
}10.3 未來發(fā)展趨勢
- 響應(yīng)式事務(wù)管理:隨著響應(yīng)式編程的普及,響應(yīng)式事務(wù)管理將成為趨勢
- 云原生事務(wù):在微服務(wù)和云原生架構(gòu)下,分布式事務(wù)管理將更加重要
- AI優(yōu)化:利用AI技術(shù)自動優(yōu)化事務(wù)參數(shù)和線程池配置
- 無服務(wù)器事務(wù):在Serverless架構(gòu)下的新型事務(wù)管理模式
10.4 結(jié)語
SpringBoot中使用線程池控制主線程和子線程的事務(wù)是一個復(fù)雜但重要的話題。通過合理的架構(gòu)設(shè)計(jì)、正確的事務(wù)策略選擇、完善的異常處理機(jī)制和全面的監(jiān)控體系,我們可以構(gòu)建出既高效又可靠的多線程事務(wù)處理系統(tǒng)。
到此這篇關(guān)于SpringBoot中使用線程池控制主線程與子線程事務(wù)的全過程的文章就介紹到這了,更多相關(guān)springboot線程池控制主線程與子線程事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Cloud集成Nacos?Config動態(tài)刷新源碼剖析
這篇文章主要為大家介紹了Spring?Cloud集成Nacos?Config動態(tài)刷新源碼剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Java利用ZipOutputStream進(jìn)行高效壓縮的技巧詳解
ZipOutputStream 是 Java 標(biāo)準(zhǔn)庫中用于 創(chuàng)建 ZIP 文件 的核心類,位于 java.util.zip 包中,本文主要和大家介紹了Java如何使用ZipOutputStream進(jìn)行高效壓縮,有需要的可以了解下2025-07-07
SpringBoot多環(huán)境開發(fā)該如何配置
這篇文章主要介紹了 SpringBoot多環(huán)境的開發(fā)配置詳情,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-09-09
使用Mybatis-plus清空表數(shù)據(jù)的操作方法
MyBatis 是一個基于 java 的持久層框架,它內(nèi)部封裝了 jdbc,極大提高了我們的開發(fā)效率,文中給大家介紹了MybatisPlus常用API-增刪改查功能,感興趣的朋友跟隨小編一起看看吧2022-11-11
SpringBoot集成ActiveMQ的實(shí)戰(zhàn)全過程
消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削鋒等問題,實(shí)現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu),是大型分布式系統(tǒng)不可缺少的中間件,這篇文章主要給大家介紹了關(guān)于SpringBoot集成ActiveMQ的相關(guān)資料,需要的朋友可以參考下2021-11-11
mybatis mybatis-plus-generator+clickhouse自動生成代碼案例詳解
這篇文章主要介紹了mybatis mybatis-plus-generator+clickhouse自動生成代碼案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08

