比特币源码解读之线程处理-其他线程
(本文使用的是比特币v0.1.0版本 点击下载源码)
比特币源码解读之线程处理分为两篇,矿工线程处理和其他线程处理两篇,本文描述其他线程处理(上篇是《比特币源码解读之线程处理-矿工线程》)。本文主要描述启动节点时创建的四类线程:
(1)通过IRC获取其他比特币服务器节点的地址;
(2)当前节点接受其他比特币服务器的连接请求;
(3)当前节点主动连接其他比特币服务器;
(4)当前节点的消息处理。
流程图如下所示:
获取其他比特币服务器节点地址ThreadIRCSeed
连接IRC服务器
连接IRC服务器,服务器名称为chat.freenode.net
struct hostent* phostent = gethostbyname("chat.freenode.net");
CAddress addrConnect(*(u_long*)phostent->h_addr_list[0], htons(6667));
SOCKET hSocket;
if (!ConnectSocket(addrConnect, hSocket))
{
printf("IRC connect failed\n");
return;
}
备注:IRC(Internet Relay Chat)互联网中继聊天,IRC的最大特点是实现了在线实时交谈,速度快、功能多的优点使它比电子邮件或新闻组等联络沟通方式更具吸引力。IRC可以设置单独的频道,在这个频道内,输出的文字可供所有人都看到。这样,来自世界不同角落的人能同时得到有关信息。而如果是两个人之间的单独交谈,甚至可以不用通过服务器,以保证谈话的保密性。
获取合适的比特币服务器地址并解析
if (pszName[0] == 'u')
{
CAddress addr;
if (DecodeAddress(pszName, addr))
{
CAddrDB addrdb;
if (AddAddress(addrdb, addr))
printf("new ");
addr.print();
}
else
{
printf("decode failed\n");
}
}
将地址保存到mapAddresses向量中
bool AddAddress(CAddrDB& addrdb, const CAddress& addr)
{
if (!addr.IsRoutable())
return false;
if (addr.ip == addrLocalHost.ip)
return false;
CRITICAL_BLOCK(cs_mapAddresses)
{
map<vector<unsigned char>, CAddress>::iterator it = mapAddresses.find(addr.GetKey());
if (it == mapAddresses.end())
{
// New address
mapAddresses.insert(make_pair(addr.GetKey(), addr));
addrdb.WriteAddress(addr);
return true;
}
else
{
CAddress& addrFound = (*it).second;
if ((addrFound.nServices | addr.nServices) != addrFound.nServices)
{
// Services have been added
addrFound.nServices |= addr.nServices;
addrdb.WriteAddress(addrFound);
return true;
}
}
}
return false;
}
接受其他比特币服务器节点连接请求ThreadSocketHandler
断开vNodes中重复连接的节点
map<unsigned int, CNode*> mapFirst;
foreach(CNode* pnode, vNodes)
{
if (pnode->fDisconnect)
continue;
unsigned int ip = pnode->addr.ip;
if (mapFirst.count(ip) && addrLocalHost.ip < ip)
{
// In case two nodes connect to each other at once,
// the lower ip disconnects its outbound connection
CNode* pnodeExtra = mapFirst[ip];
if (pnodeExtra->GetRefCount() > (pnodeExtra->fNetworkNode ? 1 : 0))
swap(pnodeExtra, pnode);
if (pnodeExtra->GetRefCount() <= (pnodeExtra->fNetworkNode ? 1 : 0))
{
printf("(%d nodes) disconnecting duplicate: %s\n", vNodes.size(), pnodeExtra->addr.ToString().c_str());
if (pnodeExtra->fNetworkNode && !pnode->fNetworkNode)
{
pnode->AddRef();
swap(pnodeExtra->fNetworkNode, pnode->fNetworkNode);
pnodeExtra->Release();
}
pnodeExtra->fDisconnect = true;
}
}
mapFirst[ip] = pnode;
}
断开没有使用的节点
vector<CNode*> vNodesCopy = vNodes;
foreach(CNode* pnode, vNodesCopy)
{
if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
pnode->Disconnect();
// hold in disconnected pool until all refs are released
pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60);
if (pnode->fNetworkNode)
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
}
清除已经断开的节点
list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
foreach(CNode* pnode, vNodesDisconnectedCopy)
{
// wait until threads are done using it
if (pnode->GetRefCount() <= 0)
{
bool fDelete = false;
TRY_CRITICAL_BLOCK(pnode->cs_vSend)
TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
TRY_CRITICAL_BLOCK(pnode->cs_mapRequests)
TRY_CRITICAL_BLOCK(pnode->cs_inventory)
fDelete = true;
if (fDelete)
{
vNodesDisconnected.remove(pnode);
delete pnode;
}
}
}
接受新的连接并保存到vNodes中
if (FD_ISSET(hListenSocket, &fdsetRecv))
{
struct sockaddr_in sockaddr;
int len = sizeof(sockaddr);
SOCKET hSocket = accept(hListenSocket, (struct sockaddr*)&sockaddr, &len);
CAddress addr(sockaddr);
if (hSocket == INVALID_SOCKET)
{
if (WSAGetLastError() != WSAEWOULDBLOCK)
printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
}
else
{
printf("accepted connection from %s\n", addr.ToString().c_str());
CNode* pnode = new CNode(hSocket, addr, true);
pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
}
}
接收信息并保存到pnode->vRecv中
if (FD_ISSET(hSocket, &fdsetRecv))
{
TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
{
CDataStream& vRecv = pnode->vRecv;
unsigned int nPos = vRecv.size();
// typical socket buffer is 8K-64K
const unsigned int nBufSize = 0x10000;
vRecv.resize(nPos + nBufSize);
int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);
vRecv.resize(nPos + max(nBytes, 0));
if (nBytes == 0)
{
// socket closed gracefully
if (!pnode->fDisconnect)
printf("recv: socket closed\n");
pnode->fDisconnect = true;
}
else if (nBytes < 0)
{
// socket error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
if (!pnode->fDisconnect)
printf("recv failed: %d\n", nErr);
pnode->fDisconnect = true;
}
}
}
}
从pnode->vSend中获取消息并发送出去
if (FD_ISSET(hSocket, &fdsetSend))
{
TRY_CRITICAL_BLOCK(pnode->cs_vSend)
{
CDataStream& vSend = pnode->vSend;
if (!vSend.empty())
{
int nBytes = send(hSocket, &vSend[0], vSend.size(), 0);
if (nBytes > 0)
{
vSend.erase(vSend.begin(), vSend.begin() + nBytes);
}
else if (nBytes == 0)
{
if (pnode->ReadyToDisconnect())
pnode->vSend.clear();
}
else
{
printf("send error %d\n", nBytes);
if (pnode->ReadyToDisconnect())
pnode->vSend.clear();
}
}
}
}
连接其他比特币服务器节点ThreadOpenConnections
遍历地址映射表mapAddresses并保存地址到mapIP中
for (map<vector<unsigned char>, CAddress>::iterator mi = mapAddresses.lower_bound(CAddress(ipC, 0).GetKey());
mi != mapAddresses.upper_bound(CAddress(ipC | ~nIPCMask, 0xffff).GetKey());
++mi)
{
const CAddress& addr = (*mi).second;
unsigned int nRandomizer = (addr.nLastFailed * addr.ip * 7777U) % 20000;
if (GetTime() - addr.nLastFailed > nDelay * nRandomizer / 10000)
mapIP[addr.ip].push_back(addr);
}
遍历mapIP并与节点建立连接ConnectNode
map<unsigned int, vector<CAddress> >::iterator mi = mapIP.begin();
advance(mi, GetRand(mapIP.size()));
// Once we've chosen an IP, we'll try every given port before moving on
foreach(const CAddress& addrConnect, (*mi).second)
{
if (addrConnect.ip == addrLocalHost.ip || !addrConnect.IsIPv4() || FindNode(addrConnect.ip))
continue;
CNode* pnode = ConnectNode(addrConnect);
if (!pnode)
continue;
pnode->fNetworkNode = true;
... ...
}
连接到其他节点,并将节点信息保存到vNodes中
SOCKET hSocket;
if (ConnectSocket(addrConnect, hSocket))
{
/// debug print
printf("connected %s\n", addrConnect.ToString().c_str());
// Add node
CNode* pnode = new CNode(hSocket, addrConnect, false);
if (nTimeout != 0)
pnode->AddRef(nTimeout);
else
pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
CRITICAL_BLOCK(cs_mapAddresses)
mapAddresses[addrConnect.GetKey()].nLastFailed = 0;
return pnode;
}
消息处理线程ThreadMessageHandler
遍历vNodes节点,获取pnode节点中的消息,通过ProcessMessages函数处理当前节点收到的消息,通过SendMessages处理当前节点需要发送的消息。消息处理具体流程在后续“比特币源码解读之消息处理”一文中介绍。
vector<CNode*> vNodesCopy;
CRITICAL_BLOCK(cs_vNodes)
vNodesCopy = vNodes;
foreach(CNode* pnode, vNodesCopy)
{
pnode->AddRef();
// Receive messages
TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
ProcessMessages(pnode);
// Send messages
TRY_CRITICAL_BLOCK(pnode->cs_vSend)
SendMessages(pnode);
pnode->Release();
}
上一篇: 比特币源码解读之线程处理-矿工线程
下一篇: 比特币源码解读之私钥、公钥和地址
版权声明:B链网原创,严禁修改。转载请注明作者和原文链接
作者:雨后的蚊子