C++无锁栈已损坏
问题描述:
我试图实现一个无锁栈以便与有界的普通c数组中的外部托管内存一起使用。我知道参考实现(例如来自Anthony Williams:Action中的Concurrency)以及其他书籍和博客/文章。C++无锁栈已损坏
该实现遵循这些引用并避免ABA问题,因为外部存储器位置使用唯一索引而不是回收指针寻址。因此它根本不需要处理成员管理,而且很简单。
我写了一些测试,在高负载和争用(压力测试)和单线程下在该堆栈上执行弹出和推送操作。前者失败后出现奇怪的问题,我不明白,对我来说看起来很模糊。
也许有人有一个想法?
-
问题:将已弹出的节点推回堆栈失败,因为违反了前提条件,该节点没有后继(下一个)。
BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next);
复制设置:至少3个线程和16的容量。大约500次传球。然后推送操作失败。
-
问题:所有线程弹出的元素数量和连接后堆栈中剩余的元素数量不匹配容量(转换中丢失的节点)。
BOOST_ASSERT(aNodes.size()+nPopped == nCapacity);
繁殖设置:2个线程和容量2.需要大量通行证发生,至少对我700后堆叠的该头是0,但只有一个节点存在于弹出容器。节点{2,0}处于悬挂状态。
我用vs2005,vs2013和vs2015编译。都有同样的问题(vs2005也是代码看起来像C + + 03的原因)。
这里是节点+堆栈
template <typename sizeT> struct node
{
sizeT cur; //!< construction invariant
atomic<sizeT> next;
atomic<sizeT> data;
explicit node() // invalid node
: cur(0), next(0), data(0)
{}
explicit node(sizeT const& nCur, sizeT const& nNext, sizeT const& nData)
: cur(nCur), next(nNext), data(nData)
{}
node& operator=(node const& rhs)
{
cur = rhs.cur;
next.store(rhs.next.load(memory_order_relaxed));
data.store(rhs.data.load(memory_order_relaxed));
return *this;
}
};
template <typename sizeT> struct stack
{
private:
static memory_order const relaxed = memory_order_relaxed;
atomic<sizeT> m_aHead;
public:
explicit stack(sizeT const& nHead) : m_aHead(nHead) {}
template <typename tagT, typename T, std::size_t N>
typename enable_if<is_same<tagT,Synchronized>,sizeT>::type
pop(T (&aNodes)[N])
{
sizeT nOldHead = m_aHead.load();
for(;;)
{
if(!nOldHead) return 0;
BOOST_ASSERT(nOldHead <= N);
T& aOldHead = aNodes[nOldHead-1];
sizeT const nNewHead = aOldHead.next.load(/*relaxed*/);
BOOST_ASSERT(nNewHead <= N);
sizeT const nExpected = nOldHead;
if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
/*,std::memory_order_acquire,std::memory_order_relaxed*/))
{
BOOST_ASSERT(nExpected == nOldHead);
// <--- from here on aOldHead is thread local ---> //
aOldHead.next.store(0 /*,relaxed*/);
return nOldHead;
}
// TODO: add back-off strategy under contention (use loop var)
}
}
template <typename tagT, typename T, std::size_t N>
typename enable_if<is_same<tagT,Synchronized>,void>::type
push(T (&aNodes)[N], sizeT const& nNewHead)
{
#ifndef NDEBUG
{
BOOST_ASSERT(0 < nNewHead && nNewHead <= N);
sizeT const nNext = aNodes[nNewHead-1].next;
BOOST_ASSERT(!nNext);
}
#endif
sizeT nOldHead = m_aHead.load(/*relaxed*/);
for(;;)
{
aNodes[nNewHead-1].next.store(nOldHead /*,relaxed*/);
sizeT const nExpected = nOldHead;
BOOST_ASSERT(nOldHead <= N);
if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
/*,std::memory_order_release,std::memory_order_relaxed*/))
{
BOOST_ASSERT(nExpected == nOldHead);
return;
}
// TODO: add back-off strategy under contention (use loop var)
}
}
};
和相当嘈杂测试类
class StackTest
{
private:
typedef boost::mpl::size_t<64> Capacity;
//typedef boost::uint_t<static_log2_ceil<Capacity::value>::value>::least size_type;
typedef std::size_t size_type;
static size_type const nCapacity = Capacity::value;
static size_type const nNodes = Capacity::value;
typedef node<size_type> Node;
typedef stack<size_type> Stack;
typedef mt19937 Twister;
typedef random::uniform_int_distribution<std::size_t> Distribution;
typedef variate_generator<Twister,Distribution> Die;
struct Data //!< shared along threads
{
Node m_aNodes[nNodes];
Stack m_aStack;
explicit Data() : m_aStack(nNodes)
{
m_aNodes[0] = Node(1,0,0); // tail of stack
for(size_type i=1; i<nNodes; ++i)
{
m_aNodes[i] = Node(static_cast<size_type>(i+1),i,0);
}
}
template <typename syncT>
void Run(
uuids::random_generator& aUUIDGen,
std::size_t const& nPasses,
std::size_t const& nThreads)
{
std::vector<ThreadLocalData> aThreadLocalDatas(nThreads,ThreadLocalData(*this));
{
static std::size_t const N = 100000;
Die aRepetition(Twister(hash_value(aUUIDGen())),Distribution(0,N));
Die aAction(Twister(hash_value(aUUIDGen())),Distribution(0,1));
for(std::size_t i=0; i<nThreads; ++i)
{
std::vector<bool>& aActions = aThreadLocalDatas[i].m_aActions;
std::size_t const nRepetition = aRepetition();
aActions.reserve(nRepetition);
for(std::size_t k=0; k<nRepetition; ++k)
{
aActions.push_back(static_cast<bool>(aAction()));
}
}
}
std::size_t nPopped = 0;
if(nThreads == 1)
{
std::size_t const i = 0;
aThreadLocalDatas[i].Run<syncT>(i);
nPopped += aThreadLocalDatas[i].m_aPopped.size();
}
else
{
std::vector<boost::shared_ptr<thread> > aThreads;
aThreads.reserve(nThreads);
for(std::size_t i=0; i<nThreads; ++i)
{
aThreads.push_back(boost::make_shared<thread>(boost::bind(&ThreadLocalData::Run<syncT>,&aThreadLocalDatas[i],i)));
}
for(std::size_t i=0; i<nThreads; ++i)
{
aThreads[i]->join();
nPopped += aThreadLocalDatas[i].m_aPopped.size();
}
}
std::vector<size_type> aNodes;
aNodes.reserve(nCapacity);
while(size_type const nNode = m_aStack.pop<syncT>(m_aNodes))
{
aNodes.push_back(nNode);
}
std::clog << dump(m_aNodes,4) << std::endl;
BOOST_ASSERT(aNodes.size()+nPopped == nCapacity);
}
};
struct ThreadLocalData //!< local to each thread
{
Data& m_aData; //!< shared along threads
std::vector<bool> m_aActions; //!< either pop or push
std::vector<size_type> m_aPopped; //!< popp'ed nodes
explicit ThreadLocalData(Data& aData)
: m_aData(aData), m_aActions(), m_aPopped()
{
m_aPopped.reserve(nNodes);
}
template <typename syncT>
void Run(std::size_t const& k)
{
BOOST_FOREACH(bool const& aAction, m_aActions)
{
if(aAction)
{
if(size_type const nNode = m_aData.m_aStack.pop<syncT>(m_aData.m_aNodes))
{
BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next);
m_aPopped.push_back(nNode);
}
}
else
{
if(!m_aPopped.empty())
{
size_type const nNode = m_aPopped.back();
size_type const nNext = m_aData.m_aNodes[nNode-1].next;
ASSERT_IF(!nNext,"nNext=" << nNext << " for " << m_aData.m_aNodes[nNode-1] << "\n\n" << dump(m_aData.m_aNodes));
m_aData.m_aStack.push<syncT>(m_aData.m_aNodes,nNode);
m_aPopped.pop_back();
}
}
}
}
};
template <typename syncT>
static void PushPop(
uuids::random_generator& aUUIDGen,
std::size_t const& nPasses,
std::size_t const& nThreads)
{
BOOST_ASSERT(nThreads > 0);
BOOST_ASSERT(nThreads == 1 || (is_same<syncT,Synchronized>::value));
std::clog << BOOST_CURRENT_FUNCTION << " with threads=" << nThreads << std::endl;
for(std::size_t nPass=0; nPass<nPasses; ++nPass)
{
std::ostringstream s;
s << " " << nPass << "/" << nPasses << ": ...";
std::clog << s.str() << std::endl;
Data().Run<syncT>(aUUIDGen,nPass,nThreads);
}
}
public:
static void Run()
{
typedef StackTest self_t;
uuids::random_generator aUUIDGen;
static std::size_t const nMaxPasses = 1000;
Die aPasses(Twister(hash_value(aUUIDGen())),Distribution(0,nMaxPasses));
{
//std::size_t const nThreads = 2; // thread::hardware_concurrency()+1;
std::size_t const nThreads = thread::hardware_concurrency()+1;
self_t().PushPop<Synchronized>(aUUIDGen,aPasses(),nThreads);
}
}
};
这里的基本代码是link下载所有必需的文件。
答
这两个问题都只是ABA问题的另一个方面。
堆栈:{2,1},{1,0}
- 线程A
- 弹出
new_head = 1
...时间片超过
- 弹出
- 线程B
- 弹出
堆栈:{1,0},弹出:{2,0}
- 弹出
堆栈:{},弹出: {2,0},{1,0}
- 推({2,0})
堆栈:{2,0}
- 弹出
- 线程A
- 弹出持续
cmp_exch成功,因为头是2
堆栈:{},头= 1 --- WRONG,0将是正确
任何可能会出现问题,因为访问节点不是线程本地了。这包括对突出节点(问题1)或丢失节点(问题2)的意外修改。
head + next需要在一个cmp_exch中修改以避免该问题。
对于问题1:在pop上返回它之前,您不能将节点的下一个设置为NULL吗? – AndyG
这正是奇怪的事情。接下来设置为0,然后再从pop中返回。该节点再次被推入,其下一个不再是0。从技术上讲,这是唯一可能的,如果另一个线程在同一时间再次修改该节点。但我不明白应该怎样才能让另一个线程访问一个弹出的节点。 –
“该实现遵循这些引用并避免了aba问题,因为外部存储器位置使用唯一索引来寻址,而不是回收指针。”嗯,这些索引不是唯一的。您仍然有ABA问题。 –