多线程中多个线程插入,一个异常全部回滚的问题
目前萌新还在学习阶段,今天优化一个批量写订单的场景,我们给数据库写一堆订单,但是订单是我们这边的一个总单拆出来的,之前的写法是直接拆单循环处理,单次循环写入为0.3秒,如果我方订单拆开,那么我们的总单拆成10个子单,直接循环同步推送就3-6秒,这个是不可接受的。
所以第一考虑就是使用多线程分别将各个订单进行推送,并且一个报错,所有线程回滚。
一开始总感觉能用一个事务处理所有的线程,但是凭借自己对@transactional的底层实现和编程事务的比较浅显的认识没有搞定所有线程共享一个连接一个事务并且共同提交或回滚的场景,但是线程各自为战,但能够完成一起提交或一起回滚的demo我是勉强写出来了。
代码比较简单,下面贴一下代码
controller
package com
.test
.transaction
.controller
;
import cn
.hutool
.json
.JSONObject
;
import cn
.hutool
.json
.JSONUtil
;
import com
.test
.transaction
.dao
.TransactionTestRepository
;
import com
.test
.transaction
.entity
.TransactionTest
;
import io
.swagger
.annotations
.Api
;
import io
.swagger
.annotations
.ApiOperation
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.apache
.commons
.lang
.StringUtils
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.transaction
.PlatformTransactionManager
;
import org
.springframework
.transaction
.TransactionDefinition
;
import org
.springframework
.transaction
.TransactionStatus
;
import org
.springframework
.transaction
.annotation
.Transactional
;
import org
.springframework
.transaction
.support
.DefaultTransactionDefinition
;
import org
.springframework
.web
.bind
.annotation
.*
;
import java
.util
.ArrayList
;
import java
.util
.List
;
import java
.util
.concurrent
.*
;
@RestController
@RequestMapping("/test")
@Api(tags
= "事务处理")
@Slf4j
public class TestController {
@Autowired
TransactionTestRepository repository
;
@Autowired
PlatformTransactionManager platformTransactionManager
;
public static volatile Boolean needRollback
= false;
@ApiOperation(value
= "多线程中的事务回滚")
@Transactional(rollbackFor
= Exception
.class)
@PostMapping("/threadTrans")
public String
threadTrans(@RequestParam(name
= "target") Integer target
) {
StringBuilder sb
= new StringBuilder();
List
<Future> futureList
= new ArrayList<>();
ExecutorService executorService
= Executors
.newCachedThreadPool();
Integer count
= 20;
CyclicBarrier cyclicBarrier
= new CyclicBarrier(count
);
for (int i
= 0; i
< count
; i
++) {
try {
Thread
.sleep(100);
} catch (InterruptedException e
) {
e
.printStackTrace();
}
SpreadProcess spreadProcess
= new SpreadProcess();
spreadProcess
.count
= i
;
spreadProcess
.repository
= repository
;
spreadProcess
.barrier
= cyclicBarrier
;
spreadProcess
.transactionManager
= platformTransactionManager
;
spreadProcess
.exceptionPoint
= target
;
Future
<String> future
= executorService
.submit(spreadProcess
);
futureList
.add(future
);
}
executorService
.shutdown();
try {
executorService
.awaitTermination(20, TimeUnit
.SECONDS
);
} catch (InterruptedException e
) {
e
.printStackTrace();
}
if (executorService
.isTerminated()) {
for (Future future
: futureList
) {
String errorString
= null
;
try {
errorString
= (String
) future
.get();
} catch (InterruptedException e
) {
e
.printStackTrace();
} catch (ExecutionException e
) {
e
.printStackTrace();
}
if (StringUtils
.isNotEmpty(errorString
)) {
sb
.append(errorString
);
}
}
}
if (StringUtils
.isNotEmpty(sb
.toString())) {
log
.error(sb
.toString());
throw new RuntimeException(sb
.toString());
}
return "succeed";
}
}
@Slf4j
class SpreadProcess implements Callable<String> {
public Integer count
;
public TransactionTestRepository repository
;
public CyclicBarrier barrier
;
private String errorStr
;
public PlatformTransactionManager transactionManager
;
public Integer exceptionPoint
;
public String
runInfo() {
return errorStr
;
}
@Override
public String
call() throws Exception
{
String error
= null
;
DefaultTransactionDefinition def
= new DefaultTransactionDefinition();
def
.setPropagationBehavior(TransactionDefinition
.PROPAGATION_REQUIRES_NEW
);
TransactionStatus status
= transactionManager
.getTransaction(def
);
TransactionTest transactionTest
= new TransactionTest();
transactionTest
.setName(Thread
.currentThread().getId() + "==" + count
);
repository
.save(transactionTest
);
log
.info(transactionTest
.toString());
if (count
== exceptionPoint
) {
error
= String
.format("线程%s处理%s时遇到错误抛异常回滚", Thread
.currentThread().getId(), count
.toString());
log
.error(error
);
errorStr
= error
;
TestController
.needRollback
= true;
}
barrier
.await();
if (TestController
.needRollback
) {
transactionManager
.rollback(status
);
log
.info("线程{},i={}事务回滚",Thread
.currentThread().getId(),count
);
return error
;
}
transactionManager
.commit(status
);
log
.info("线程{},i={}事务已经提交",Thread
.currentThread().getId(),count
);
return "";
}
}
dao
package com
.test
.transaction
.dao
;
import com
.test
.transaction
.entity
.TransactionTest
;
import org
.springframework
.data
.jpa
.repository
.JpaRepository
;
public interface TransactionTestRepository extends JpaRepository<TransactionTest, Integer> {
}
entity
package com
.test
.transaction
.entity
;
import lombok
.Data
;
import javax
.persistence
.Entity
;
import javax
.persistence
.GeneratedValue
;
import javax
.persistence
.GenerationType
;
import javax
.persistence
.Id
;
@Entity
@Data
public class TransactionTest {
@Id
@GeneratedValue(strategy
= GenerationType
.IDENTITY
)
private Integer id
;
private String name
;
}
运行结果如下,虽然能够完成需求,但是总感觉因为回滚和提交那里不是原子的,会因为一些比如刚好卡到那一行宕机之类的极端情况出现bug
项目结构如下,注意这里还受到hiraki线程池大小的限制,如果线程数过多会一直因为hiraki的线程池因为占满而卡住,我把线程池大小开到了15,所以只是应急用,我这边实际的大单处理速度从5-8秒直接降到1秒左右,还是可以接受的。
希望大佬们可以指导下有没有什么思路在springboot上让多线程共享同一个事务去进行操作。。。毕竟萌新从helloworld开始到现在勉强半年,代码上面有啥要改进的欢迎吐槽~