前言
项目中需要多次调用外部接口,串行调用导致执行效率低。任务数量固定时可以使用CountDownLatch实现;任务数量动态变化时则需要使用CompletableFuture。
使用过程中感觉不够方便,所以封装了一个异步处理工具类。
注意:请根据业务情况评估是否需要调整线程池参数(核心线程数、最大线程数、队列容量等)。
核心代码
/**
 * Fluent helper for submitting a dynamic number of tasks to a shared thread pool and then
 * waiting for all of them to finish.
 *
 * <p>Obtain an instance with {@link #newInstance()}, call {@link #runAsync(Runnable)} once per
 * task, then call {@link #await()}. The submitted futures are kept in a plain
 * {@code ArrayList} with no synchronization in this class, so submit and await on a single
 * helper instance from one thread.
 *
 * @author dzh
 * @since 2024/5/10
 */
public class AsyncHelper {

    /** Futures of every task submitted through this helper, in submission order. */
    private final List<CompletableFuture<?>> futures = new ArrayList<>();

    private AsyncHelper() {
    }

    /**
     * Submits a task to the shared pool and records its future.
     *
     * @param runnable the task to execute
     * @return this helper, for chaining
     */
    public AsyncHelper runAsync(Runnable runnable) {
        futures.add(MyUtils.runAsync(runnable));
        return this;
    }

    /**
     * Blocks until every task submitted so far through this helper has completed.
     *
     * @return this helper, for chaining
     * @throws RuntimeException if any task failed or the waiting thread was interrupted
     */
    public AsyncHelper await() {
        MyUtils.await(futures);
        return this;
    }

    /**
     * Creates a new, empty helper.
     *
     * @return a fresh helper with no submitted tasks
     */
    public static AsyncHelper newInstance() {
        return new AsyncHelper();
    }

    /**
     * Static utilities backing {@link AsyncHelper}: the shared executor, task submission and
     * waiting.
     */
    public static class MyUtils {

        private static final Executor executor;

        static {
            ThreadFactory threadFactory =
                    new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").setDaemon(true).build();
            // Rejection policy: when the pool and queue are full the submitting thread runs
            // the task itself.
            // Core size equals max size, so the 10 s keep-alive has no effect unless
            // allowCoreThreadTimeOut is enabled on the pool.
            executor = new ThreadPoolExecutor(100, 100, 10, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(10), threadFactory,
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        /**
         * Runs the task on the shared executor, propagating the submitter's tenant id.
         *
         * <p>The tenant id is captured on the submitting thread, installed on the executing
         * thread for the duration of the task, and the executing thread's previous value is
         * put back afterwards so it does not leak into later tasks on a reused thread (or
         * onto the caller thread when the rejection policy runs the task inline).
         *
         * @param runnable the task to execute
         * @return a future that completes when the task finishes
         */
        public static CompletableFuture<Void> runAsync(Runnable runnable) {
            // Capture thread-bound context on the submitting thread.
            final Long tenantId = DefaultTenantContext.getTenantId();
            return CompletableFuture.runAsync(() -> {
                final Long previousTenantId = DefaultTenantContext.getTenantId();
                DefaultTenantContext.setTenantId(tenantId);
                try {
                    runnable.run();
                } finally {
                    // NOTE(review): assumes setTenantId accepts null to clear the value;
                    // confirm against DefaultTenantContext.
                    DefaultTenantContext.setTenantId(previousTenantId);
                }
            }, executor);
        }

        /**
         * Blocks until all given futures have completed. Does nothing for a null or empty
         * list.
         *
         * @param futures the futures to wait for
         * @throws RuntimeException if any future completed exceptionally, or if the waiting
         *         thread was interrupted (the interrupt flag is restored before throwing)
         */
        public static void await(List<CompletableFuture<?>> futures) {
            if (CollectionUtils.isNotEmpty(futures)) {
                CompletableFuture<?>[] arr = futures.toArray(new CompletableFuture<?>[0]);
                try {
                    CompletableFuture.allOf(arr).get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("任务等待异常", e);
                } catch (Exception e) {
                    throw new RuntimeException("任务等待异常", e);
                }
            }
        }
    }
}
使用
// Typical usage: submit every task first, then block once until all have finished.
AsyncHelper helper = AsyncHelper.newInstance();
int submitted = 0;
while (submitted < 100) {
    helper.runAsync(() -> {
        // core business logic goes here
    });
    submitted++;
}
helper.await();
demo
/**
 * Demo: submits 100 tasks that each sleep for one second, waits for all of them, then prints
 * the elapsed seconds, the number of tasks that completed, and the number of distinct worker
 * thread names observed.
 */
@Test
public void run() {
    AtomicInteger atomicInteger = new AtomicInteger(0);
    long start = System.currentTimeMillis();
    AtomicInteger threadCnt = new AtomicInteger();
    // Used as a concurrent set of thread names; the values are ignored.
    ConcurrentHashMap<String, Object> threadNameSet = new ConcurrentHashMap<>();
    AsyncHelper asyncHelper = AsyncHelper.newInstance();
    for (int i = 0; i < 100; i++) {
        final int j = i;
        asyncHelper.runAsync(() -> {
            try {
                Thread.sleep(1000);
                atomicInteger.incrementAndGet();
                // put() returns null only the first time a thread name is seen.
                if (threadNameSet.put(Thread.currentThread().getName(), "") == null) {
                    threadCnt.incrementAndGet();
                }
                System.out.println(Thread.currentThread().getName() + j);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
    }
    asyncHelper.await();
    System.out.printf("消耗时间 %dS %d%n 线程个数%d", ((System.currentTimeMillis() - start) / 1000), atomicInteger.get(), threadCnt.get());
}
最后的话
欢迎拍砖