博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SynchronousQueue
阅读量:4329 次
发布时间:2019-06-06

本文共 5231 字,大约阅读时间需要 17 分钟。

之前在文章里讲到了一种特殊的队列,就是直接提交队列SynchronousQueue。SynchronousQueue的容量为0,任何一个对SynchronousQueue的写要等待一个对SynchronousQueue的读,反之也一样。因此SynchronousQueue与其说是一个队列,不如说是一个数据通道。那么我们来看看SynchronousQueue的奇妙功能是怎么实现的呢?

  对于SynchronousQueue来说,它将put()和take()两个不同的操作抽象为一个共通的方法Transferer.transfer()。从字面上的意思来看,就是数据传递的。它的签名如下:

1   abstract static class Transferer
{2 abstract E transfer(E e, boolean timed, long nanos);3 }

  当参数为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据;timed参数决定是否存在timed时间,nanos决定了timeout的时长。如果返回值为空,则表示失败(超时或者中断),如果返回值不为空,则表示数据已经接受或者正常提供。

Transferer.transfer():

Transferer.transfer()方法的实现是SynchronousQueue的核心,它的实现大体上分为三个步骤:

  1. 如果队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。比如,等待队列中是读线程等待,本此操作也是读,因此两个读都需要等待。进入等待队列的线程可能会被挂起,它们会等待一个匹配操作。
  2. 如果等待队列中的操作和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让它“匹配”到一个等待节点上。接着弹出这两个节点,并且使得对应的两个线程继续执行。
  3. 如果线程发现等待队列的节点就是“完成”节点,那么帮助这个节点完成任务。其后的操作和步骤2是一致的。

下面通过源码,来对上面的步骤进行说明:

1 E transfer(E e, boolean timed, long nanos) { 2             SNode s = null; // constructed/reused as needed 3             int mode = (e == null) ? REQUEST : DATA; 4  5             for (;;) { 6                 SNode h = head; 7                 if (h == null || h.mode == mode) {  // 队列为空或者模式相同 8                     if (timed && nanos <= 0) {      // 不进行等待 9                         if (h != null && h.isCancelled())10                             casHead(h, h.next);     // 处理取消节点11                         else12                             return null;13                     } else if (casHead(h, s = snode(s, e, h, mode))) {14                         SNode m = awaitFulfill(s, timed, nanos);//等待,直到匹配操作出现15                         if (m == s) {               // 等待被取消16                             clean(s);17                             return null;18                         }19                         if ((h = head) != null && h.next == s)20                             casHead(h, s.next);     //帮助s的fulfiller21                         return (E) ((mode == REQUEST) ? m.item : s.item);22                     }23                 } else if (!isFulfilling(h.mode)) { // 是否处于fullfill状态24                     if (h.isCancelled())            // 如果已经取消了25                         casHead(h, h.next);         // 弹出并重试26                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {27                         for (;;) { // 一直循环到匹配或者没有等待者28                             SNode m = s.next;       // m是s的匹配者29                             if (m == null) {        //已经没有等待者了30                                 casHead(s, null);   // 弹出fulfill节点31                                 s = null;           // 下一次使用新的节点32                                 break;              // 重新开始主循环33                             }34                             SNode mn = m.next;35                             if (m.tryMatch(s)) {36                                 casHead(s, mn);     // 弹出 m 和 s37                                 return (E) ((mode == REQUEST) ? m.item : s.item);38                             } else                  // 匹配失败39                                 s.casNext(m, mn);   // 帮助删除节点40                         }41                     }42                 } else {                            // 帮助一个fulfiller43                     SNode m = h.next;               // m 是 h 的match44                     if (m == null)                  // 没有等待者45                         casHead(h, null);           //弹出fulfill节点46                     else {47                         SNode mn = m.next;48                         if (m.tryMatch(h))          // 尝试 match49                             casHead(h, mn);         // 弹出 m和 h50                         else                        // match 失败51                             h.casNext(m, mn);       // 帮助删除节点52                     }53                 }54             }55         }

   步骤1,就是代码第7行到23行,第六行的SNode表示等待队列中的节点,SNode的内部封装了当前线程、next节点、匹配节点、数据内容等。第7行,判断当前等待队列为空,或者队列中元素的模式与本次操作相同(比如,都是读操作,那么都必须等待)。第13行,生成一个新的节点并置于队列头部,这个节点就代表当前线程。如果入队成功,则执行第14行代码,awaitFulfill()方法会进行自旋等待,并最终挂起当前线程,直到一个与之对应的操作产生,将其唤醒,线程唤醒后(表示已经读取到数据或者自己产生的数据已近被别的线程读取),并在代码第19、20行尝试帮助对应的线程完成两个节点的出队操作。并在最后,返回读取或者写入的数据(第21行)。

  步骤2,代码的23到42行。第23行,首先判断头部节点是否处于fulfill模式。如果是,则需进入步骤3。否则,将视自己为对应的的fulfill线程。第23行,生成一个SNode元素,设置为fulfill模式并将其压入队列头部。接着,设置m(原始的队列头部)为s的匹配节点(35行),这个tryMatch()方法将会激活一个等待线程,并将m传递给那个线程,如果设置成功,则表示数据投递完成,将s和m两个节点弹出即可(第36行)。如果tryMatch()失败,则表示已经有其他线程帮我完成了操作,那么简单的删除m节点即可(39行),因为这个节点的数据已经被传递,不需要再次处理,然后跳转到第27行的循环体,进行下一个等待线程的匹配和数据传递,知道队列中没有等待线程为止。

  步骤3,代码第42到52行。如果线程在执行时,发现头部元素恰好是fulfill模式,就会执行步骤3。步骤3的执行原理和步骤2是完全一致的,唯一不同的是步骤3不会返回,因为步骤3所进行的工作是帮助其他线程尽快的投递它们的数据,而自己并没有完成对应的操作。因此,线程再进入步骤3后,再次进入大循环体,从步骤1开始重新判断条件和投递数据。

  从整个的数据投递的过程中可看出,在SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系。更重要的是,线程彼此之间还会互相帮助。在一个线程内部,可能会帮助其他线程完成他们的工作。这种模式可以更大程度上减少饥饿的可能,提高系统整体的并发度。

SynchronousQueue的特性:

  ❤ SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

  ❤ 因为没有容量,所以对应的peek、contains、clear、isEmpty.....等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。

  ❤ SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。

  ❤ SynchronousQueue内部采用了无锁实现(CAS)。

参考:《Java高并发程序设计》 葛一鸣  郭超 编著:

转载于:https://www.cnblogs.com/Joe-Go/p/9802794.html

你可能感兴趣的文章
阿里负载均衡,配置中间证书问题(在starcom申请免费DV ssl)
查看>>
转:How to force a wordbreaker to be used in Sharepoint Search
查看>>
MySQL存储过程定时任务
查看>>
Python中and(逻辑与)计算法则
查看>>
POJ 3267 The Cow Lexicon(动态规划)
查看>>
设计原理+设计模式
查看>>
音视频处理
查看>>
tomcat 7服务器跨域问题解决
查看>>
前台实现ajax 需注意的地方
查看>>
Jenkins安装配置
查看>>
个人工作总结05(第二阶段)
查看>>
Java clone() 浅拷贝 深拷贝
查看>>
深入理解Java虚拟机&运行时数据区
查看>>
02-环境搭建
查看>>
spring第二冲刺阶段第七天
查看>>
搜索框键盘抬起事件2
查看>>
阿里百川SDK初始化失败 错误码是203
查看>>
透析Java本质-谁创建了对象,this是什么
查看>>
BFS和DFS的java实现
查看>>
关于jquery中prev()和next()的用法
查看>>