比特币源码解读之线程处理-其他线程

(本文使用的是比特币v0.1.0版本 点击下载源码)

  比特币源码解读之线程处理分为两篇,矿工线程处理和其他线程处理两篇,本文描述其他线程处理(上篇是《比特币源码解读之线程处理-矿工线程》)。本文主要描述启动节点时创建的四类线程:
(1)通过IRC获取其他比特币服务器节点的地址;
(2)当前节点接受其他比特币服务器的连接请求;
(3)当前节点主动连接其他比特币服务器;
(4)当前节点的消息处理。
流程图如下所示:
比特币源码解读之线程处理-其他线程

获取其他比特币服务器节点地址ThreadIRCSeed

连接IRC服务器

连接IRC服务器,服务器名称为chat.freenode.net


  1. struct hostent* phostent = gethostbyname("chat.freenode.net");
  2. CAddress addrConnect(*(u_long*)phostent->h_addr_list[0], htons(6667));
  3. SOCKET hSocket;
  4. if (!ConnectSocket(addrConnect, hSocket))
  5. {
  6. printf("IRC connect failed\n");
  7. return;
  8. }

备注:IRC(Internet Relay Chat)互联网中继聊天,IRC的最大特点是实现了在线实时交谈,速度快、功能多的优点使它比电子邮件或新闻组等联络沟通方式更具吸引力。IRC可以设置单独的频道,在这个频道内,输出的文字可供所有人都看到。这样,来自世界不同角落的人能同时得到有关信息。而如果是两个人之间的单独交谈,甚至可以不用通过服务器,以保证谈话的保密性。

获取合适的比特币服务器地址并解析


  1. if (pszName[0] == 'u')
  2. {
  3. CAddress addr;
  4. if (DecodeAddress(pszName, addr))
  5. {
  6. CAddrDB addrdb;
  7. if (AddAddress(addrdb, addr))
  8. printf("new ");
  9. addr.print();
  10. }
  11. else
  12. {
  13. printf("decode failed\n");
  14. }
  15. }

将地址保存到mapAddresses向量中


  1. bool AddAddress(CAddrDB& addrdb, const CAddress& addr)
  2. {
  3. if (!addr.IsRoutable())
  4. return false;
  5. if (addr.ip == addrLocalHost.ip)
  6. return false;
  7. CRITICAL_BLOCK(cs_mapAddresses)
  8. {
  9. map<vector<unsigned char>, CAddress>::iterator it = mapAddresses.find(addr.GetKey());
  10. if (it == mapAddresses.end())
  11. {
  12. // New address
  13. mapAddresses.insert(make_pair(addr.GetKey(), addr));
  14. addrdb.WriteAddress(addr);
  15. return true;
  16. }
  17. else
  18. {
  19. CAddress& addrFound = (*it).second;
  20. if ((addrFound.nServices | addr.nServices) != addrFound.nServices)
  21. {
  22. // Services have been added
  23. addrFound.nServices |= addr.nServices;
  24. addrdb.WriteAddress(addrFound);
  25. return true;
  26. }
  27. }
  28. }
  29. return false;
  30. }

接受其他比特币服务器节点连接请求ThreadSocketHandler

断开vNodes中重复连接的节点


  1. map<unsigned int, CNode*> mapFirst;
  2. foreach(CNode* pnode, vNodes)
  3. {
  4. if (pnode->fDisconnect)
  5. continue;
  6. unsigned int ip = pnode->addr.ip;
  7. if (mapFirst.count(ip) && addrLocalHost.ip < ip)
  8. {
  9. // In case two nodes connect to each other at once,
  10. // the lower ip disconnects its outbound connection
  11. CNode* pnodeExtra = mapFirst[ip];
  12. if (pnodeExtra->GetRefCount() > (pnodeExtra->fNetworkNode ? 1 : 0))
  13. swap(pnodeExtra, pnode);
  14. if (pnodeExtra->GetRefCount() <= (pnodeExtra->fNetworkNode ? 1 : 0))
  15. {
  16. printf("(%d nodes) disconnecting duplicate: %s\n", vNodes.size(), pnodeExtra->addr.ToString().c_str());
  17. if (pnodeExtra->fNetworkNode && !pnode->fNetworkNode)
  18. {
  19. pnode->AddRef();
  20. swap(pnodeExtra->fNetworkNode, pnode->fNetworkNode);
  21. pnodeExtra->Release();
  22. }
  23. pnodeExtra->fDisconnect = true;
  24. }
  25. }
  26. mapFirst[ip] = pnode;
  27. }

断开没有使用的节点


  1. vector<CNode*> vNodesCopy = vNodes;
  2. foreach(CNode* pnode, vNodesCopy)
  3. {
  4. if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
  5. {
  6. // remove from vNodes
  7. vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
  8. pnode->Disconnect();
  9. // hold in disconnected pool until all refs are released
  10. pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60);
  11. if (pnode->fNetworkNode)
  12. pnode->Release();
  13. vNodesDisconnected.push_back(pnode);
  14. }
  15. }

清除已经断开的节点


  1. list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
  2. foreach(CNode* pnode, vNodesDisconnectedCopy)
  3. {
  4. // wait until threads are done using it
  5. if (pnode->GetRefCount() <= 0)
  6. {
  7. bool fDelete = false;
  8. TRY_CRITICAL_BLOCK(pnode->cs_vSend)
  9. TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
  10. TRY_CRITICAL_BLOCK(pnode->cs_mapRequests)
  11. TRY_CRITICAL_BLOCK(pnode->cs_inventory)
  12. fDelete = true;
  13. if (fDelete)
  14. {
  15. vNodesDisconnected.remove(pnode);
  16. delete pnode;
  17. }
  18. }
  19. }

接受新的连接并保存到vNodes中


  1. if (FD_ISSET(hListenSocket, &fdsetRecv))
  2. {
  3. struct sockaddr_in sockaddr;
  4. int len = sizeof(sockaddr);
  5. SOCKET hSocket = accept(hListenSocket, (struct sockaddr*)&sockaddr, &len);
  6. CAddress addr(sockaddr);
  7. if (hSocket == INVALID_SOCKET)
  8. {
  9. if (WSAGetLastError() != WSAEWOULDBLOCK)
  10. printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
  11. }
  12. else
  13. {
  14. printf("accepted connection from %s\n", addr.ToString().c_str());
  15. CNode* pnode = new CNode(hSocket, addr, true);
  16. pnode->AddRef();
  17. CRITICAL_BLOCK(cs_vNodes)
  18. vNodes.push_back(pnode);
  19. }
  20. }

接收信息并保存到pnode->vRecv中


  1. if (FD_ISSET(hSocket, &fdsetRecv))
  2. {
  3. TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
  4. {
  5. CDataStream& vRecv = pnode->vRecv;
  6. unsigned int nPos = vRecv.size();
  7. // typical socket buffer is 8K-64K
  8. const unsigned int nBufSize = 0x10000;
  9. vRecv.resize(nPos + nBufSize);
  10. int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);
  11. vRecv.resize(nPos + max(nBytes, 0));
  12. if (nBytes == 0)
  13. {
  14. // socket closed gracefully
  15. if (!pnode->fDisconnect)
  16. printf("recv: socket closed\n");
  17. pnode->fDisconnect = true;
  18. }
  19. else if (nBytes < 0)
  20. {
  21. // socket error
  22. int nErr = WSAGetLastError();
  23. if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
  24. {
  25. if (!pnode->fDisconnect)
  26. printf("recv failed: %d\n", nErr);
  27. pnode->fDisconnect = true;
  28. }
  29. }
  30. }
  31. }

从pnode->vSend中获取消息并发送出去


  1. if (FD_ISSET(hSocket, &fdsetSend))
  2. {
  3. TRY_CRITICAL_BLOCK(pnode->cs_vSend)
  4. {
  5. CDataStream& vSend = pnode->vSend;
  6. if (!vSend.empty())
  7. {
  8. int nBytes = send(hSocket, &vSend[0], vSend.size(), 0);
  9. if (nBytes > 0)
  10. {
  11. vSend.erase(vSend.begin(), vSend.begin() + nBytes);
  12. }
  13. else if (nBytes == 0)
  14. {
  15. if (pnode->ReadyToDisconnect())
  16. pnode->vSend.clear();
  17. }
  18. else
  19. {
  20. printf("send error %d\n", nBytes);
  21. if (pnode->ReadyToDisconnect())
  22. pnode->vSend.clear();
  23. }
  24. }
  25. }
  26. }

连接其他比特币服务器节点ThreadOpenConnections

遍历地址映射表mapAddresses并保存地址到mapIP中


  1. for (map<vector<unsigned char>, CAddress>::iterator mi = mapAddresses.lower_bound(CAddress(ipC, 0).GetKey());
  2. mi != mapAddresses.upper_bound(CAddress(ipC | ~nIPCMask, 0xffff).GetKey());
  3. ++mi)
  4. {
  5. const CAddress& addr = (*mi).second;
  6. unsigned int nRandomizer = (addr.nLastFailed * addr.ip * 7777U) % 20000;
  7. if (GetTime() - addr.nLastFailed > nDelay * nRandomizer / 10000)
  8. mapIP[addr.ip].push_back(addr);
  9. }

遍历mapIP并与节点建立连接ConnectNode


  1. map<unsigned int, vector<CAddress> >::iterator mi = mapIP.begin();
  2. advance(mi, GetRand(mapIP.size()));
  3. // Once we've chosen an IP, we'll try every given port before moving on
  4. foreach(const CAddress& addrConnect, (*mi).second)
  5. {
  6. if (addrConnect.ip == addrLocalHost.ip || !addrConnect.IsIPv4() || FindNode(addrConnect.ip))
  7. continue;
  8. CNode* pnode = ConnectNode(addrConnect);
  9. if (!pnode)
  10. continue;
  11. pnode->fNetworkNode = true;
  12. ... ...
  13. }

连接到其他节点,并将节点信息保存到vNodes中


  1. SOCKET hSocket;
  2. if (ConnectSocket(addrConnect, hSocket))
  3. {
  4. /// debug print
  5. printf("connected %s\n", addrConnect.ToString().c_str());
  6. // Add node
  7. CNode* pnode = new CNode(hSocket, addrConnect, false);
  8. if (nTimeout != 0)
  9. pnode->AddRef(nTimeout);
  10. else
  11. pnode->AddRef();
  12. CRITICAL_BLOCK(cs_vNodes)
  13. vNodes.push_back(pnode);
  14. CRITICAL_BLOCK(cs_mapAddresses)
  15. mapAddresses[addrConnect.GetKey()].nLastFailed = 0;
  16. return pnode;
  17. }

消息处理线程ThreadMessageHandler

  遍历vNodes节点,获取pnode节点中的消息,通过ProcessMessages函数处理当前节点收到的消息,通过SendMessages处理当前节点需要发送的消息。消息处理具体流程在后续“比特币源码解读之消息处理”一文中介绍。


  1. vector<CNode*> vNodesCopy;
  2. CRITICAL_BLOCK(cs_vNodes)
  3. vNodesCopy = vNodes;
  4. foreach(CNode* pnode, vNodesCopy)
  5. {
  6. pnode->AddRef();
  7. // Receive messages
  8. TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
  9. ProcessMessages(pnode);
  10. // Send messages
  11. TRY_CRITICAL_BLOCK(pnode->cs_vSend)
  12. SendMessages(pnode);
  13. pnode->Release();
  14. }

上一篇: 比特币源码解读之线程处理-矿工线程

下一篇: 比特币源码解读之私钥、公钥和地址

版权声明:B链网原创,严禁修改。转载请注明作者和原文链接

作者:雨后的蚊子

原文链接: http://www.360bchain.com/article/64.html