本文介绍一种高效的,很有价值的无等待队列实现 。
它基于FAA(fetch-and-add)+CAS(compare-and-swap)来实现。第一步的FAA避开高度竞争情况下第一个CAS造成的竞争。同时它还提供最强的wait-freedom保证progress。我个人给它提供了Java的一个实现:
首先:它包含如下5种数据结构。
对于一个cell,它的初始值为BOT,所以第一步是:
接着,假如enqueue Thread成功,条件变为:cell->value==val, 那么enqueue就完成操作,结束对该cell的访问。所以接下来dequeue Thread由于可能存在竞争,需要把cell的deq原子地改变为TOP,从而结束操作:
所以现在val已经被Thread ONE或者Thread TWO获取了。
但是,假如STEP ONE中的enqueue操作失败了呢?那么会怎么样呢?
此时,cell->val == TOP,做法是enqueue保留该cell的index,如果在尝试次数内就再次获取一个新的cell来尝试。否则就构造自己的enq Request,改为将enq Request塞入新的cell.而dequeue则在将val变为TOP之后,查看该cell的enq Request,如果已经有了,则尝试配对这个enq Request和该cell(enq.id == cell.index)。如果没有,则会从线程handle_t构成的ring中抽取下一个enq Request,接着尝试将该enq Request塞入该cell。如果没有enq Request则将该cell的enq改为TOP。如果去到enq Request则尝试将它配对(enq.id == cell.index)。无论如何,该cell必定要被消耗掉:- 要么因为cell->enq变为TOP从而消耗掉。
- 要么因为cell->enq变为了某个enq request,但是和该request没有配对成功,从而消耗掉。
- 要么cell->enq变为了某个enq request,并且和该request配对成功,从而cell->val获得某个该request的值。
之后,同样的dequeue通过竞争cell->deq来获取val。此过程如图下:
Atomic: cell->enq => ENQAtomic: cell->enq-id => (-cell->index)
好,到此为止,enqueue操作相关的所有操作都已走完。
另一方面,假如dequeue操作经过多次还未成功,那么它就会将该deq request放入线程的线程ring环里面。
当其他线程dequeue成功之后,就会顺着线程ring环去获取协助该deq。他们不再把deq直接变为TOP,而是变为指向那个deq Request。并且通过将deq的idx先指向选定的cell,然后改变idx的值为负,从而通知deq request的线程操作已完成。如图所示:它的enqueue代码如下:
void enqueue(queue_t *q, handle_t *th, void *v) { th->hzd_node_id = th->enq_node_id; long id; int p = MAX_PATIENCE; while (!enq_fast(q, th, v, &id) && p-- > 0) ; if (p < 0) enq_slow(q, th, v, id); th->enq_node_id = th->Ep->id; RELEASE(&th->hzd_node_id, -1);}static int enq_fast(queue_t *q, handle_t *th, void *v, long *id) { long i = FAAcs(&q->Ei, 1); cell_t *c = find_cell(&th->Ep, i, th); void *cv = BOT; if (CAS(&c->val, &cv, v)) {#ifdef RECORD th->fastenq++;#endif return 1; } else { *id = i; return 0; }}static void enq_slow(queue_t *q, handle_t *th, void *v, long id) { enq_t *enq = &th->Er; enq->val = v; RELEASE(&enq->id, id); node_t *tail = th->Ep; long i; cell_t *c; do { i = FAA(&q->Ei, 1); c = find_cell(&tail, i, th); enq_t *ce = BOT; if (CAScs(&c->enq, &ce, enq) && c->val != TOP) { if (CAS(&enq->id, &id, -i)) id = -i; break; } } while (enq->id > 0); id = -enq->id; c = find_cell(&th->Ep, id, th); if (id > i) { long Ei = q->Ei; while (Ei <= id && !CAS(&q->Ei, &Ei, id + 1)) ; } c->val = v;#ifdef RECORD th->slowenq++;#endif}static void *help_enq(queue_t *q, handle_t *th, cell_t *c, long i) { void *v = spin(&c->val); if ((v != TOP && v != BOT) || (v == BOT && !CAScs(&c->val, &v, TOP) && v != TOP)) { return v; } enq_t *e = c->enq; if (e == BOT) { handle_t *ph; enq_t *pe; long id; ph = th->Eh, pe = &ph->Er, id = pe->id; if (th->Ei != 0 && th->Ei != id) { th->Ei = 0; th->Eh = ph->next; ph = th->Eh, pe = &ph->Er, id = pe->id; } if (id > 0 && id <= i && !CAS(&c->enq, &e, pe)) th->Ei = id; else th->Eh = ph->next; if (e == BOT && CAS(&c->enq, &e, TOP)) e = TOP; } if (e == TOP) return (q->Ei <= i ? BOT : TOP); long ei = ACQUIRE(&e->id); void *ev = ACQUIRE(&e->val); if (ei > i) { if (c->val == TOP && q->Ei <= i) return BOT; } else { if ((ei > 0 && CAS(&e->id, &ei, -i)) || (ei == -i && c->val == TOP)) { long Ei = q->Ei; while (Ei <= i && !CAS(&q->Ei, &Ei, i + 1)) ; c->val = ev; } } return c->val;}
dequeue代码如下:
void *dequeue(queue_t *q, handle_t *th) { th->hzd_node_id = th->deq_node_id; void *v; long id = 0; int p = MAX_PATIENCE; do v = deq_fast(q, th, &id); while (v == TOP && p-- > 0); if (v == TOP) v = deq_slow(q, th, id); else {#ifdef RECORD th->fastdeq++;#endif } if (v != EMPTY) { help_deq(q, th, th->Dh); th->Dh = th->Dh->next; } th->deq_node_id = th->Dp->id; RELEASE(&th->hzd_node_id, -1); if (th->spare == NULL) { cleanup(q, th); th->spare = new_node(); }#ifdef RECORD if (v == EMPTY) th->empty++;#endif return v;}static void *deq_fast(queue_t *q, handle_t *th, long *id) { long i = FAAcs(&q->Di, 1); cell_t *c = find_cell(&th->Dp, i, th); void *v = help_enq(q, th, c, i); deq_t *cd = BOT; if (v == BOT) return BOT; if (v != TOP && CAS(&c->deq, &cd, TOP)) return v; *id = i; return TOP;}static void *deq_slow(queue_t *q, handle_t *th, long id) { deq_t *deq = &th->Dr; RELEASE(&deq->id, id); RELEASE(&deq->idx, id); help_deq(q, th, th); long i = -deq->idx; cell_t *c = find_cell(&th->Dp, i, th); void *val = c->val;#ifdef RECORD th->slowdeq++;#endif return val == TOP ? BOT : val;}static void help_deq(queue_t *q, handle_t *th, handle_t *ph) { deq_t *deq = &ph->Dr; long idx = ACQUIRE(&deq->idx); long id = deq->id; if (idx < id) return; node_t *Dp = ph->Dp; th->hzd_node_id = ph->hzd_node_id; FENCE(); idx = deq->idx; long i = id + 1, old = id, new = 0; while (1) { node_t *h = Dp; for (; idx == old && new == 0; ++i) { cell_t *c = find_cell(&h, i, th); long Di = q->Di; while (Di <= i && !CAS(&q->Di, &Di, i + 1)) ; void *v = help_enq(q, th, c, i); if (v == BOT || (v != TOP && c->deq == BOT)) new = i; else idx = ACQUIRE(&deq->idx); } if (new != 0) { if (CASra(&deq->idx, &idx, new)) idx = new; if (idx >= new) new = 0; } if (idx < 0 || deq->id != id) break; cell_t *c = find_cell(&Dp, idx, th); deq_t *cd = BOT; if (c->val == TOP || CAS(&c->deq, &cd, deq) || cd == deq) { CAS(&deq->idx, &idx, -idx); break; } old = idx; if (idx >= i) i = idx + 1; }}