staticfinalclassNode { /** Marker to indicate a node is waiting in shared mode 共享模式*/ staticfinalNodeSHARED=newNode(); /** Marker to indicate a node is waiting in exclusive mode 独占模式*/ staticfinalNodeEXCLUSIVE=null;
// @return the predecessor of this node final Node predecessor()throws NullPointerException { Nodep= prev; if (p == null) thrownewNullPointerException(); else return p; }
Node() {} // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
// 如果当前线程前面有一个排队的线程, 则返回 true // 如果当前线程位于队列的顶部或队列为空, 则返回 false publicfinalbooleanhasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Nodet= tail; // Read fields in reverse initialization order Nodeh= head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred= tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); // 入队 return node; }
privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; // 获得前置节点的状态 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 这里会将前置节点的status从初始化的0变为-1, 用于后续唤醒操作 } returnfalse; }
// ---- parkAndCheckInterrupt() ----
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ privatefinalbooleanparkAndCheckInterrupt() { LockSupport.park(this); // 线程挂起, 程序不会继续向下执行 return Thread.interrupted(); // 在调用unpark方法后, 会继续执行 }
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ privatevoidcancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return;
node.thread = null;
// Skip cancelled predecessors Nodepred= node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. NodepredNext= pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Nodenext= node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }
privatevoidunparkSuccessor(Node node) { // node是头节点 /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 哨兵节点状态置0
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 为线程发放许可 }