多线程中的事务问题

it2025-05-18  24

多线程中多个线程插入,一个异常全部回滚的问题

目前萌新还在学习阶段,今天优化一个批量写订单的场景,我们给数据库写一堆订单,但是订单是我们这边的一个总单拆出来的,之前的写法是直接拆单循环处理,单次循环写入为0.3秒,如果我方订单拆开,那么我们的总单拆成10个子单,直接循环同步推送就3-6秒,这个是不可接受的。

所以第一考虑就是使用多线程分别将各个订单进行推送,并且一个报错,所有线程回滚。

一开始总感觉能用一个事务处理所有的线程,但是凭借自己对@transactional的底层实现和编程事务的比较浅显的认识没有搞定所有线程共享一个连接一个事务并且共同提交或回滚的场景,但是线程各自为战,但能够完成一起提交或一起回滚的demo我是勉强写出来了。

代码比较简单,下面贴一下代码

controller

package com.test.transaction.controller; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.test.transaction.dao.TransactionTestRepository; import com.test.transaction.entity.TransactionTest; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.web.bind.annotation.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * @author william * @description * @Date: 2020-10-21 00:17 */ @RestController @RequestMapping("/test") @Api(tags = "事务处理") @Slf4j public class TestController { @Autowired TransactionTestRepository repository; @Autowired PlatformTransactionManager platformTransactionManager; public static volatile Boolean needRollback = false; @ApiOperation(value = "多线程中的事务回滚") @Transactional(rollbackFor = Exception.class) @PostMapping("/threadTrans") public String threadTrans(@RequestParam(name = "target") Integer target) { //入参target用于指定在哪个线程执行时模拟报错 StringBuilder sb = new StringBuilder(); //用于接受callable的线程处理失败时的回执 List<Future> futureList = new ArrayList<>(); //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //开启20个线程 Integer count = 20; CyclicBarrier cyclicBarrier = new CyclicBarrier(count); for (int i = 0; i < count; i++) { try { //主线程每100ms加一个线程,让数据好看些,毕竟都要等待barrier过坎 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } SpreadProcess spreadProcess = new SpreadProcess(); //标记当前线程是第几个 spreadProcess.count = i; //传入jpaRepository用于数据库的操作 spreadProcess.repository = repository; //cyclicBarrier spreadProcess.barrier = cyclicBarrier; //事务管理器 spreadProcess.transactionManager = platformTransactionManager; //将要触发异常的目标线程号 spreadProcess.exceptionPoint = target; //执行 Future<String> future = executorService.submit(spreadProcess); futureList.add(future); } //拒绝接受新线程并等待处理完毕,超时时间为20秒 executorService.shutdown(); try { executorService.awaitTermination(20, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } //如果线程池运行完毕的后续处理,拼装异常信息 if (executorService.isTerminated()) { for (Future future : futureList) { String errorString = null; try { errorString = (String) future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } if (StringUtils.isNotEmpty(errorString)) { sb.append(errorString); } } } //打印异常信息 if (StringUtils.isNotEmpty(sb.toString())) { log.error(sb.toString()); throw new RuntimeException(sb.toString()); } return "succeed"; } } @Slf4j class SpreadProcess implements Callable<String> { public Integer count; public TransactionTestRepository repository; public CyclicBarrier barrier; private String errorStr; public PlatformTransactionManager transactionManager; public Integer exceptionPoint; public String runInfo() { return errorStr; } @Override public String call() throws Exception { String error = null; //设置一个新事务 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); //新实例化jpa实体并赋值,插入数据库 TransactionTest transactionTest = new TransactionTest(); transactionTest.setName(Thread.currentThread().getId() + "==" + count); repository.save(transactionTest); log.info(transactionTest.toString()); //指定的线程抛出一个异常,通知全部回滚 if (count == exceptionPoint) { error = String.format("线程%s处理%s时遇到错误抛异常回滚", Thread.currentThread().getId(), count.toString()); log.error(error); errorStr = error; TestController.needRollback = true; } //线程运行到此开始等待所有线程全部执行到此统一判定 barrier.await(); if (TestController.needRollback) { transactionManager.rollback(status); log.info("线程{},i={}事务回滚",Thread.currentThread().getId(),count); return error; } transactionManager.commit(status); log.info("线程{},i={}事务已经提交",Thread.currentThread().getId(),count); return ""; } }

dao

package com.test.transaction.dao; import com.test.transaction.entity.TransactionTest; import org.springframework.data.jpa.repository.JpaRepository; /** * @author william * @description * @Date: 2020-10-21 00:13 */ public interface TransactionTestRepository extends JpaRepository<TransactionTest, Integer> { }

entity

package com.test.transaction.entity; import lombok.Data; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; /** * @author william * @description * @Date: 2020-10-21 00:13 */ @Entity @Data public class TransactionTest { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; private String name; }

运行结果如下,虽然能够完成需求,但是总感觉因为回滚和提交那里不是原子的,会因为一些比如刚好卡到那一行宕机之类的极端情况出现bug

项目结构如下,注意这里还受到hiraki线程池大小的限制,如果线程数过多会一直因为hiraki的线程池因为占满而卡住,我把线程池大小开到了15,所以只是应急用,我这边实际的大单处理速度从5-8秒直接降到1秒左右,还是可以接受的。

希望大佬们可以指导下有没有什么思路在springboot上让多线程共享同一个事务去进行操作。。。毕竟萌新从helloworld开始到现在勉强半年,代码上面有啥要改进的欢迎吐槽~

最新回复(0)