欢迎访问 生活随笔!

ag凯发k8国际

当前位置: ag凯发k8国际 > 编程语言 > java >内容正文

java

java blockingqueue-ag凯发k8国际

发布时间:2024/10/8 java 1 豆豆
ag凯发k8国际 收集整理的这篇文章主要介绍了 java blockingqueue_java多线程进阶(三一)—— j.u.c之collections框架:blockingqueue接口... 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

一、引言

从本节开始,我们将介绍juc-collections框架中的“阻塞队列”部分。阻塞队列在实际应用中非常广泛,许多消息中间件中定义的队列,通常就是一种“阻塞队列”。

那么“阻塞队列”和我们之前讨论过的concurrentlinkedqueue、concurrentlinkeddeque有什么不同呢?

concurrentlinkedqueue和concurrentlinkeddeque是以非阻塞算法实现的高性能队列,其使用场景一般是在并发环境下,需要“队列”/“栈”这类数据结构时才会使用;而“阻塞队列”通常利用了“锁”来实现,也就是会阻塞调用线程,其使用场景一般是在“生产者-消费者”模式中,用于线程之间的数据交换或系统解耦。

在java多线程基础(七)——producer-consumer模式中,我们曾简要的谈到过“生产者-消费者”这种模式。在这种模式中,“生产者”和“消费者”是相互独立的,两者之间的通信需要依靠一个队列。这个队列,其实就是本文中的“阻塞队列”。

引入“阻塞队列”的最大好处就是解耦,在软件工程中,“高内聚,低耦合”是进行模块设计的准则之一,这样“生产者”和“消费者”其实是互不影响的,将来任意一方需要升级时,可以保证系统的平滑过渡。

二、blockingqueue简介

blockingqueue是在jdk1.5时,随着j.u.c引入的一个接口:

blockingqueue继承了queue接口,提供了一些阻塞方法,主要作用如下:

当线程向队列中插入元素时,如果队列已满,则阻塞线程,直到队列有空闲位置(非满);

当线程从队列中取元素(删除队列元素)时,如果队列未空,则阻塞线程,直到队列有元素;

既然blockingqueue是一种队列,所以也具备队列的三种基本方法:插入、删除、读取:

操作类型

抛出异常

返回特殊值

阻塞线程

超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

删除

remove()

poll()

take()

poll(time, unit)

读取

element()

peek()

/

/

可以看到,对于每种基本方法,“抛出异常”和“返回特殊值”的方法定义和queue是完全一样的。blockingqueue只是增加了两类和阻塞相关的方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。

put(e)和take()方法会一直阻塞调用线程,直到线程被中断或队列状态可用;

offer(e, time, unit)和poll(time, unit)方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。

public interface blockingqueue extends queue {

/**

* 插入元素e至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间.

*/

void put(e e) throws interruptedexception;

/**

* 插入元素e至队列, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时.

*/

boolean offer(e e, long timeout, timeunit unit)

throws interruptedexception;

/**

* 从队首删除元素,如果队列为空, 则阻塞调用线程直到队列中有元素.

*/

e take() throws interruptedexception;

/**

* 从队首删除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素或超时.

*/

e poll(long timeout, timeunit unit) throws interruptedexception;

// ...

}

除此之外,blockingqueue还具有以下特点:

blockingqueue队列中不能包含null元素;

blockingqueue接口的实现类都必须是线程安全的,实现类一般通过“锁”保证线程安全;

blockingqueue 可以是限定容量的。remainingcapacity()方法用于返回剩余可用容量,对于没有容量限制的blockingqueue实现,该方法总是返回integer.max_value 。

三、再谈“生产者-消费者”模式

最后,我们来看下如何利用blockingqueue来实现生产者-消费者模式。在生产者-消费者模式中,一共有四类角色:生产者、消费者、消息队列、消息体。我们利用blockingqueue来实现消息队列,其余部分没有什么变化。

producer(生产者)

生产者生产消息体(data),并将消息体(data)传递给通道(channel)。

/**

* 生产者

*/

public class producer implements runnable {

private channel channel;

public producer(channel channel) {

this.channel = channel;

}

@override

public void run() {

while (true) {

string v = string.valueof(threadlocalrandom.current().nextint());

data data = new data(v);

try {

channel.put(data);

system.out.println(thread.currentthread().getname() " produce :" data);

} catch (interruptedexception e) {

e.printstacktrace();

}

thread.yield();

}

}

}

consumer(消费者)

消费者从通道(channel)中获取数据,进行处理。

/**

* 消费者

*/

public class consumer implements runnable {

private final channel channel;

public consumer(channel channel) {

this.channel = channel;

}

@override

public void run() {

while (true) {

try {

object obj = channel.take();

system.out.println(thread.currentthread().getname() " consume :" obj.tostring());

} catch (interruptedexception e) {

e.printstacktrace();

}

thread.yield();

}

}

}

channel(通道)

相当于消息的队列,对消息进行排队,控制消息的传输。

/**

* 通道类

*/

public class channel {

private final blockingqueue blockingqueue;

public channel(blockingqueue blockingqueue) {

this.blockingqueue = blockingqueue;

}

public object take() throws interruptedexception {

return blockingqueue.take();

}

public void put(object o) throws interruptedexception {

blockingqueue.put(o);

}

}

data(消息体/数据)

data代表了实际生产或消费的数据。

/**

* 数据/消息

*/

public class data implements serializable {

private t data;

public data(t data) {

this.data = data;

}

public t getdata() {

return data;

}

public void setdata(t data) {

this.data = data;

}

@override

public string tostring() {

return "data{"

"data=" data

'}';

}

}

调用如下:

public class main {

public static void main(string[] args) {

blockingqueue blockingqueue = new somequeueimplementation();

channel channel = new channel(blockingqueue);

producer p = new producer(channel);

consumer c1 = new consumer(channel);

consumer c2 = new consumer(channel);

new thread(p).start();

new thread(c1).start();

new thread(c2).start();

}

}

总结

以上是ag凯发k8国际为你收集整理的java blockingqueue_java多线程进阶(三一)—— j.u.c之collections框架:blockingqueue接口...的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得ag凯发k8国际网站内容还不错,欢迎将ag凯发k8国际推荐给好友。

网站地图