Studying Part of the HBase Write Path Source Code

The put operation: the entry point of the write path

  public void put(Put put) throws IOException {
    checkReadOnly();  // check whether the region is currently read-only

    // Do a rough check that we have resources to accept a write.  The check is
    // 'rough' in that between the resource check and the call to obtain a
    // read lock, resources may run out.  For now, the thought is that this
    // will be extremely rare; we'll deal with it when it happens.
    checkResources();  // rough check that there are enough resources for the write; the read lock is obtained in the next call
    startRegionOperation(Operation.PUT);
    try {
      // All edits for the given row (across all column families) must happen atomically.
      doBatchMutate(put);
    } finally {
      closeRegionOperation(Operation.PUT);
    }
  }

put calls batchMutate (through doBatchMutate), and batchMutate in turn ends up calling doMiniBatchMutation.
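The try/finally shape of put can be illustrated in isolation. The following is only a minimal sketch, assuming the region-level lock behaves like a `ReentrantReadWriteLock` whose read lock is held for the duration of the operation. `RegionGuardSketch` and all of its members are hypothetical names, not HBase's API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for a region; this is not HBase's HRegion. */
final class RegionGuardSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger applied = new AtomicInteger();
    private volatile boolean readOnly = false;

    void setReadOnly(boolean readOnly) { this.readOnly = readOnly; }

    int appliedCount() { return applied.get(); }

    int readLockHolds() { return lock.getReadLockCount(); }

    void put(String row) {
        // Plays the role of checkReadOnly().
        if (readOnly) {
            throw new IllegalStateException("region is read-only");
        }
        // Plays the role of startRegionOperation(): writers share the read lock,
        // so a close (which would take the write lock) waits for them.
        lock.readLock().lock();
        try {
            applied.incrementAndGet(); // stands in for doBatchMutate(put)
        } finally {
            // Plays the role of closeRegionOperation(): always released.
            lock.readLock().unlock();
        }
    }
}
```

The point of the finally block is that the lock is released even when the mutation throws, so a failed write cannot leave the region permanently locked.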

doMiniBatchMutation is where the actual steps of the write are carried out.

Step 1: acquire as many row locks as possible, making sure at least one is acquired.
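One way to read "as many as possible, at least one" is: block on the first row, then only try the remaining rows without waiting, and stop at the first row that is contended. The sketch below assumes that reading. It uses a `Semaphore` per row as a simplified, non-reentrant row lock, and `RowLockSketch` and its methods are hypothetical names, not HBase's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Hypothetical per-row lock table; one permit per row acts as the row lock. */
final class RowLockSketch {
    private final Map<String, Semaphore> rowLocks = new ConcurrentHashMap<>();

    Semaphore lockFor(String row) {
        return rowLocks.computeIfAbsent(row, r -> new Semaphore(1));
    }

    /** Returns the leading rows whose locks were acquired; non-empty for non-empty input. */
    List<String> acquireAsManyAsPossible(List<String> rows) {
        List<String> acquired = new ArrayList<>();
        for (String row : rows) {
            Semaphore lock = lockFor(row);
            if (acquired.isEmpty()) {
                lock.acquireUninterruptibly(); // block: guarantees at least one lock
            } else if (!lock.tryAcquire()) {
                break; // contended: leave this row and the rest for a later mini-batch
            }
            acquired.add(row);
        }
        return acquired;
    }

    void release(List<String> rows) {
        for (String row : rows) {
            lockFor(row).release();
        }
    }
}
```

Blocking only on the first row lets the batch make progress without holding several locks while waiting for another, which keeps the time locks are held short.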

Step 2: update the timestamps.
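The timestamp update can be pictured as follows: a cell written without an explicit timestamp carries the placeholder `HConstants.LATEST_TIMESTAMP` (which equals `Long.MAX_VALUE`), and the server replaces that placeholder with its current time. This is a minimal sketch under that assumption; `TimestampSketch` and `SimpleCell` are hypothetical types, far simpler than HBase's `Cell`.

```java
import java.util.List;

final class TimestampSketch {
    /** Same value as HBase's HConstants.LATEST_TIMESTAMP. */
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

    /** Hypothetical, simplified cell: only a qualifier and a timestamp. */
    static final class SimpleCell {
        final String qualifier;
        long timestamp;

        SimpleCell(String qualifier, long timestamp) {
            this.qualifier = qualifier;
            this.timestamp = timestamp;
        }
    }

    /** Replaces the placeholder timestamp with the server time; explicit timestamps are kept. */
    static void updateTimestamps(List<SimpleCell> cells, long now) {
        for (SimpleCell cell : cells) {
            if (cell.timestamp == LATEST_TIMESTAMP) {
                cell.timestamp = now;
            }
        }
    }
}
```

Using one `now` value for the whole mini-batch gives every placeholder cell in it the same timestamp.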

Step 3: build the WAL (write-ahead log) edit.

            walEdit = new WALEdit(cellCount, isInReplay); // create the WALEdit object

Step 4: append the edit to the WAL first, but do not sync it yet (the sync here presumably means the sync to HDFS).

      // STEP 4. Append the final edit to WAL. Do not sync wal.

Step 5: write to the memstore.

      // STEP 5. Write back to memstore
      // Write to memstore. It is ok to write to memstore
      // first without syncing the WAL because we do not roll
      // forward the memstore MVCC. The MVCC will be moved up when
      // the complete operation is done. These changes are not yet
      // visible to scanners till we update the MVCC. The MVCC is
      // moved only when the sync is complete.

Step 6: release the row locks.

      // STEP 6. Release row locks, etc.
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      releaseRowLocks(acquiredRowLocks);

Step 7: sync the WAL.

      // -------------------------
      // STEP 7. Sync wal.
      // -------------------------
      if (txid != 0) {
        syncOrDefer(txid, durability);
      }

      doRollBackMemstore = false; // the sync succeeded, so no rollback; the memstore is rolled back only while this flag is true
      // calling the post CP hook for batch mutation
      if (!isInReplay && coprocessorHost != null) {
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        coprocessorHost.postBatchMutate(miniBatchOp);
      }
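The split between Step 4 (append) and Step 7 (sync) can be sketched with a toy log. The sketch assumes only what the excerpt shows: an append hands back a transaction id, a later sync makes everything up to that id durable, and an id of 0 means nothing was appended. `WalSketch` is a hypothetical in-memory class and performs no real I/O.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory write-ahead log; "durable" is only simulated. */
final class WalSketch {
    private final List<String> edits = new ArrayList<>();
    private long syncedTxid = 0;

    /** Appends without syncing and returns the edit's txid (txids start at 1). */
    synchronized long append(String edit) {
        edits.add(edit);
        return edits.size();
    }

    /** Marks every edit up to and including txid as durable. */
    synchronized void sync(long txid) {
        if (txid > edits.size()) {
            throw new IllegalArgumentException("unknown txid " + txid);
        }
        syncedTxid = Math.max(syncedTxid, txid);
    }

    synchronized boolean isDurable(long txid) {
        return txid != 0 && txid <= syncedTxid;
    }
}
```

Because the sync happens after the row locks are released in Step 6, other writers to the same rows are not blocked for the duration of the (comparatively slow) sync.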

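Step 8: advance MVCC (multi-version concurrency control, not a row-lock mechanism), which makes the write visible to readers. If an earlier step failed, the memstore changes are rolled back instead.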

      // ------------------------------------------------------------------
      // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
      // ------------------------------------------------------------------
      if (writeEntry != null) {
        mvcc.completeAndWait(writeEntry);
        writeEntry = null;
      } else if (isInReplay) {
        // ensure that the sequence id of the region is at least as big as orig log seq id
        mvcc.advanceTo(mvccNum);
      }

      for (int i = firstIndex; i < lastIndexExclusive; i ++) {
        if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
        }
      }
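Why data written to the memstore in Step 5 stays invisible until Step 8 can be shown with a toy read point. This is a simplified sketch, not HBase's MultiVersionConcurrencyControl: each write gets a number, and the read point only advances past a number once that write and every earlier one have completed. `MvccSketch` and its methods are hypothetical, and unlike `completeAndWait` the sketch never blocks.

```java
import java.util.TreeSet;

/** Hypothetical, non-blocking model of an MVCC read point. */
final class MvccSketch {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextWriteNumber = 1;
    private long readPoint = 0;

    /** Starts a write and returns its write number. */
    synchronized long begin() {
        long writeNumber = nextWriteNumber++;
        pending.add(writeNumber);
        return writeNumber;
    }

    /** Completes a write; the read point moves up to just below the oldest pending write. */
    synchronized void complete(long writeNumber) {
        pending.remove(writeNumber);
        long firstPending = pending.isEmpty() ? nextWriteNumber : pending.first();
        readPoint = firstPending - 1;
    }

    /** A cell tagged with writeNumber is visible to readers only at or below the read point. */
    synchronized boolean isVisible(long writeNumber) {
        return writeNumber <= readPoint;
    }

    synchronized long readPoint() {
        return readPoint;
    }
}
```

In this model a later write that finishes first stays hidden until the earlier one completes, so readers never observe a gap in the sequence.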

Step 9: run the coprocessor post hooks: postPut for a Put, postDelete for a Delete.

      // ------------------------------------
      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
      // synced so that the coprocessor contract is adhered to.
      // ------------------------------------

Rollback

      // if the wal sync was unsuccessful, remove keys from memstore
      // (only the memstore is rolled back here; this code does not delete anything from the WAL)
      if (doRollBackMemstore) {
        for (int j = 0; j < familyMaps.length; j++) {
          for(List<Cell> cells:familyMaps[j].values()) {
            rollbackMemstore(cells);
          }
        }
      }

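The role of the doRollBackMemstore flag can be sketched end to end. The sketch assumes the flag is raised just before the memstore is written and cleared only after the sync succeeds, so the cleanup in the finally block runs exactly when unsynced data was left behind. `RollbackSketch`, `syncWal`, and the `syncSucceeds` switch are hypothetical and exist only to simulate a failure.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical memstore with a simulated WAL sync. */
final class RollbackSketch {
    final ConcurrentSkipListMap<String, String> memstore = new ConcurrentSkipListMap<>();

    private static void syncWal(boolean succeed) throws IOException {
        if (!succeed) {
            throw new IOException("simulated WAL sync failure");
        }
    }

    /** Returns true if the cells were kept, false if they were rolled back. */
    boolean write(Map<String, String> cells, boolean syncSucceeds) {
        boolean doRollBackMemstore = false;
        boolean success = false;
        try {
            doRollBackMemstore = true;   // from here on the memstore holds unsynced data
            memstore.putAll(cells);
            syncWal(syncSucceeds);
            doRollBackMemstore = false;  // sync succeeded: keep the data
            success = true;
        } catch (IOException e) {
            // nothing to do here; the finally block performs the rollback
        } finally {
            if (doRollBackMemstore) {
                for (String key : cells.keySet()) {
                    memstore.remove(key); // only the memstore is cleaned up
                }
            }
        }
        return success;
    }
}
```

Removing by key is a simplification that would also discard an older value stored under the same key; the sketch is meant for distinct keys only.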
Put and Delete ultimately go through the same method.

If a mutation is not a Put, it is cast to a Delete.
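The dispatch described here amounts to one instanceof check followed by an unconditional cast. The following is a minimal sketch of that pattern with hypothetical stand-in types; these `Mutation`, `Put`, and `Delete` types are not the HBase client classes, and the returned strings are purely illustrative.

```java
final class MutationDispatchSketch {
    /** Hypothetical stand-ins for the HBase client mutation types. */
    interface Mutation {
        String row();
    }

    static final class Put implements Mutation {
        private final String row;
        Put(String row) { this.row = row; }
        public String row() { return row; }
    }

    static final class Delete implements Mutation {
        private final String row;
        Delete(String row) { this.row = row; }
        public String row() { return row; }
    }

    /** One shared code path: a Put is handled as a Put, anything else is cast to Delete. */
    static String apply(Mutation mutation) {
        if (mutation instanceof Put) {
            return "put:" + mutation.row();
        }
        Delete delete = (Delete) mutation; // throws ClassCastException for any other type
        return "delete:" + delete.row();
    }
}
```

The unconditional cast is safe only if the caller guarantees that every mutation in the batch is a Put or a Delete.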

Much of the write path is not analyzed in depth here; for more detail, see the source in HRegion.java...