std::shared_ptr std::weak_ptr 线程安全性

本篇主要基于GCC-4.8.5源码来分析std::shared_ptr的线程安全性,std::weak_ptr是否线程安全,再者两者结合是否可以应用在某些多线程并发场景而无需显式的使用std::mutex锁。

相当辛苦的画了shared_ptr和weak_ptr的底层实现及对应关系的类图:

先来看shared_ptr,众所周知其底层实现基于引用计数,那多线程下多个shared_ptr引用同一对象实例是否安全?内部对引用计数的修改是否多线程安全?持有的唯一对象析构销毁是否安全?
shared_ptr继承自__shared_ptr,持有一个引用计数对象_M_refcount和一个真正的对象指针。先来看shared_ptr的构造:

template <typename _Tp1>
explicit shared_ptr(_Tp1* __p) : __shared_ptr<_Tp>(__p) { }
 
template<typename _Tp1>
explicit __shared_ptr(_Tp1* __p) : _M_ptr(__p), _M_refcount(__p) {
 __glibcxx_function_requires(_ConvertibleConcept<_Tp1*, _Tp*>)
 static_assert( sizeof(_Tp1) > 0, "incomplete type" );
 __enable_shared_from_this_helper(_M_refcount, __p, __p);
}

template<typename _Tp1>
shared_ptr& operator=(const shared_ptr<_Tp1>& __r) noexcept {
 this->__shared_ptr<_Tp>::operator=(__r);
 return *this;
}

template<typename _Tp1>
__shared_ptr&operator=(const __shared_ptr<_Tp1, _Lp>& __r) noexcept {
 _M_ptr = __r._M_ptr;
 _M_refcount = __r._M_refcount; // __shared_count::op= doesn't throw
 return *this;
}

template<class _Tp1>
__shared_ptr& operator=(__shared_ptr<_Tp1, _Lp>&& __r) noexcept {
 __shared_ptr(std::move(__r)).swap(*this);
 return *this;
}

void reset() noexcept { __shared_ptr().swap(*this); }

template<typename _Tp1>
void reset(_Tp1* __p) // _Tp1 must be complete.
{
 // Catch self-reset errors.
 _GLIBCXX_DEBUG_ASSERT(__p == 0 || __p != _M_ptr);
 __shared_ptr(__p).swap(*this);
}

void swap(__shared_ptr<_Tp, _Lp>& __other) noexcept{
 std::swap(_M_ptr, __other._M_ptr);
 _M_refcount._M_swap(__other._M_refcount);
}

由上可以看出shared_ptr其构造、swap、reset等包含两步操作:对象原始指针的拷贝或交换、引用计数的构造或者交换。
这两步操作并非原子,在多线程下通过shared_ptr持有同一个对象实例,多读多写,拷贝、构造、修改、销毁均存在时,极有可能获取到已销毁的指针,从而程序挂掉。
尤其注意swap的实现,基于新对象的指针栈上构造一个临时的shared_ptr对象,再与当前shared_ptr对象进行两步交换操作。
从陈硕的几个图例也可以清晰看到,“为什么多线程读写 shared_ptr 要加锁?”。

再来看对引用计数的管理:

// _Sp_counted_base
void _M_add_ref_copy() { __gnu_cxx::__atomic_add_dispatch(&_M_use_count, 1); } 

void _M_release() noexcept {
 // Be race-detector-friendly.  For more info see bits/c++config.
 _GLIBCXX_SYNCHRONIZATION_HAPPENS_BEFORE(&_M_use_count);
 if (__gnu_cxx::__exchange_and_add_dispatch(&_M_use_count, -1) == 1) {
   _GLIBCXX_SYNCHRONIZATION_HAPPENS_AFTER(&_M_use_count);
   _M_dispose();
   // There must be a memory barrier between dispose() and destroy()
   // to ensure that the effects of dispose() are observed in the
   // thread that runs destroy().
   // See http://gcc.gnu.org/ml/libstdc++/2005-11/msg00136.html
   if (_Mutex_base<_Lp>::_S_need_barriers) {
    _GLIBCXX_READ_MEM_BARRIER;
    _GLIBCXX_WRITE_MEM_BARRIER;
   }

   // Be race-detector-friendly.  For more info see bits/c++config.
   _GLIBCXX_SYNCHRONIZATION_HAPPENS_BEFORE(&_M_weak_count);
   if (__gnu_cxx::__exchange_and_add_dispatch(&_M_weak_count, -1) == 1) {
    _GLIBCXX_SYNCHRONIZATION_HAPPENS_AFTER(&_M_weak_count);
    _M_destroy();
   }
 }
}

long _M_get_use_count() const noexcept {
 // No memory barrier is used here so there is no synchronization
 // with other threads.
 return __atomic_load_n(&_M_use_count, __ATOMIC_RELAXED);
}

// __shared_count
__shared_count(const __shared_count& __r) noexcept : _M_pi(__r._M_pi) {
 if (_M_pi != 0)
  _M_pi->_M_add_ref_copy();
}

__shared_count& operator=(const __shared_count& __r) noexcept {
 _Sp_counted_base<_Lp>* __tmp = __r._M_pi;
 if (__tmp != _M_pi) {
  if (__tmp != 0)
   __tmp->_M_add_ref_copy();
  if (_M_pi != 0)
   _M_pi->_M_release();
  _M_pi = __tmp;
 }
 return *this;
}

void _M_swap(__shared_count& __r) noexcept {
 _Sp_counted_base<_Lp>* __tmp = __r._M_pi;
 __r._M_pi = _M_pi;
 _M_pi = __tmp;
}

template<_Lock_policy _Lp>
inline __shared_count<_Lp>:: __shared_count(const __weak_count<_Lp>& __r)
 : _M_pi(__r._M_pi) {
 if (_M_pi != 0)
 _M_pi->_M_add_ref_lock();
 else
 __throw_bad_weak_ptr();
}

template<> 
inline void
_Sp_counted_base<_S_atomic>::_M_add_ref_lock() {
 // Perform lock-free add-if-not-zero operation.
 _Atomic_word __count = _M_get_use_count();
 do
 {
 if (__count == 0)
 __throw_bad_weak_ptr();
 // Replace the current counter value with the old value + 1, as
 // long as it's not changed meanwhile. 
 }
 while (!__atomic_compare_exchange_n(&_M_use_count, &__count, __count + 1,
  true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
}

_M_add_ref_copy增加引用计数使用了__atomic_add_dispatch,为原子的。
_M_release则涉及到多线程下竞态处理,__gnu_cxx::__exchange_and_add_dispatch(&_M_use_count, -1) == 1,将当前引用计数减1并返回原值,这个原子性操作则保证了只有一个线程会将引用计数减为0,继而只有这个线程将真正的对象析构销毁掉。_M_weak_count减为0,则作为管理者也自行销毁。
_M_dispose和_M_destroy之间插入了读写内存屏障,保证了其他线程对这个操作顺序的可见。
_M_get_use_count并不需要施加额外的内存屏障,用于传入weak_ptr来构造shared_ptr,此时通过类似CAS的lock free方式修改引用计数,用于weak_ptr的lock操作线程安全构造shared_ptr。

可见,多个shared_ptr通过引用计数持有同一个对象实例,引用计数本身的管理完全是线程安全的,也说明了只有一个线程会执行安全的销毁掉持有的对象实例,正常情况下的使用不会发生两次释放同一对象实例的指针。

下面来看weak_ptr,其包含一个__weak_count对象来实现对管理者(weak_ptr)的计数标记,weak_ptr其使用常常伴随着shared_ptr,用于解决可能发生的shared_ptr循环引用、通过弱引用转换检查对象的有效性等。

// __weak_ptr
template<typename _Tp1, typename = typename
std::enable_if<std::is_convertible<_Tp1*, _Tp*>::value>::type>
 __weak_ptr(const __shared_ptr<_Tp1, _Lp>& __r) noexcept
 : _M_ptr(__r._M_ptr), _M_refcount(__r._M_refcount) { 
} 

template<typename _Tp1>
__weak_ptr& operator=(const __shared_ptr<_Tp1, _Lp>& __r) noexcept {
 _M_ptr = __r._M_ptr;
 _M_refcount = __r._M_refcount;
 return *this;
}

bool expired() const noexcept {
 return _M_refcount._M_get_use_count() == 0;
}

__shared_ptr<_Tp, _Lp> lock() const noexcept {
 #ifdef __GTHREADS
 // Optimization: avoid throw overhead.
 if (expired())
  return __shared_ptr<element_type, _Lp>();

 __try {
  return __shared_ptr<element_type, _Lp>(*this);
 }
 __catch(const bad_weak_ptr&) {
  // Q: How can we get here? 
  // A: Another thread may have invalidated r after the
  //    use_count test above.
  return __shared_ptr<element_type, _Lp>();
 }

 #else
 // Optimization: avoid try/catch overhead when single threaded.
 return expired() ? __shared_ptr<element_type, _Lp>()
		  : __shared_ptr<element_type, _Lp>(*this);

 #endif
} // XXX MT

__weak_count(const __shared_count<_Lp>& __r) noexcept
 : _M_pi(__r._M_pi) {
 if (_M_pi != 0)
  _M_pi->_M_weak_add_ref();
}

__weak_count(const __weak_count<_Lp>& __r) noexcept
 : _M_pi(__r._M_pi) {
 if (_M_pi != 0)
  _M_pi->_M_weak_add_ref();
}

__weak_count<_Lp>& operator=(const __shared_count<_Lp>& __r) noexcept {
 _Sp_counted_base<_Lp>* __tmp = __r._M_pi;
 if (__tmp != 0)
  __tmp->_M_weak_add_ref();
 if (_M_pi != 0)
  _M_pi->_M_weak_release();
  _M_pi = __tmp;
  return *this;
}

template<typename _Tp1>
explicit __shared_ptr(const __weak_ptr<_Tp1, _Lp>& __r)
 : _M_refcount(__r._M_refcount) // may throw	
{
 __glibcxx_function_requires(_ConvertibleConcept<_Tp1*, _Tp*>)
 // It is now safe to copy __r._M_ptr, as
 // _M_refcount(__r._M_refcount) did not throw.
 _M_ptr = __r._M_ptr;
}

需要特别注意的地方是weak_ptr的lock方法返回的是一个shared_ptr对象,可能是一个是失败的空shared_ptr,但是只要返回非空成功,则一定是持有一个未销毁的有效可用的对象实例。
C++11标准规定了这个lock操作是线程安全的,GCC4.8.5的lock实现也是线程安全的,上下文已贴出和指出相关代码,不再赘述。

假设有这么一种应用场景,一个读者线程,一个写者线程,持有同一个智能指针shared_ptr,读者线程使用持有的对象,但是写者线程可能会调用reset操作修改这个shared_ptr,如果读者不加任何操作直接使用同一shared_ptr则可能发生写者将旧有的对象已释放掉而读者继续使用该对象,so不美好的事情发生了。

reset这个操作前面已分析过,两步交换操作,非线程安全,先交换对象实例指针,后交换引用计数对象。

如果读者线程现在使用这么一段伪代码:

std::weak_ptr<Type> weak = shared_; //weak仅先后拷贝shared持有的对象指针和引用计数
if (auto holder = weak.lock()) {
 // happy
} else {
 // whatever
}

多线程场景下,一读一写,多读一写,lock的线程安全保证了只要返回了非空的shared_ptr,则最后一定是安全的可访问的有效对象。

这个例子,没有使用额外的std::mutex锁同步措施,实现了容许读到旧对象的应用场景并发控制。
无锁高效,何乐为不为。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注