博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Fast Wait-free Queue
阅读量:6504 次
发布时间:2019-06-24

本文共 6023 字,大约阅读时间需要 20 分钟。

本文介绍一种高效的,很有价值的无等待队列实现 。

它基于FAA(fetch-and-add)+CAS(compare-and-swap)来实现。第一步的FAA避开高度竞争情况下第一个CAS造成的竞争。
同时它还提供最强的wait-freedom保证progress。

我个人给它提供了Java的一个实现:

首先:它包含如下5种数据结构。

图片描述

对于一个cell,它的初始值为BOT,所以第一步是:

图片描述

接着,假如enqueue Thread成功,条件变为:cell->value==val, 那么enqueue就完成操作,结束对该cell的访问。所以接下来dequeue Thread由于可能存在竞争,需要把cell的deq原子地改变为TOP,从而结束操作:

图片描述

所以现在val已经被Thread ONE或者Thread TWO获取了。

但是,假如STEP ONE中的enqueue操作失败了呢?那么会怎么样呢?

此时,cell->val == TOP,做法是enqueue保留该cell的index,如果在尝试次数内就再次获取一个新的cell来尝试。
否则就构造自己的enq Request,改为将enq Request塞入新的cell.
而dequeue则在将val变为TOP之后,查看该cell的enq Request,如果已经有了,则尝试配对这个enq Request和该cell(enq.id == cell.index)。如果没有,则会从线程handle_t构成的ring中抽取下一个enq Request,接着尝试将该enq Request塞入该cell。如果没有enq Request则将该cell的enq改为TOP。如果去到enq Request则尝试将它配对(enq.id == cell.index)
无论如何,该cell必定要被消耗掉:

  • 要么因为cell->enq变为TOP从而消耗掉。
  • 要么因为cell->enq变为了某个enq request,但是和该request没有配对成功,从而消耗掉。
  • 要么cell->enq变为了某个enq request,并且和该request配对成功,从而cell->val获得某个该request的值。

之后,同样的dequeue通过竞争cell->deq来获取val。此过程如图下:

Atomic: cell->enq => ENQ
图片描述

Atomic: cell->enq-id => (-cell->index)

图片描述

好,到此为止,enqueue操作相关的所有操作都已走完。

另一方面,假如dequeue操作经过多次还未成功,那么它就会将该deq request放入线程的线程ring环里面。

当其他线程dequeue成功之后,就会顺着线程ring环去获取协助该deq。
他们不再把deq直接变为TOP,而是变为指向那个deq Request。
并且通过将deq的idx先指向选定的cell,然后改变idx的值为负,从而通知deq request的线程操作已完成。
如图所示:
图片描述

它的enqueue代码如下:

void enqueue(queue_t *q, handle_t *th, void *v) {    th->hzd_node_id = th->enq_node_id;    long id;    int p = MAX_PATIENCE;    while (!enq_fast(q, th, v, &id) && p-- > 0)        ;    if (p < 0) enq_slow(q, th, v, id);    th->enq_node_id = th->Ep->id;    RELEASE(&th->hzd_node_id, -1);}static int enq_fast(queue_t *q, handle_t *th, void *v, long *id) {    long i = FAAcs(&q->Ei, 1);    cell_t *c = find_cell(&th->Ep, i, th);    void *cv = BOT;    if (CAS(&c->val, &cv, v)) {#ifdef RECORD        th->fastenq++;#endif        return 1;    } else {        *id = i;        return 0;    }}static void enq_slow(queue_t *q, handle_t *th, void *v, long id) {    enq_t *enq = &th->Er;    enq->val = v;    RELEASE(&enq->id, id);    node_t *tail = th->Ep;    long i;    cell_t *c;    do {        i = FAA(&q->Ei, 1);        c = find_cell(&tail, i, th);        enq_t *ce = BOT;        if (CAScs(&c->enq, &ce, enq) && c->val != TOP) {            if (CAS(&enq->id, &id, -i)) id = -i;            break;        }    } while (enq->id > 0);    id = -enq->id;    c = find_cell(&th->Ep, id, th);    if (id > i) {        long Ei = q->Ei;        while (Ei <= id && !CAS(&q->Ei, &Ei, id + 1))            ;    }    c->val = v;#ifdef RECORD    th->slowenq++;#endif}static void *help_enq(queue_t *q, handle_t *th, cell_t *c, long i) {    void *v = spin(&c->val);    if ((v != TOP && v != BOT) ||        (v == BOT && !CAScs(&c->val, &v, TOP) && v != TOP)) {        return v;    }    enq_t *e = c->enq;    if (e == BOT) {        handle_t *ph;        enq_t *pe;        long id;        ph = th->Eh, pe = &ph->Er, id = pe->id;        if (th->Ei != 0 && th->Ei != id) {            th->Ei = 0;            th->Eh = ph->next;            ph = th->Eh, pe = &ph->Er, id = pe->id;        }        if (id > 0 && id <= i && !CAS(&c->enq, &e, pe))            th->Ei = id;        else            th->Eh = ph->next;        if (e == BOT && CAS(&c->enq, &e, TOP)) e = TOP;    }    if (e == TOP) return (q->Ei <= i ? BOT : TOP);    long ei = ACQUIRE(&e->id);    void *ev = ACQUIRE(&e->val);    if (ei > i) {        if (c->val == TOP && q->Ei <= i) return BOT;    } else {        if ((ei > 0 && CAS(&e->id, &ei, -i)) || (ei == -i && c->val == TOP)) {            long Ei = q->Ei;            while (Ei <= i && !CAS(&q->Ei, &Ei, i + 1))                ;            c->val = ev;        }    }    return c->val;}

dequeue代码如下:

void *dequeue(queue_t *q, handle_t *th) {    th->hzd_node_id = th->deq_node_id;    void *v;    long id = 0;    int p = MAX_PATIENCE;    do        v = deq_fast(q, th, &id);    while (v == TOP && p-- > 0);    if (v == TOP)        v = deq_slow(q, th, id);    else {#ifdef RECORD        th->fastdeq++;#endif    }    if (v != EMPTY) {        help_deq(q, th, th->Dh);        th->Dh = th->Dh->next;    }    th->deq_node_id = th->Dp->id;    RELEASE(&th->hzd_node_id, -1);    if (th->spare == NULL) {        cleanup(q, th);        th->spare = new_node();    }#ifdef RECORD    if (v == EMPTY) th->empty++;#endif    return v;}static void *deq_fast(queue_t *q, handle_t *th, long *id) {    long i = FAAcs(&q->Di, 1);    cell_t *c = find_cell(&th->Dp, i, th);    void *v = help_enq(q, th, c, i);    deq_t *cd = BOT;    if (v == BOT) return BOT;    if (v != TOP && CAS(&c->deq, &cd, TOP)) return v;    *id = i;    return TOP;}static void *deq_slow(queue_t *q, handle_t *th, long id) {    deq_t *deq = &th->Dr;    RELEASE(&deq->id, id);    RELEASE(&deq->idx, id);    help_deq(q, th, th);    long i = -deq->idx;    cell_t *c = find_cell(&th->Dp, i, th);    void *val = c->val;#ifdef RECORD    th->slowdeq++;#endif    return val == TOP ? BOT : val;}static void help_deq(queue_t *q, handle_t *th, handle_t *ph) {    deq_t *deq = &ph->Dr;    long idx = ACQUIRE(&deq->idx);    long id = deq->id;    if (idx < id) return;    node_t *Dp = ph->Dp;    th->hzd_node_id = ph->hzd_node_id;    FENCE();    idx = deq->idx;    long i = id + 1, old = id, new = 0;    while (1) {        node_t *h = Dp;        for (; idx == old && new == 0; ++i) {            cell_t *c = find_cell(&h, i, th);            long Di = q->Di;            while (Di <= i && !CAS(&q->Di, &Di, i + 1))                ;            void *v = help_enq(q, th, c, i);            if (v == BOT || (v != TOP && c->deq == BOT))                new = i;            else                idx = ACQUIRE(&deq->idx);        }        if (new != 0) {            if (CASra(&deq->idx, &idx, new)) idx = new;            if (idx >= new) new = 0;        }        if (idx < 0 || deq->id != id) break;        cell_t *c = find_cell(&Dp, idx, th);        deq_t *cd = BOT;        if (c->val == TOP || CAS(&c->deq, &cd, deq) || cd == deq) {            CAS(&deq->idx, &idx, -idx);            break;        }        old = idx;        if (idx >= i) i = idx + 1;    }}

转载地址:http://tdqyo.baihongyu.com/

你可能感兴趣的文章
文档转换拾遗
查看>>
操作主机 Schema Master[为企业维护windows server 2008系列九]
查看>>
AI产品开发指南:5大核心环节搞定机器学习工作流
查看>>
解决客户一例:使用域超级管理员打开Exchange 2010发现没有权限
查看>>
Maven中解决system的jar依赖,打包复制问题
查看>>
物联网操作系统已现中国时机
查看>>
Eclipse下使用Subversion =subclipse
查看>>
架构语言ArchiMate - ArchiMate提供的基本视角(Viewpoints)介绍二
查看>>
警告okyep之辈,我要让你们抱憾终生
查看>>
SCCM2012之部署安装
查看>>
.NET/ASP.NETMVC 深入剖析 Model元数据、HtmlHelper、自定义模板、模板的装饰者模式(二)...
查看>>
配套自测连载(五)
查看>>
大型企业网络配置系列课程详解(六) --PPP链路的配置与相关概念的理解
查看>>
nginx lua redis解决saltstack下发传输文件慢的问题思路
查看>>
基于嵌入式操作系统VxWorks的多任务并发程序设计(4)――任务间通信B
查看>>
SharePoint 2010 服务应用程序(Service Application)架构(1)
查看>>
JDBC+Servlet+JSP整合开发之25.JSP动作元素
查看>>
烂泥:rsync与inotify集成实现数据实时同步更新
查看>>
go语言笔记——go环境变量goroot是安装了路径和gopath是三方包路径
查看>>
数据操作类 SQLHelper.cs
查看>>