锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. 不能放弃任务,线程池的任务拒绝策略应该如何设定

不能放弃任务,线程池的任务拒绝策略应该如何设定

0
  • JAVA
  • 发布于 2024-08-16
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

  1. 线程池的拒绝策略和异常捕获

1.1 什么时候要进行用到拒绝策略?

        就是当所有线程都在工作的时候,并且任务队列已经满了,系统无法处理这么多请求了就要去进行触发拒绝策略。

1.2JDK 内置的拒绝策略

        JDK 提供了四种内置的拒绝策略:

        1.DiscardPolicy:默默丢弃无法处理的任务,不予任何处理。

        2.DiscardOlderstPolicy:丢弃队列中最老的任务,尝试再次提交当前任务。

        3.AbortPolicy(默认):直接抛出异常,组织系统正常工作。

        4.CallerRunsPolicy:将任务分给调用线程来执行,运行当前被丢弃的任务,这样做不回真的丢去任务,但是提交线程的性能可能会急剧下降(主线程被占用了,会产生同步阻塞)。

        CallerRunsPolicy 使用的是当前的调用执行异步任务的线程进行执行被拒绝的任务,所以进行阻塞的也是当前启动异步任务的线程,会影响程序的整体性能,如果你的程序可以承受次延迟并且你要求任何一个任务都要被执行的话,你可以选择这个任务 => 问题,任务多了就是一直阻塞在这个线程,还是比较糟糕的,和把任务队列调大没有任何区别。其实进行理解就是,CallerRunsPolicy 会将任务会退给调用者,让调用者进行处理。

1.3 创建线程池的时候指定拒绝策略

        new ThreadPoolExecutor(corePoolSize 活跃线程数,maximumPoolSize 全部可工作线程数,keepAliveTime 多久后销毁临时线程,TimeUnit 时间的单位,queue 任务队列,线程池拒绝策略)

        活跃线程数就是正式员工,全部可工作线程数就是可以进行同时工作的线程,任务队列就是存储任务的,拒绝策略就是线程池沾满了任务队列也满了的情况,如何进行处理又来的任务。

1.4 自定义一个拒绝策略

        将初始化的拒绝策略传到 ThreadPoolExecutor 中,就可以配置自定义策略。

/**
 * 自定义线程池拒绝策略
 */
public class ThreadPoolRejectionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("失败啦");
    }
}
  1. 拒绝策略的选择

2.1 如果不允许丢弃任务,应该选择哪个拒绝策略

        如果不允许丢弃任务,在 JDK 提供的四个默认的拒绝策略中

        DiscardPolicy:默认丢弃无法处理的任务,不做任何处理。

        DiscardOldersPolicy:丢弃任务队列中最老进入的任务,尝试重新提交任务。

        AbortPolicy(默认策略):丢弃无法处理的任务,抛出异常,中断当前程序的执行。

        CallerRunsPolicy:将无法处理的任务抛回给当前调用线程,进行执行这个无法被处理的任务。

        目前可以很容想到使用 CallerRunsPolicy:从源码中可以卡看到,只要当前程序不关闭,就会使用 execute 方法的线程执行该任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
 
        public CallerRunsPolicy() { }
 
 
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //只要当前程序没有关闭,就用执行execute方法的线程执行该任务
            if (!e.isShutdown()) {
 
                r.run();
            }
        }
    }
}

2.2CallerRunsPolicy 拒绝策略有什么风险吗?如何解决?

        1. 如果走到 CallerRunsPolicy 的是一个十分消耗时间的任务,而且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行,严重情况下可能会导致 OOM。

        注明:OOM 的全程是 Out Of Memory,翻译为中文就是内存用完啦,当 JVM 因为没有足够的内存来为对象分配空间并且垃圾回收期也已经没有空间可以进行回收的时候,就会出现这个错误。

        2. 如果我们的目标是使用线程池进行控制系统流量,比如我们现在设置的线城市核心线程有两个,最大线程 4 个,我们有一个 AI 服务,最多允许四个人同时调用,现在线程池所有线程都在运作,并且任务队列也满了,那如果我为了保证任务不丢弃,继续执行,调用主线程执行 AI 任务,那 AI 服务就直接超负荷崩掉了。

2.3 那如何保证任务不被丢失呢?=> 任务持久化

        如果服务器资源已经达到了可利用的极限,这意味着我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死或者 AI 服务(任何一个有限度的服务)崩溃的根本原因就是我们不希望任何一个任务被丢弃,换个思路,我们能不能在保证任务不丢失的情况下还能在服务器有余力的时候去处理呢?

        那就是进行数据持久化操作。

        1. 可以设计一个任务表存储到 MySQL 数据库中

        2.Redis 缓存数据

        3.Caffeint 本地缓存数据

        4. 将任务提交到消息队列中

2.4 数据库持久化方案的实现 => 也适用于缓存数据库方案

        1. 实现 RejectedExecutionHandler 接口自定义拒绝策略,自定义拒绝策略可以负责将线程池暂时无法处理(此时任务队列已满)的任务入库(保存到 MySQL 中)。注意:线程池在那时无法处理的任务会被先放在任务队列中,任务队列满了才会触发拒绝策略。

        2. 继承 BlockingQueue 实现一个混合式任务队列,该队列包含 JDK 自带的 ArrayBlockingQueue。这里是这样的,建议先取早进入的任务,需要进行重写 take() 拿取任务的方法,先去数据库中读取最早的任务,数据库中没有任务的时候再去 ArrayyBlockQueue 中进行拿任务。

  1. 方案的问题思考以及优化

3.1 问题分析

        虽然我们成功持久化了数据,但是我们优先取数据库中拿数据,这个方案会带来的问题就是,我们很有可能拿不到最早加入队列的任务,因为最早加入的任务很有可能在任务队列中,而我们是先去处理数据库存储的任务了,还有一种极端情况,任务不断来,数据库中任务数据不断增加,然而你任务队列中的数据一直不取,那最早进任务队列里的几个数据不就死里面了吗?

        当然也可以阻止任务入队,只走数据库持久化存储任务。

        但是现在给大家提供一种新思路,自己去把控持久化数据。

3.2 场景搭建

        目前有一个场景:目前有一个场景,我们需要调用 AI 执行一个耗时任务,并且我们 AI 处理的能力很有限(GPU 资源不足),而且现在我们不能遗漏任何一个任务,所有请求都要求成功,那我们应该怎么做呢?

3.3 目标分析

        首先我们必须进行分解一下目标:

        1. 我们需要保证 AI 不被压垮

        2. 我们需要保证用户不能跟个傻子样干等着

        3. 我们必须保证任何一个任务都不能丢弃

        4. 我们必须保证首先执行的一定是最早的任务

3.4 方案设计

3.4.1 线程池异步化

        首先用户不能傻等着那一定得走异步,因为 AI 分析是一个很耗时间的任务,如果让用户在那里干等着,那用户就要掀桌子了,所以系统必须设计为异步的。

3.4.2 线程池的设计

        为什么需要专门设计一个线程池,当然是因为如果我们随便用一个线程池(或者用系统默认的),我们不能把控住到底创建了多少个异步任务,如果 AI 受不了这么多人调度呢?好家伙我 AI 就能抗住俩人同时调用,你直接给我整 10 个异步任务同时调度我,那 AI 不直接嘎掉了?所以我们必须进行设计一个符合我们当前业务的线程池。

        假装我们和 AI 部门沟通,目前 AI 部门形式大差,老板穷的不行了,钱都买茅子了,现在就一台 32G 显存的 GPU 服务器了,AI 部门说,这个 AI 推理能力比较慢,只能俩人同时用,而且每次调用得至少推理半分钟,根据你的经验,用户等一分钟都很强了,所以你决定,设置两个常备线程,最大两个线程,任务队列最多两个。但是此时老板又发威了,所有任务不能丢,一时处理不完要用短信发给用户说你的结果已经处理完了,请及时查看,这可把你愁坏了。

3.4.3 线程池任务拒绝策略和持久化策略的设计

        你决定创建一个任务表,这个任务表主要是用来存储所有任务的,一方面可以进行数据统计分析,一方面你要通过任务表进行存储很多 AI 一时半会处理不完需要持久化挂起的任务。

        让我们来研究着个表,窗口 ID 是因为用户可能开多个 AI 窗口进行发布任务,任务就是进行存储执行的任务的,isNotify 是最关键的一个,这个字段主要是进行存储是否要通知用户,需要通知的就是被挂起来的(当前 AI 一时半会整不完啊,没有加入到线程池中的全被挂起来了),不需要通知的就是一会 AI 就出结果的。

-- 消息表 
 
create table reject_task
(
    id           bigint auto_increment comment 'id'
        primary key,
    userId       bigint                             not null comment '用户ID',
    chatWindowId bigint                             not null comment '窗口ID',
    task         text                               not null comment '任务',
    createTime   datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    updateTime   datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
    isDelete     tinyint  default 0                 not null comment '是否删除',
    isNotify     tinyint  default 1                 not null comment '是否通知 0 = 通知 1 = 不通知'
)
    comment '消息表' collate = utf8mb4_unicode_ci;

3.4.4 线程池任务拒绝策略怎么搞?

        我根据需求进行分析,选定了以下策略:

        1. 当线程正常调度的时候,将任务加入到异步任务中进行调度使用

        2. 当线程池中的工作线程和任务队列都满了之后,进行返回一个任务被挂起的结果,然后将任务存储到数据库中,进行存储下来,等线程池空闲的时候去取出任务并执行。

        3. 在调度异步任务之前,进行判断一下数据库中是否还有没有执行过的任务。如果没有就将调度线程池执行异步任务,如果有,就将异步任务挂起到数据库中,并返回执行结果。

        4. 使用定时任务进行定时轮询线程池的任务队列是否有空闲(判断是否工作线程都满了,并且任务队列空闲,也就是 2 =< Task =< 3),如果有空闲,就调度线程池执行任务。

3.5 方案实现

3.5.1 扩展 Runnable => 方便存储上下文信息,方便 rejectedExecution 可以取到

        按自己需求进行扩展即可

package com.langchao.ai.config;
 
import lombok.Data;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.io.Serializable;
 
@Data
public class CallRunable implements Runnable, Serializable {
 
    private Runnable task;
    private String content;
    private Long chatWindowsId;
    private Long userId;
    private SseEmitter sseEmitter;
 
    public CallRunable(Runnable task, Long chatWindowsId, Long userId, String content) {
        this.task = task;
        this.chatWindowsId = chatWindowsId;
        this.userId = userId;
        this.content = content;
    }
 
    public CallRunable(Runnable task, Long chatWindowsId, Long userId, String content, SseEmitter sseEmitter) {
        this.task = task;
        this.chatWindowsId = chatWindowsId;
        this.userId = userId;
        this.content = content;
        this.sseEmitter = sseEmitter;
    }
 
    @Override
    public void run() {
        task.run();
    }
 
}

3.5.2 实现线程池拒绝策略

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.langchao.ai.model.entity.RejectTask;
import com.langchao.ai.service.RejectTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import javax.annotation.Resource;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * 自定义线程池拒绝策略
 */
@Slf4j
@Component
public class ThreadPoolRejectionHandler implements RejectedExecutionHandler {
 
    @Resource
    private RejectTaskService rejectTaskService;
 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof CallRunable) {
            // 1. 拿到上下文信息
            // 伪代码
            
            // 进行执行持久化任务
            QueryWrapper<RejectTask> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("userId", userId);
            queryWrapper.eq("chatWindowId", chatWindowsId);
            RejectTask task = rejectTaskService.getOne(queryWrapper);
            // 更新任务
            task.setTask(content);
            task.setIsNotify(0);
            rejectTaskService.updateById(task);
        } else {
            System.out.println("Task was rejected, but it's not a CallRunnable instance.");
        }
 
 
        System.out.println("失败啦");
    }
}

3.5.3 实现线程池

import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.Resource;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
@Configuration
public class ThreadPoolExecutorConfig {
 
    @Resource
    private ThreadPoolRejectionHandler threadPoolRejectionHandler;
 
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
 
        // 配置线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            private int count = 1;
 
            @Override
            public Thread newThread(@NotNull Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("线程" + count++);
                return thread;
            }
        };
 
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 100, TimeUnit.SECONDS,
                 new ArrayBlockingQueue<>(2), threadFactory, threadPoolRejectionHandler);
        return threadPoolExecutor;
    }
}

3.5.4 业务代码(处理新请求)

// 校验/或者处理其他信息
 
// 将任务推进到数据库中
RejectTask rejectTask = new RejectTask();
rejectTask.setUserId(loginUser.getId());
rejectTask.setChatWindowId(chatWindowId);
rejectTask.setTask(content);
rejectTask.setIsNotify(1);
boolean isSuccess = rejectTaskService.save(rejectTask);
ThrowUtils.throwIf(!isSuccess, ErrorCode.SYSTEM_ERROR, "系统出错!");
 
// 异步化系统
CallRunable callRunable = new CallRunable(() -> {
    doAsyncAI.asyncUserAI(loginUser.getId(), chatWindowId, sseEmitter, chatWindows, rejectTask, content);
}, chatWindowId, loginUser.getId(), content, sseEmitter);
 
// 判断任务数据库中是否还有更早的任务
QueryWrapper<RejectTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("isNotify", 0);
RejectTask task = rejectTaskService.getOne(queryWrapper);
if (task != null) {
    sseEmitter.send("任务被挂起");
    sseEmitter.complete();
    return sseEmitter;
}
 
// 调度执行任务
threadPoolExecutor.execute(callRunable);

3.5.5 配置定时任务

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.langchao.ai.model.entity.ChatWindows;
import com.langchao.ai.model.entity.RejectTask;
import com.langchao.ai.service.ChatWindowsService;
import com.langchao.ai.service.RejectTaskService;
import com.langchao.ai.utils.DoAsyncAI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
 
@Slf4j
@Component
public class ListenThreadQueue {
 
    @Resource
    private ThreadPoolExecutor threadPoolExecutor;
 
    @Resource
    private DoAsyncAI doAsyncAI;
 
    @Resource
    private RejectTaskService rejectTaskService;
 
    @Resource
    private ChatWindowsService chatWindowsService;
 
    /**
     * 每分钟执行一次
     */
    // @Scheduled(fixedRate = 60 * 1000)
    public void run() {
        // todo 最好还是加把锁
        // 监听队列是否为空
        if (threadPoolExecutor.getTaskCount() < 2 || threadPoolExecutor.getTaskCount() > 3) {
            return;
        }
        // 先查出任务
        QueryWrapper<RejectTask> queryWrapper = new QueryWrapper<>();
        // 指定按照某个日期字段升序排列
        queryWrapper.orderByAsc("createTime");
        // 查询第一条记录
        Page<RejectTask> page = new Page<>(1, 1); // 第一个参数为当前页码,第二个参数为每页显示的记录数
        RejectTask rejectTask = rejectTaskService.page(page, queryWrapper).getRecords().get(0);
        if (rejectTask == null) {
            // 没查出来 就走吧
            return;
        }
        // 查询出chatWindows
        ChatWindows chatWindows = chatWindowsService.getById(rejectTask.getChatWindowId());
        // 将任务添加到队列
        threadPoolExecutor.execute(() -> doAsyncAI.asyncUserAI(rejectTask.getUserId(), rejectTask.getChatWindowId(), null, chatWindows, rejectTask, rejectTask.getTask()));
    }
}
标签: #软件开发 1171 #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.