Service Registration and Data Synchronization with the Raft Protocol in Nacos CP Mode
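Raft protocol:
Every node is in one of three states: Follower, Candidate, or Leader
Leader election:
Every node has a random sleep time (its election timeout)
The node whose sleep ends first votes for itself
It then sends vote requests to the other nodes
(If several nodes wake up at the same time and start votes, the vote starts over)
All nodes start as Followers. Before starting a vote, a node becomes a Candidate and sends vote requests to the other nodes; if more than half of the nodes agree, it becomes the Leader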
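The election steps above can be tried out as a tiny single-threaded simulation. This is a minimal sketch under stated assumptions, not Nacos code:

- `ElectionSim` is a hypothetical name.
- The timeouts are passed in rather than randomized, so the outcome is predictable.
- A tie between the earliest timeouts is treated as a split vote.
- A node grants its vote only to a candidate with a higher term, which is the rule Nacos applies in receivedVote.

```java
import java.util.List;

// Hypothetical single-round election simulation (not Nacos code).
class ElectionSim {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static class Node {
        final String id;
        final long timeoutMs; // the "sleep" before this node stands for election
        long term = 0;
        String votedFor = null;
        State state = State.FOLLOWER;

        Node(String id, long timeoutMs) {
            this.id = id;
            this.timeoutMs = timeoutMs;
        }
    }

    /** Runs one round and returns the new leader's id, or null if no leader was elected. */
    static String runRound(List<Node> nodes) {
        // The node whose timeout expires first wakes up; a tie counts as a split vote
        Node first = null;
        boolean tie = false;
        for (Node n : nodes) {
            if (first == null || n.timeoutMs < first.timeoutMs) {
                first = n;
                tie = false;
            } else if (n.timeoutMs == first.timeoutMs) {
                tie = true;
            }
        }
        if (first == null || tie) {
            return null; // every node would pick a new random timeout and try again
        }
        // Become a candidate: bump the term and vote for itself
        first.state = State.CANDIDATE;
        first.term++;
        first.votedFor = first.id;
        int votes = 1;
        // Each other node grants its vote only if the candidate's term is newer than its own
        for (Node n : nodes) {
            if (n != first && first.term > n.term) {
                n.term = first.term;
                n.votedFor = first.id;
                n.state = State.FOLLOWER;
                votes++;
            }
        }
        if (votes >= nodes.size() / 2 + 1) {
            first.state = State.LEADER;
            return first.id;
        }
        return null;
    }
}
```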
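Data synchronization:
All writes go through the Leader
When a write reaches the Leader, the Leader stores it locally in an uncommitted state
The Leader then sends it to the other nodes; once a majority of the cluster has acknowledged it, the Leader marks it committed and tells the other nodes to write the data as well
The Leader periodically sends heartbeats to the Followers; a Follower that finds it is missing newer data pulls that data from the Leader itself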
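The write path described above can be sketched in a few lines: write on the Leader, collect acknowledgements, then commit. This is a simplified, single-threaded sketch that assumes a fixed cluster size. `MajorityCommit` and `LogEntry` are hypothetical names, and a full Raft implementation would also track a log index and term per entry.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the leader-side write path described above (not Nacos code).
class MajorityCommit {
    static class LogEntry {
        final String key;
        final String value;
        boolean committed = false;               // written on the leader, not yet committed
        final Set<String> acks = new HashSet<>(); // nodes that have acknowledged the write

        LogEntry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    final int clusterSize;

    MajorityCommit(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    int majority() {
        return clusterSize / 2 + 1;
    }

    /** The leader stores the write locally (uncommitted) and counts its own acknowledgement. */
    LogEntry leaderWrite(String leaderId, String key, String value) {
        LogEntry e = new LogEntry(key, value);
        e.acks.add(leaderId);
        return e;
    }

    /** Records a follower's acknowledgement and commits once a majority has acknowledged. */
    boolean onAck(LogEntry e, String followerId) {
        e.acks.add(followerId); // the Set ignores duplicate acks from the same node
        if (!e.committed && e.acks.size() >= majority()) {
            e.committed = true; // the leader would now tell followers to apply the write
        }
        return e.committed;
    }
}
```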
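Difference between Raft and ZAB: in ZAB every node can start a vote and the votes are then compared, whereas in Raft the vote is started by the node that wakes up first from its sleep
Raft demo site: http://thesecretlivesofdata.com/raft/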
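The previous post, "Nacos Source Code Analysis (Registration and Discovery, Cluster Synchronization, Heartbeats, Comparison with Eureka)", showed that addInstance, the method that adds an instance, calls consistencyService.put. Which ConsistencyService handles the call depends on whether the key contains "ephemeral.". That post covered AP mode; this one walks through service registration in CP mode.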
consistencyService.put(key, instances);

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

private ConsistencyService mapConsistencyService(String key) {
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl#put:
raftCore.signalPublish(key, value);
At its core, this calls signalPublish:
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // If this node is not the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        // Build a POST request that forwards this registration to the leader
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // Write the registered instance data to memory and to the on-disk file
        onPublish(datum, peers.local());
        final String content = json.toString();
        // CountDownLatch sized to peers / 2 + 1, i.e. a majority
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        // Iterate over all nodes, including this one
        for (final String server : peers.allServersIncludeMyself()) {
            // The leader counts itself once and is skipped
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // For every other node, call /raft/datum/commit asynchronously
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    // Successful reply: count the latch down
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        }
        // Throw if a majority has not acknowledged within 5 s. The data was already written locally
        // by onPublish above, so the exception does not roll anything back.
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
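The majority wait in signalPublish is a reusable CountDownLatch pattern. The sketch below reproduces it outside Nacos under a few assumptions:

- A hypothetical `Predicate<String>` decides whether each peer acknowledges.
- A thread pool stands in for the async HTTP client.
- `MajorityPublish` is likewise a made-up name.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch of the majority wait in signalPublish (not Nacos code).
class MajorityPublish {
    /**
     * @param servers  all servers, including the leader itself
     * @param leader   the local server, which is the leader
     * @param peerAcks stands in for the async HTTP call; true means that peer replied OK
     * @return true if a majority (leader included) acknowledged within timeoutMs
     */
    static boolean publish(List<String> servers, String leader, Predicate<String> peerAcks, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(servers.size() / 2 + 1);
        ExecutorService pool = Executors.newFixedThreadPool(servers.size());
        try {
            for (String server : servers) {
                if (server.equals(leader)) {
                    latch.countDown(); // the leader counts itself and is skipped
                    continue;
                }
                pool.execute(() -> {
                    if (peerAcks.test(server)) {
                        latch.countDown(); // only successful replies count; failures are ignored
                    }
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

As in signalPublish, a timeout only tells the caller that a majority was not reached. Anything already written locally stays written.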
The logic that updates the registered instance data:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }
    local.resetLeaderDue();
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        // Write the instance data to the on-disk file under NACOS_HOME/data/naming/data
        raftStore.write(datum);
    }
    datums.put(datum.key, datum);
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    // Publish a ValueChangeEvent
    NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
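The term bookkeeping at the end of onPublish can be pulled out into a pure function to see how terms move. This is a minimal sketch that assumes PUBLISH_TERM_INCREASE_COUNT is 100; receivedBeat later uses the literal 100 in the same formula. `TermRule.nextLocalTerm` is a hypothetical name.

```java
// Hypothetical extraction of the term update at the end of onPublish (not Nacos code).
class TermRule {
    static final long PUBLISH_TERM_INCREASE_COUNT = 100; // assumed value

    /**
     * @param isLeader   whether the local node is the leader
     * @param localTerm  the local term before the update
     * @param sourceTerm the term carried by the publishing leader
     * @return the new local term
     */
    static long nextLocalTerm(boolean isLeader, long localTerm, long sourceTerm) {
        if (sourceTerm < localTerm) {
            // onPublish rejects out-of-date publishes before touching any data
            throw new IllegalStateException("out of date publish");
        }
        if (isLeader) {
            return localTerm + PUBLISH_TERM_INCREASE_COUNT; // the leader bumps its own term
        }
        if (localTerm + PUBLISH_TERM_INCREASE_COUNT > sourceTerm) {
            return sourceTerm; // a follower close enough behind adopts the leader's term
        }
        return localTerm + PUBLISH_TERM_INCREASE_COUNT; // otherwise it advances by one step only
    }
}
```

A follower does not always copy the leader's term. When it is 100 or more behind, it only advances by 100 per publish.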
com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent receives the event and calls notify:
@Override
public void onEvent(ValueChangeEvent event) {
    notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
Eventually listener.onChange(key, value) is called to refresh the in-memory registry.
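The path from onPublish through the ValueChangeEvent to listener.onChange is an observer pattern. The sketch below is a synchronous stand-in, for illustration only. `ValueListener` and `SimpleNotifier` are hypothetical names. In Nacos the event first goes through NotifyCenter, and then PersistentNotifier looks up the value and calls the listeners.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical, synchronous stand-in for the ValueChangeEvent -> listener.onChange path (not Nacos code).
interface ValueListener {
    void onChange(String key, Object value);
}

class SimpleNotifier {
    private final Map<String, List<ValueListener>> listeners = new ConcurrentHashMap<>();
    private final Function<String, Object> find; // looks up the latest value, like find.apply(key)

    SimpleNotifier(Function<String, Object> find) {
        this.find = find;
    }

    void register(String key, ValueListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Plays the role of onEvent: fetch the current value, then call every listener registered for the key. */
    void onValueChange(String key) {
        Object value = find.apply(key);
        for (ValueListener listener : listeners.getOrDefault(key, List.of())) {
            listener.onChange(key, value);
        }
    }
}
```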
That covers service registration in CP mode; next comes leader election.
com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#init:
@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();
    // Load the persisted data (datums) from disk
    raftStore.loadDatums(notifier, datums);
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    // Register the leader-election task on the scheduled thread pool (runs every 500 ms)
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    // Heartbeat task (every 500 ms)
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);
    NotifyCenter.registerSubscriber(notifier);
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
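The two register calls in init schedule periodic tasks that run on every tick. Each task then decides on its own whether its timeout has expired. Below is a rough JDK-only equivalent that assumes a plain ScheduledExecutorService in place of GlobalExecutor, whose internals are not shown in this article. `RaftTimers` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GlobalExecutor.registerMasterElection/registerHeartbeat (not Nacos code).
class RaftTimers {
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

    /** Runs both tasks at a fixed rate; each task checks its own countdown on every tick. */
    void start(Runnable masterElection, Runnable heartbeat, long tickMs) {
        pool.scheduleAtFixedRate(masterElection, 0, tickMs, TimeUnit.MILLISECONDS);
        pool.scheduleAtFixedRate(heartbeat, 0, tickMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

One JDK detail makes the try/catch around both run() bodies important. If any run of a task scheduled with scheduleAtFixedRate throws, its later executions are suppressed. An uncaught exception would therefore silently stop elections or heartbeats.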
The leader-election task:
@Override
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        // "Sleep" before electing: subtract one tick and return while leaderDueMs is still positive
        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.leaderDueMs > 0) {
            return;
        }
        // reset timeout: reset both the election timeout and the heartbeat timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        // Send vote requests
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }
}
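The "sleep" is a countdown rather than a real sleep. Each tick subtracts TICK_PERIOD_MS from leaderDueMs, and the vote is sent only when it reaches zero; leaderDueMs is then reset with a fresh random offset. The model below makes these assumptions:

- The 15 s base timeout and 5 s random spread are illustrative values; check GlobalExecutor in your Nacos version.
- `ElectionTimer` is a hypothetical name.

```java
import java.util.Random;

// Hypothetical model of the leaderDueMs countdown in MasterElection.run (not Nacos code).
class ElectionTimer {
    static final long TICK_PERIOD_MS = 500;
    static final long LEADER_TIMEOUT_MS = 15_000; // assumed base timeout
    static final long RANDOM_MS = 5_000;          // assumed random spread

    private final Random random;
    long leaderDueMs;

    ElectionTimer(Random random) {
        this.random = random;
        resetLeaderDue();
    }

    /** Base timeout plus a random offset, so that nodes rarely time out together. */
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + (long) (random.nextDouble() * RANDOM_MS);
    }

    /** One scheduled run: returns true when this tick should trigger sendVote(). */
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue();
        return true;
    }
}
```

sendBeat calls local.resetLeaderDue() each time the leader sends a heartbeat. As a result, a healthy leader never counts down to an election against itself.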
Sending vote requests:
private void sendVote() {
    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);
    peers.reset();
    local.term.incrementAndGet(); // term + 1
    local.voteFor = local.ip; // vote for itself
    local.state = RaftPeer.State.CANDIDATE; // become CANDIDATE
    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    // Iterate over all nodes except this one
    for (final String server : peers.allServersWithoutMySelf()) {
        // Call /raft/vote asynchronously
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                        return;
                    }
                    RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                    // Parse the peer's reply and decide the leader: if the approvals from other nodes
                    // plus this node's own vote exceed half the cluster, this node becomes leader
                    peers.decideLeader(peer);
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
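peers.decideLeader itself is not shown in this article. Going by the comment above, the candidate wins once the approvals plus its own vote exceed half the cluster. A simplified tally might look like the following. `VoteTally` and its map of peer IP to voteFor are assumptions, not the RaftPeerSet API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tally in the spirit of peers.decideLeader (not the actual RaftPeerSet code).
class VoteTally {
    private final int clusterSize;
    private final Map<String, String> voteForByPeer = new HashMap<>(); // peer ip -> ip it voted for

    VoteTally(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    /** Records the voteFor field from a peer's reply (the candidate's own vote included). */
    void record(String peerIp, String voteFor) {
        if (voteFor != null && !voteFor.isEmpty()) {
            voteForByPeer.put(peerIp, voteFor);
        }
    }

    /** Returns the ip that holds a majority of the recorded votes, or null if nobody does yet. */
    String leader() {
        Map<String, Integer> counts = new HashMap<>();
        for (String votee : voteForByPeer.values()) {
            counts.merge(votee, 1, Integer::sum);
        }
        int majority = clusterSize / 2 + 1;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= majority) {
                return e.getKey();
            }
        }
        return null;
    }
}
```

A peer that rejects the request replies with its own voteFor (see receivedVote). A rejection therefore simply counts as a vote for someone other than the candidate.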
Handling an incoming vote request:
public synchronized RaftPeer receivedVote(RaftPeer remote) {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }
    // Get the local node
    RaftPeer local = peers.get(NetUtils.localServer());
    // If the candidate's term is less than or equal to the local term
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        Loggers.RAFT.info(msg);
        // Reject: if this node has not voted yet, it votes for itself (it considers itself the better leader)
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        return local;
    }
    // Grant the vote: reset the election timeout, become FOLLOWER and set voteFor to the candidate's IP
    local.resetLeaderDue();
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    local.term.set(remote.term.get());
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    return local;
}
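The granting rule in receivedVote comes down to one term comparison. The sketch isolates it in a hypothetical `Voter` class and leaves out the timer reset. Unlike RequestVote in the Raft paper, this check compares terms only. It does not ask whether the candidate's data is at least as up to date as the voter's.

```java
// Hypothetical isolation of the vote-granting rule in receivedVote (not Nacos code).
class Voter {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    final String ip;
    long term;
    String voteFor;
    State state = State.FOLLOWER;

    Voter(String ip, long term) {
        this.ip = ip;
        this.term = term;
    }

    /** Returns the ip this node votes for; the candidate won this vote iff the result equals its ip. */
    String receiveVote(String candidateIp, long candidateTerm) {
        if (candidateTerm <= term) {
            // Stale or same-term request: reject, defaulting to a vote for itself if none was cast
            if (voteFor == null || voteFor.isEmpty()) {
                voteFor = ip;
            }
            return voteFor;
        }
        // Newer term: step down, vote for the candidate and adopt its term
        // (the real method also resets the election timeout here)
        state = State.FOLLOWER;
        voteFor = candidateIp;
        term = candidateTerm;
        return voteFor;
    }
}
```

Granting a vote also copies the candidate's term, so a second candidate with the same term is rejected. Each node therefore grants at most one vote per term.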
Next, the heartbeat logic:
@Override
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }
        local.resetHeartbeatDue();
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }
}
It begins like the election task (count down the "sleep", then reset the timer), so the interesting part is sendBeat:
private void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    // Only the leader sends heartbeats (and never in standalone mode); otherwise return immediately
    if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
        return;
    }
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
    }
    local.resetLeaderDue();
    // build data
    ObjectNode packet = JacksonUtils.createEmptyJsonNode();
    packet.replace("peer", JacksonUtils.transferToJsonNode(local));
    ArrayNode array = JacksonUtils.createEmptyArrayNode();
    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
    }
    if (!switchDomain.isSendBeatOnly()) {
        // Take the registration data from memory; wrap each key and timestamp in an element and add it to array
        for (Datum datum : datums.values()) {
            ObjectNode element = JacksonUtils.createEmptyJsonNode();
            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp.get());
            array.add(element);
        }
    }
    // Build the request parameters
    packet.replace("datums", array);
    // broadcast
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JacksonUtils.toJson(packet));
    String content = JacksonUtils.toJson(params);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes(StandardCharsets.UTF_8));
    gzip.close();
    byte[] compressedBytes = out.toByteArray();
    String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                compressedContent.length());
    }
    // Iterate over all other nodes and send the heartbeat by calling /raft/beat asynchronously
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            final String url = buildUrl(server, API_BEAT);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("send beat to server " + server);
            }
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return;
                    }
                    peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("receive beat response from: {}", url);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, throwable);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }
}
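The heartbeat payload is essentially a list of (key, timestamp) pairs, gzip-compressed before sending. The sketch below round-trips a similar payload with the JDK's GZIPOutputStream and GZIPInputStream. The `key=timestamp` line format is a simplification standing in for the JSON packet, and `BeatCodec` is a hypothetical name.

The sketch measures the compressed size as a byte count. Turning gzip output into a String, as the debug log in sendBeat does, gives a character count that does not reliably reflect the compressed size, because gzip bytes are generally not valid UTF-8 text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical codec for a key -> timestamp beat payload (simplified, not the Nacos wire format).
class BeatCodec {
    /** Serializes one "key=timestamp" line per datum and gzips the result. */
    static byte[] encode(Map<String, Long> datums) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : datums.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the gzip stream wrote the trailer; the compressed size is simply the array length
        return out.toByteArray();
    }

    /** Reverses encode(): gunzips and parses the "key=timestamp" lines. */
    static Map<String, Long> decode(byte[] compressed) {
        String text;
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Long> datums = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                int eq = line.lastIndexOf('=');
                datums.put(line.substring(0, eq), Long.parseLong(line.substring(eq + 1)));
            }
        }
        return datums;
    }
}
```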
How the receiving node handles a heartbeat request:
com.alibaba.nacos.naming.controllers.RaftController#beat
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    // Parse the parameters
    JsonNode json = JacksonUtils.toObj(value);
    // Handle the heartbeat
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    return JacksonUtils.transferToJsonNode(peer);
}
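Judging by its name, IoUtils.tryDecompress lets the endpoint accept both compressed and plain bodies; its implementation is not shown here. One common approach, assumed in this sketch, is to check for the two-byte gzip magic number (0x1f 0x8b) and pass the bytes through unchanged otherwise. `GzipSniffer` is a hypothetical name, and it works on a byte array rather than the InputStream used above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical "decompress if gzip, else pass through" helper (not the Nacos IoUtils code).
class GzipSniffer {
    static boolean isGzip(byte[] data) {
        // gzip streams start with the magic bytes 0x1f 0x8b
        return data.length >= 2 && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b;
    }

    static byte[] tryDecompress(byte[] data) {
        if (!isGzip(data)) {
            return data; // plain body: use it as-is
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper for producing a gzip body, as the sender does. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```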
Heartbeat handling (an excerpt from RaftCore#receivedBeat):
// Reject a heartbeat that does not come from a leader
if (remote.state != RaftPeer.State.LEADER) {
    Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
            JacksonUtils.toJson(remote));
    throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// If this node is not a FOLLOWER when the heartbeat arrives, make it one
// (if an election was in progress, there is no need to keep voting)
if (local.state != RaftPeer.State.FOLLOWER) {
    Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
    // mk follower
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
}
// Build receivedKeysMap from the local in-memory data, every key mapped to 0 (= known locally)
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
    receivedKeysMap.put(entry.getKey(), 0);
}
// Iterate over the datums in the received packet
for (Object object : beatDatums) {
    processedCount = processedCount + 1;
    JsonNode entry = (JsonNode) object;
    String key = entry.get("key").asText();
    final String datumKey;
    if (KeyBuilder.matchServiceMetaKey(key)) {
        datumKey = KeyBuilder.detailServiceMetaKey(key);
    } else if (KeyBuilder.matchInstanceListKey(key)) {
        datumKey = KeyBuilder.detailInstanceListkey(key);
    } else {
        // ignore corrupted key:
        continue;
    }
    long timestamp = entry.get("timestamp").asLong();
    // Mark the key with 1: the leader has it too. Keys still at 0 afterwards are deleted below
    receivedKeysMap.put(datumKey, 1);
    try {
        // Skip the key if it exists locally with a timestamp >= the leader's, unless it is the last entry
        if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                && processedCount < beatDatums.size()) {
            continue;
        }
        // If the key is missing locally or the local timestamp is older, add it to batch
        if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
            batch.add(datumKey);
        }
        // Keep collecting while batch holds fewer than 50 keys and this is not the last entry (batching)
        if (batch.size() < 50 && processedCount < beatDatums.size()) {
            continue;
        }
        String keys = StringUtils.join(batch, ",");
        if (batch.size() <= 0) {
            continue;
        }
        // update datum entry
        // Call /raft/datum on the leader with the batched keys to pull their data
        String url = buildUrl(remote.ip, API_GET);
        Map<String, String> queryParam = new HashMap<>(1);
        queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
        HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    return;
                }
                List<JsonNode> datumList = JacksonUtils
                        .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                        });
                // For each returned datum, write it to the local file with raftStore.write
                // and call notifier.notify to update the in-memory data
                for (JsonNode datumJson : datumList) {
                    Datum newDatum = null;
                    OPERATE_LOCK.lock();
                    try {
                        Datum oldDatum = getDatum(datumJson.get("key").asText());
                        if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                .get()) {
                            continue;
                        }
                        if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                            Datum<Service> serviceDatum = new Datum<>();
                            serviceDatum.key = datumJson.get("key").asText();
                            serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                            serviceDatum.value = JacksonUtils
                                    .toObj(datumJson.get("value").toString(), Service.class);
                            newDatum = serviceDatum;
                        }
                        if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                            Datum<Instances> instancesDatum = new Datum<>();
                            instancesDatum.key = datumJson.get("key").asText();
                            instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                            instancesDatum.value = JacksonUtils
                                    .toObj(datumJson.get("value").toString(), Instances.class);
                            newDatum = instancesDatum;
                        }
                        if (newDatum == null || newDatum.value == null) {
                            Loggers.RAFT.error("receive null datum: {}", datumJson);
                            continue;
                        }
                        raftStore.write(newDatum);
                        datums.put(newDatum.key, newDatum);
                        notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                        local.resetLeaderDue();
                        if (local.term.get() + 100 > remote.term.get()) {
                            getLeader().term.set(remote.term.get());
                            local.term.set(getLeader().term.get());
                        } else {
                            local.term.addAndGet(100);
                        }
                        raftStore.updateTerm(local.term.get());
                        Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                    } catch (Throwable e) {
                        Loggers.RAFT
                                .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                    } finally {
                        OPERATE_LOCK.unlock();
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                }
                return;
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
            }

            @Override
            public void onCancel() {
            }
        });
        batch.clear();
    } catch (Exception e) {
        Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
    }
}
// Collect the keys in receivedKeysMap whose value is still 0 into deadKeys
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
    if (entry.getValue() == 0) {
        deadKeys.add(entry.getKey());
    }
}
// Remove from memory and disk the keys the leader no longer has
for (String deadKey : deadKeys) {
    try {
        deleteDatum(deadKey);
    } catch (Exception e) {
        Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
    }
}
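Without the networking and locking, receivedBeat is a diff between the leader's (key, timestamp) list and the local data. The sketch below computes the same two results:

- Batches of at most 50 keys to pull from the leader (keys that are missing locally or have an older local timestamp).
- The dead keys that exist locally but were not in the beat.

`BeatDiff` is a hypothetical name, and keys and timestamps are plain strings and longs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, network-free version of the diff performed in receivedBeat (not Nacos code).
class BeatDiff {
    static final int BATCH_SIZE = 50;

    final List<List<String>> pullBatches = new ArrayList<>();
    final List<String> deadKeys = new ArrayList<>();

    BeatDiff(Map<String, Long> local, Map<String, Long> fromLeader) {
        // 0 = known locally only, 1 = also present in the leader's beat
        Map<String, Integer> receivedKeys = new HashMap<>();
        for (String key : local.keySet()) {
            receivedKeys.put(key, 0);
        }
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : fromLeader.entrySet()) {
            receivedKeys.put(e.getKey(), 1);
            Long localTs = local.get(e.getKey());
            if (localTs == null || localTs < e.getValue()) {
                batch.add(e.getKey()); // missing or stale locally: pull it from the leader
            }
            if (batch.size() >= BATCH_SIZE) {
                pullBatches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            pullBatches.add(batch); // flush the last, partial batch
        }
        for (Map.Entry<String, Integer> e : receivedKeys.entrySet()) {
            if (e.getValue() == 0) {
                deadKeys.add(e.getKey()); // the leader no longer has it: delete locally too
            }
        }
    }
}
```

The original flushes the last partial batch by checking processedCount < beatDatums.size() inside the loop. Flushing once after the loop, as here, has the same effect.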
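Thanks for reading; that concludes "Service Registration and Data Synchronization with the Raft Protocol in Nacos CP Mode". How it behaves in practice still needs to be verified by hands-on use. This is Yisu Cloud (亿速云), and more articles on related topics will follow.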