一次改进xxl-job调度任务,使用多线程优化执行效率

/ 后端 / 没有评论 / 77浏览

业务

一个可有一定延迟时间执行的业务逻辑,拆解到调度中心;利用数据库存储执行数据的方式,调度中心获取异步执行;

实现

调度中心集群,然后使用分片广播方式调用执行器;执行器中的逻辑,使用用户id取模查询执行数据;

优化

处理数据逻辑,包含大量数据库操作和redis操作; 原来是在调度执行线程上单独串行执行多条数据,执行效率较低;

将多条数据进行再次根据用户id取模分组,然后使用多线程并发执行,发现执行效率提高;

说明

多个执行器的任务,使用用户id取模获取,避免不同执行器并行执行相同用户数据,造成数据库数据并发操作; 单个执行器本地多线程执行,将任务再次使用用户id取模分组后执行,避免执行器中相同用户数据被并发执行;

        int shardTotal = XxlJobHelper.getShardTotal();
        int shardIndex = XxlJobHelper.getShardIndex();

        WfStudyTimeJob query = new WfStudyTimeJob();
        query.setStatus(0);
        //此处根据执行器数量取模获取数据
        List<WfStudyTimeJob> wfStudyTimeJobListByPage = studyTimeJobMapper.queryListByJOB(query, shardTotal, shardIndex, limit);
        if (wfStudyTimeJobListByPage.size() < 1) {
            return;
        }
         
        //多线程再次取模
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            StudyTimeThreadPoolCallerRunsPolicyUtil.getPool().execute(() -> {
                for (WfStudyTimeJob wfStudyTimeJob : wfStudyTimeJobListByPage) {
                    Integer userId = wfStudyTimeJob.getUserId();
                    if (userId % 4 != finalI) {
                        continue;
                    }
                    try {
                        courseController.saveStudyTime(wfStudyTimeJob.getChapterId(), wfStudyTimeJob.getVideoTime(),
                                wfStudyTimeJob.getStudyTime(), wfStudyTimeJob.getCreateTimeLong(),
                                wfStudyTimeJob.getUserId(), wfStudyTimeJob.getUserSource());
                        studyTimeJobMapper.deleteById(wfStudyTimeJob.getId());
                    } catch (Exception e) {
                        log.error("保存学习时间错误", e);
                        wfStudyTimeJob.setStatus(2);
                        wfStudyTimeJob.setErrorMsg(e.getMessage());
                        studyTimeJobMapper.updateById(wfStudyTimeJob);
                    }
                }
            });
        }

此处线程池使用固定线程无缓冲队列执行:

    ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 2
            , TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
        @Override
        public Thread newThread(@NotNull Runnable r) {
            Thread t = new Thread(r, "StudyTimeJOB-thread-" + WfIdWorkerUtil.getId());
            return t;
        }
    }, new ThreadPoolExecutor.CallerRunsPolicy());

还有一种线程池,模仿xxl-job通讯底层实现的线程池(通过源码可知,当来任务,会先创建线程,后面再到缓冲池):

    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 2
            , TimeUnit.SECONDS, new LinkedBlockingQueue<>(200), new ThreadFactory() {
        @Override
        public Thread newThread(@NotNull Runnable r) {
            Thread t = new Thread(r, "StudyTimeJOB-thread-" + WfIdWorkerUtil.getId());
            return t;
        }
    }, new ThreadPoolExecutor.CallerRunsPolicy());