Java有锁并发、无锁并发和CAS实例分析

有锁并发

对于大多数程序员(当然我也基本上是其中一员),并发编程几乎就等价于给相关数据结构加上一个锁(Mutex)。比如如果我们需要一个支持并发的栈,那最简单的方法就是给一个单线程的栈加上锁 std::sync::Mutex 。(加上Arc是为了能让多个线程都拥有栈的所有权)

<code>
   
   
  <p>use std::sync::{Mutex, Arc};<br/><br/>#[derive(Clone)]<br/>struct ConcurrentStack<T> {<br/>    inner: Arc<Mutex<Vec<T>>>,<br/>}<br/><br/>impl<T> ConcurrentStack<T> {<br/>    pub fn new() -> Self {<br/>        ConcurrentStack {<br/>            inner: Arc::new(Mutex::new(Vec::new())),<br/>        }<br/>    }<br/><br/>    pub fn push(&self, data: T) {<br/>        let mut inner = self.inner.lock().unwrap();<br/>        (*inner).push(data);<br/>    }<br/><br/>    pub fn pop(&self) -> Option<T> {<br/>        let mut inner = self.inner.lock().unwrap();<br/>        (*inner).pop()<br/>    }<br/><br/>}<br/>
  
    

   
   
  </p></code>

代码写起来十分方便,因为它几乎与单线程版本相同,这很明显是一个好处。只需要按照单线程的版本写完,然后给数据结构加上锁,然后在必要的时候获取和释放(在Rust中基本上是自动的)锁即可。

那么问题是什么呢?首先不谈你可能会忘记获取和释放锁(这一点要感谢Rust,在Rust中几乎不可能发生),你可能会面临死锁的问题(哲学家就餐问题)。然后也不谈一些低优先级的任务可能会长期抢占高优先级任务的资源(因为锁是第一位的),当线程数量比较大的时候,大部分的时间都被用在了同步上(等待锁能被获取),性能就会变得非常差。考虑一个大量读取而偶尔写入的并发数据库,如果用锁去处理,即使数据库没有任何更新,每两个读之间也需要做一次同步,代价太大!

无锁并发

于是,大量计算机科学家和程序员就把目光转向了无锁(lock-free)并发。无锁对象:如果一个共享对象保证了无论其他线程做何种操作,总有一些线程会在有限的系统操作步骤后完成一个对其的操作 Her91 。也就是说,至少有一个线程对其操作会取得成效。使用锁的并发明显就不属于这一范畴:如果获取了锁的线程被延迟,那么这段时间里没有任何线程能够完成任何操作。极端情况下如果出现了死锁,那么没有任何线程能够完成任何操作。

CAS(compare and swap)原语

那大家可能会好奇,无锁并发要怎么实现呢?有没有例子呢?在此之前让我们先看一下一个公认的在无锁并发中非常重要的原子原语:CAS。CAS的过程是用指定值去比较一储存值,只有当他们相同时,才会修改储存值为新的指定值。CAS是个原子操作(由处理器支持,比如x86的compare and exchange (CMPXCHG)),该原子性保证了如果其他线程已经改变了储存值,那么写入就会失败。Rust标准库中的 std::sync::atomic 中的类型就提供了CAS操作,比如原子指针 std::sync::atomic::AtomicPtr

<code>
   
   
  <p>pub fn compare_and_swap(<br/>    &self,<br/>    current: *mut T,<br/>    new: *mut T,<br/>    order: Ordering<br/>) -> *mut T<br/>
  
    

   
   
  </p></code>

(在这里,不用纠结ordering是什么,也就是说请尽管忽略忽略 Acquire , Release , Relaxed

无锁栈(天真版)

<code>
   
   
  <p>#![feature(box_raw)]<br/><br/>use std::ptr::{self, null_mut};<br/>use std::sync::atomic::AtomicPtr;<br/>use std::sync::atomic::Ordering::{Relaxed, Release, Acquire};<br/><br/>pub struct Stack<T> {<br/>    head: AtomicPtr<Node<T>>,<br/>}<br/><br/>struct Node<T> {<br/>    data: T,<br/>    next: *mut Node<T>,<br/>}<br/><br/>impl<T> Stack<T> {<br/>    pub fn new() -> Stack<T> {<br/>        Stack {<br/>            head: AtomicPtr::new(null_mut()),<br/>        }<br/>    }<br/><br/>    pub fn pop(&self) -> Option<T> {<br/>        loop {<br/>            // 快照<br/>            let head = self.head.load(Acquire);<br/><br/>            // 如果栈为空<br/>            if head == null_mut() {<br/>                return None<br/>            } else {<br/>                let next = unsafe { (*head).next };<br/><br/>                // 如果现状较快照并没有发生改变<br/>                if self.head.compare_and_swap(head, next, Release) == head {<br/><br/>                    // 读取内容并返回<br/>                    return Some(unsafe { ptr::read(&(*head).data) })<br/>                }<br/>            }<br/>        }<br/>    }<br/><br/>    pub fn push(&self, t: T) {<br/>        // 创建node并转化为*mut指针<br/>        let n = Box::into_raw(Box::new(Node {<br/>            data: t,<br/>            next: null_mut(),<br/>        }));<br/>        loop {<br/>            // 快照<br/>            let head = self.head.load(Relaxed);<br/><br/>            // 基于快照更新node<br/>            unsafe { (*n).next = head; }<br/><br/>            // 如果在此期间,快照仍然没有过时<br/>            if self.head.compare_and_swap(head, n, Release) == head {<br/>                break<br/>            }<br/>        }<br/>    }<br/>}<br/>
  
    

   
   
  </p></code>

我们可以看到,无论是pop还是push思路都是一样的:先在快照上pop或者是push,然后尝试用CAS替换原来的数据。如果快照和数据相等,就意味着在此期间没有执行写操作,因此更新就能成功。如果数据不一致,则表明在此期间有其他线程对其进行了修改,需要重新开始。这就是一个无锁的栈。似乎一切都已经大功告成了!

内存释放

如果你正在使用Java或其他带有GC的编程语言,你可能已经完成了大量的工作。现在的问题在于,在Rust这种没有GC的语言中,pop中

return Some(unsafe { ptr::read(&(*head).data) })

并没有人去释放 head ,这是一个内存泄漏!哎,看来无锁并发好不容易呢。

以上就是Java有锁并发、无锁并发和CAS实例分析的详细内容,更多请关注www.sxiaw.com其它相关文章!