锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. 多线程案例-阻塞式队列

多线程案例-阻塞式队列

0
  • 软件开发
  • 发布于 2024-09-19
  • 0 次阅读
黄健
黄健

1.什么是阻塞队列

阻塞队列是一种特殊的队列,在"先进先出"的原则下又引入了"阻塞"功能

阻塞队列能是一种线程安全的数据结构,具有以下特性:

当队列满的时候,继续入队列就会阻塞,直到其它线程从队列中取走元素
当队列空的时候,继续出队列就会阻塞,直到其它队列向队列中插入元素

阻塞式队列的典型应用场景是"生产者消费者模型”

2.生产者消费者模型

生产者消费者模型通过一个容器来解决生产者和消费者的强耦合问题,生产者与消费者之间不直接通讯,通过阻塞队列来通讯,生产数据后放入阻塞队列,消费者可以直接从阻塞队列获取数据

生产者消费者模型能带来两个非常重要的好处:

1.阻塞队列能使生产者和消费者之间解耦
2.阻塞队列起到缓冲的作用,平衡了生产者消费者的处理能力

1.开发中典型的场景:服务器之间的相互调用

当客户端程序向A服务器发起一个请求后,A服务器将请求转发给B服务器处理,然后B服务器处理完成后将结果返回,此时可以视为:A调用了B

这种场景下,AB两个服务器的耦合程度是比较高的!!如果B服务器出现问题,也会引起A的bug,不仅如此,如果A再需要调用C服务器,还需要修改A的代码,非常麻烦..
针对这种场景使用生产者消费者模型,能有效降低耦合

此时,AB之间的耦合就降低很多了,AB都只知道队列的存在,A的代码不与B相关,B的代码也不与A相关,AB任何一方出现问题不会影响到另一方

2.服务器开发中,用户发送的请求的数量是不可控的,如果没有充分的准备并且请求量超过了服务器的承受范围,服务器有可能直接被大量的请求冲垮.例如"秒杀"这种场景,服务器就会同一时刻受到大量请求,这个时候可以把这些请求放到阻塞队列中,让线程慢慢处理,能有效防止服务器被大量的请求冲垮

在Java标准中内置有阻塞队列BlockingQueue,是一个接口,实现类是LinkedBlockingQueue,put方法是入队列,take是出队列.这两个方法具有阻塞特性

下面使用标准库中的阻塞队列实现生产者消费者模型

public class ThreadDemo1 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        //消费者
        Thread customer = new Thread(()->{
            while(true){
                try {
                    int value = queue.take();
                    System.out.println("消费元素: "+value);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        },"消费者");
        customer.start();
        //生产者
        Thread producer = new Thread(()->{
            Random random = new Random();
            while(true){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                int num = random.nextInt(1000);
                System.out.println("生产元素: "+num);
                try {
                    queue.put(num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        },"生产者");
        producer.start();
        try {
            customer.join();
            producer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

结果

我们可以发现,生产和消费是成对出现的,程序开始运行,因为在生产者线程中让线程休眠500ms后再执行,此时阻塞队列中为空,而消费者线程要再阻塞队列中使用take方法取元素,就会陷入阻塞状态,等到阻塞队列中有生产者插入元素后才继续执行取元素!!

接下来通过"循环队列"来实现一个阻塞队列

3.阻塞队列实现

实现阻塞队列是要实现一个普通队列然后加上"阻塞功能”

这里我们使用循环队列实现阻塞队列,下面是循环队列的三种状态

我们先写一个普通的循环队列的入队和出队操作

class MyBlockingQueue{
    private int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    //入队列
    public void put(int value){
        if(size == items.length){
            //队列满了.不能插入
            return;
        }
        items[tail] = value;
        tail++;
        //针对tail的处理
        //1)这个写法非常常见
        //tail = (tail+1)%items.length;
        //2)可读性好并且比求余的代码效率高
        if(tail >= items.length){
            tail = 0;
        }
        //插入成功
        size++;

    }
    //出队列
    public Integer take(){
        if(size == 0){
            //队列为空,不能出队
            return null;
        }
        int result = items[head];
        head++;
        if(head >= items.length){
            head = 0;
        }
        size--;
        return result;
    }
}

我们在队列中插入几个元素并取出

 public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        queue.put(4);
        int result = 0;
        result = queue.take();
        System.out.println(result);
        result = queue.take();
        System.out.println(result);
        result = queue.take();
        System.out.println(result);
        result = queue.take();
        System.out.println(result);
    }

现在我们在普通队列的基础上加上阻塞功能

阻塞功能意味着该队列是要在多线程环境下使用的

多线程环境下要保证线程安全,需要给方法加锁
使用wait()notify()方法添加阻塞功能
当队列为空时和队列满时都需要阻塞
sleep()方法是指定休眠的时间后唤醒,但是我们不能确定指定的时间是多少,需要看程序运行情况

修改后:

public void put(int value){
        synchronized (this){
            if(size == items.length){
                //队列满了.不能插入
                //return;
                //阻塞
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            items[tail] = value;
            tail++;
            //针对tail的处理
            //1)这个写法非常常见
            //tail = (tail+1)%items.length;
            //2)可读性好并且比求余的代码效率高
            if(tail >= items.length){
                tail = 0;
            }
            //插入成功
            size++;
            //唤醒队列为空处的wait()
            this.notify();
        }

    }
    //出队列
    public Integer take(){
        int result = 0;
        synchronized (this){
            if(size == 0){
                //队列为空,不能出队
                //return null;
                //阻塞
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            result = items[head];
            head++;
            if(head >= items.length){
                head = 0;
            }
            size--;
            //
            //唤醒队列满的wait()
            this.notify();
        }
        return result;
    }

上述代码还有个问题

如果notifyAll()了,这里的wait()一定会被唤醒!但是该线程还没有抢占到锁,当锁被这个线程抢占到时,队列的状态可能会是满的,因此我们最好用while循环,然后继续判断队列状态

这样是比较稳妥的方法

我们使用MyBlockingQueue再写一个生产者消费者模型,看是否能达到效果

MyBlockingQueue queue = new MyBlockingQueue();
        Thread customer = new Thread(()->{
            while(true){
                int result = queue.take();
                System.out.println("消费: "+result);
            }
        });
        customer.start();
        Thread producer = new Thread(()->{
            int count = 0;
            while(true){
                System.out.println("生产者: "+count);
                queue.put(count);
                count++;
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();

也是成对出现的,只有生产者向队中插入了,消费者才能获取

我们调整代码,让消费者速度降低

再来看结果

生产者生产的数据将循环队列充满后开始阻塞,消费者线程休眠结束后开始获取数据,获取一个后,阻塞队列便出现一个空位置,唤醒put的wait()后,继续生产数据,然后又阻塞等待队列不满…..

至此就用循环队列实现了阻塞队列


原文链接: https://blog.csdn.net/chenchenchencl/article/details/128549576

标签: #软件开发 1171
相关文章

万字:支付“核心系统”详解 2024-11-02 15:33

专栏作者:隐墨星辰 \| 主编:陈天宇宙 这篇文章也尝试化繁为简,探寻支付系统的本质,讲清楚在线支付系统最核心的一些概念和设计理念。 虽然支付行业已经过了风头最劲的时光,但跨境支付仍然在蓬勃发展,每年依然有很多新人进入这个行业,这篇文章尝试为这些刚入行的新人提供一点帮助。 文章只介绍一些支付行业十几

资深支付架构师视角:实战从问题定义到代码落地的完整套路 2024-11-02 15:33

前言 今天从一个实际案例入手,介绍站在架构师的角度,如何识别并定义问题,提炼需求,技术方案选型,再到详细设计,最后利用AI的能力协助写出核心的代码,验证与调优。 解决问题存在一定的模式,也可以称之为框架,总结出自己的思考和解题框架,以后再碰到同类型的问题就可以如庖丁解牛一样容易。 很多年前,我写代码

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

设计模式第16讲——迭代器模式(Iterator) 2024-10-08 11:24

一、什么是迭代器模式 迭代器模式是一种行为型设计模式,它提供了一种统一的方式来访问集合对象中的元素,而不是暴露集合内部的表示方式。简单地说,就是将遍历集合的责任封装到一个单独的对象中,我们可以按照特定的方式访问集合中的元素。 二、角色组成 抽象迭代器(Iterator):定义了遍历聚合对象所需的方法

vue2路由和vue3路由区别及原理 2024-10-08 11:24

一、Vue2 与 Vue3 路由的区别 1. 创建路由实例方式的不同 Vue 2 中,通过 Vue.use() 注册路由插件,并通过 new VueRouter() 来创建路由实例。 import Vue from 'vue';import VueRouter from 'vue-router';i

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.