业务
一个可有一定延迟时间执行的业务逻辑,拆解到调度中心;利用数据库存储执行数据的方式,调度中心获取异步执行;
实现
调度中心集群,然后使用分片广播方式调用执行器;执行器中的逻辑,使用用户id取模查询执行数据;
优化
处理数据逻辑,包含大量数据库操作和redis操作; 原来是在调度执行线程上单独串行执行多条数据,执行效率较低;
将多条数据进行再次根据用户id取模分组,然后使用多线程并发执行,发现执行效率提高;
说明
多个执行器的任务,使用用户id取模获取,避免不同执行器并行执行相同用户数据,造成数据库数据并发操作; 单个执行器本地多线程执行,将任务再次使用用户id取模分组后执行,避免执行器中相同用户数据被并发执行;
int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();
WfStudyTimeJob query = new WfStudyTimeJob();
query.setStatus(0);
//此处根据执行器数量取模获取数据
List<WfStudyTimeJob> wfStudyTimeJobListByPage = studyTimeJobMapper.queryListByJOB(query, shardTotal, shardIndex, limit);
if (wfStudyTimeJobListByPage.size() < 1) {
return;
}
//多线程再次取模
for (int i = 0; i < 4; i++) {
int finalI = i;
StudyTimeThreadPoolCallerRunsPolicyUtil.getPool().execute(() -> {
for (WfStudyTimeJob wfStudyTimeJob : wfStudyTimeJobListByPage) {
Integer userId = wfStudyTimeJob.getUserId();
if (userId % 4 != finalI) {
continue;
}
try {
courseController.saveStudyTime(wfStudyTimeJob.getChapterId(), wfStudyTimeJob.getVideoTime(),
wfStudyTimeJob.getStudyTime(), wfStudyTimeJob.getCreateTimeLong(),
wfStudyTimeJob.getUserId(), wfStudyTimeJob.getUserSource());
studyTimeJobMapper.deleteById(wfStudyTimeJob.getId());
} catch (Exception e) {
log.error("保存学习时间错误", e);
wfStudyTimeJob.setStatus(2);
wfStudyTimeJob.setErrorMsg(e.getMessage());
studyTimeJobMapper.updateById(wfStudyTimeJob);
}
}
});
}
此处线程池使用固定线程无缓冲队列执行:
ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 2
, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread t = new Thread(r, "StudyTimeJOB-thread-" + WfIdWorkerUtil.getId());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
还有一种线程池,模仿xxl-job通讯底层实现的线程池(通过源码可知,当来任务,会先创建线程,后面再到缓冲池):
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 2
, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200), new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread t = new Thread(r, "StudyTimeJOB-thread-" + WfIdWorkerUtil.getId());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2022/04/17 19:52