FastDFS源码目录介绍
源码目录包括了common,test,client,stroage,tracker
按文件夹顺序和首字母进行分析:
common文件夹:
common_define.h:
跳过首字母a的文件先介绍这个,是因为这个文件定义了整个系统的一些环境变量,包括bool类型,全局变量等等。下文中你没见过,我也没提的变量或者宏都取自这里。
avltree.c/avltree.h:
对于avl树的定义和实现,这是FastDFS实现trunk功能和单盘恢复功能所依赖的数据结构
-
typedef struct tagAVLTreeNode {
-
void *data;
-
struct tagAVLTreeNode *left;
-
struct tagAVLTreeNode *right;
-
byte balance;
-
} AVLTreeNode;
-
typedef struct tagAVLTreeInfo {
-
AVLTreeNode *root;
-
FreeDataFunc free_data_func;
-
CompareFunc compare_func;
-
} AVLTreeInfo;
经典的数据结构,没有修改的原汁原味。
base64.c/base64.h:
FastDFS得到文件包含的信息后,用base64算法对其编码生成文件ID。
chain.c/chain.hi:
对于链表的实现。
-
typedef struct tagChainNode
-
{
-
void *data;
-
struct tagChainNode *next;
-
} ChainNode;
-
typedef struct
-
{
-
int type;
-
ChainNode *head;
-
ChainNode *tail;
-
FreeDataFunc freeDataFunc;
-
CompareFunc compareFunc;
-
} ChainList;
type变量是定义链表的使用方式的:
CHAINTYPEINSERT: insert new node before head
CHAINTYPEAPPEND: insert new node after tail
CHAINTYPESORTED: sorted chain
在fast_mblock中#include了它,但是并没有使用,直接注释了这个include也成功编译无报错,可能后续会使用吧?这里会和鱼大咨询下。mark。
connectpool.c/connectpool.h:
连接池的定义与实现
-
typedef struct
-
{
-
int sock;
-
int port;
-
char ip_addr[IP_ADDRESS_SIZE];
-
} ConnectionInfo;
-
struct tagConnectionManager;
-
typedef struct tagConnectionNode {
-
ConnectionInfo *conn;
-
struct tagConnectionManager *manager;
-
struct tagConnectionNode *next;
-
time_t atime; //last access time
-
} ConnectionNode;
-
typedef struct tagConnectionManager {
-
ConnectionNode *head;
-
int total_count; //total connections
-
int free_count; //free connections
-
pthread_mutex_t lock;
-
} ConnectionManager;
-
typedef struct tagConnectionPool {
-
HashArray hash_array; //key is ip:port, value is ConnectionManager
-
pthread_mutex_t lock;
-
int connect_timeout;
-
int max_count_per_entry; //0 means no limit
-
/*
-
connections whose the idle time exceeds this time will be closed
-
*/
-
int max_idle_time;
-
} ConnectionPool;
呃,注释已经一目了然了。
三层结构
pool->manager->node
pool使用哈希来定位manager,因为作为key的ip:port是唯一的,而后用链表来管理该节点的所有连接。
fastmblock.c/fastmblock.h:
链表的一个变种,存储有已分配的对象和已经释放的对象,大致相当于一个对象池,在trunk功能中被使用。
-
/* free node chain */
-
struct fast_mblock_node
-
{
-
struct fast_mblock_node *next;
-
char data[0]; //the data buffer
-
};
-
/* malloc chain */
-
struct fast_mblock_malloc
-
{
-
struct fast_mblock_malloc *next;
-
};
-
struct fast_mblock_man
-
{
-
struct fast_mblock_node *free_chain_head; //free node chain
-
struct fast_mblock_malloc *malloc_chain_head; //malloc chain to be freed
-
int element_size; //element size
-
int alloc_elements_once; //alloc elements once
-
pthread_mutex_t lock; //the lock for read / write free node chain
-
};
fasttaskqueue.c/fasttaskqueue.h:
任务队列,挺重要的一个数据结构
-
typedef struct ioevent_entry
-
{
-
int fd;
-
FastTimerEntry timer;
-
IOEventCallback callback;
-
} IOEventEntry;
-
struct nio_thread_data
-
{
-
struct ioevent_puller ev_puller;
-
struct fast_timer timer;
-
int pipe_fds[2];
-
struct fast_task_info *deleted_list; //链向已被删除的任务指针,复用了已经分配的内存
-
};
-
struct fast_task_info
-
{
-
IOEventEntry event;
-
char client_ip[IP_ADDRESS_SIZE];
-
void *arg; //extra argument pointer
-
char *data; //buffer for write or recv
-
int size; //alloc size
-
int length; //data length
-
int offset; //current offset
-
int req_count; //request count
-
TaskFinishCallBack finish_callback; //任务结束回调
-
struct nio_thread_data *thread_data;
-
struct fast_task_info *next;
-
};
-
struct fast_task_queue
-
{
-
struct fast_task_info *head; //头尾指针都存在,分别用来做队列的出队和入队
-
struct fast_task_info *tail;
-
pthread_mutex_t lock;
-
int max_connections;
-
int min_buff_size;
-
int max_buff_size;
-
int arg_size;
-
bool malloc_whole_block;
-
};
fasttimer.c/fasttimer.h:
时间哈希表,以unix时间戳作为key,用双向链表解决冲突,可以根据当前的使用量进行rehash等操作。
在刚才的fasttaskqueue中被使用
-
typedef struct fast_timer_entry {
-
int64_t expires;
-
void *data;
-
struct fast_timer_entry *prev;
-
struct fast_timer_entry *next;
-
bool rehash;
-
} FastTimerEntry;
-
typedef struct fast_timer_slot {
-
struct fast_timer_entry head;
-
} FastTimerSlot;
-
typedef struct fast_timer {
-
int slot_count; //time wheel slot count
-
int64_t base_time; //base time for slot 0
-
int64_t current_time;
-
FastTimerSlot *slots;
-
} FastTimer;
fdfsglobal.c/fdfsglobal.h:
定义了fdfs系统所使用的全局变量,包括超时,版本号等等
-
int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
-
int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT;
-
char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
-
Version g_fdfs_version = {5, 1};
-
bool g_use_connection_pool = false;
-
ConnectionPool g_connection_pool;
-
int g_connection_pool_max_idle_time = 3600;
fdfshttpshared.c/fdfshttpshare.h:
FastDFS使用token来防盗链和分享图片,这一段我也不确定。回头再来看。
hash.c/hash.h:
经典的哈希结构,在FastDFS中应用的很广
哈希找到域,而后用链表解决冲突
-
typedef struct tagHashData
-
{
-
int key_len;
-
int value_len;
-
int malloc_value_size;
-
#ifdef HASH_STORE_HASH_CODE
-
unsigned int hash_code;
-
#endif
-
char *value;
-
struct tagHashData *next; //解决冲突
-
char key[0];
-
} HashData;
-
typedef struct tagHashArray
-
{
-
HashData **buckets;
-
HashFunc hash_func;
-
int item_count;
-
unsigned int *capacity;
-
double load_factor; //hash的负载因子,在FastDFS中大于1.0进行rehash
-
int64_t max_bytes; //最大占用字节,用于计算负载因子
-
int64_t bytes_used; //已经使用字节,用于计算负载因子
-
bool is_malloc_capacity;
-
bool is_malloc_value;
-
unsigned int lock_count; //锁总数,为了线程安全
-
pthread_mutex_t *locks;
-
} HashArray;
-
typedef struct tagHashStat //所有hash的统计情况
-
{
-
unsigned int capacity;
-
int item_count;
-
int bucket_used;
-
double bucket_avg_length;
-
int bucket_max_length;
-
} HashStat;
httpfunc.c/httpfunc.h:
http功能已经被砍掉了,这个也回头来看。
inifilereader.c/inifilereader.h:
FastDFS用于初始化加载配置文件的函数。
ioevent.c/ioevent.h && ioeventloop.c/ioeventloop.h:
对epoll,kqueue进行简单封装,成为一个有时间和网络的事件库。这部分逻辑应该会开独立的一章来分析
linuxstacktrace.c/linuxstacktrace.h:
-
/**
-
* This source file is used to print out a stack-trace when your program
-
* segfaults. It is relatively reliable and spot-on accurate.
-
*/
这个模块是在程序段错误后输出栈跟踪信息,呃似乎不是鱼大写的
localipfunc.c/localipfunc.h:
基于系统调用getifaddrs来获取本地IP
logger.c/logger.h:
这个太明显了,log模块
md5.c/md5.h:
fdfshttpshared.c中被调用,在fdfshttpgentoken的方法中对secretkey,file_id,timestamp进行md5得到token
mimefileparser.c/mimefileparser.h:
从配置文件中加载mime识别的配置,至于什么是mime。。我也不知道,我问问大神们看看。
osbits.h:
定义了OS的位数
processctrl.c/processctrl.h:
从配置文件中载入pid路径,定义了pid文件的增删查改,并且提供了进程停止,重启等方法
pthreadfunc.c/pthreadfunc.h:
线程相关的操作,包括初始化,创建,杀死线程
schedthread.c/schedthread.h:
定时任务线程的模块,按照hour:minute的期限执行任务
-
typedef struct tagScheduleEntry
-
{
-
int id; //the task id
-
/* the time base to execute task, such as 00:00, interval is 3600,
-
means execute the task every hour as 1:00, 2:00, 3:00 etc. */
-
TimeInfo time_base;
-
int interval; //the interval for execute task, unit is second
-
TaskFunc task_func; //callback function
-
void *func_args; //arguments pass to callback function
-
/* following are internal fields, do not set manually! */
-
time_t next_call_time;
-
struct tagScheduleEntry *next;
-
} ScheduleEntry;
-
typedef struct
-
{
-
ScheduleEntry *entries;
-
int count;
-
} ScheduleArray;
-
typedef struct
-
{
-
ScheduleArray scheduleArray;
-
ScheduleEntry *head; //schedule chain head
-
ScheduleEntry *tail; //schedule chain tail
-
bool *pcontinue_flag;
-
} ScheduleContext;
稍微看了下实现的算法,这是一个变种的链表,实现了一个变种的队列。
但是所有的数据都存在scheduleArray这个数组里面,每次新任务插入后,会对数组按时间进行一次排序
这样可以保证头指针的是最先需要执行的。
而后每次对head进行出队,初始化next域以后重新从tail入队。
总体来看是非常的简单高效的。
sharedfunc.c/sharedfunc.h:
一些工具函数,比如设置随机种子什么的,没必要单独开个文件,所以放在一起了。
sockopt.c/sockopt.h:
socket的一些工具函数,进行了简单的封装。
tracker文件夹:
先分析tracker是因为tracker只集成了网络部分,而storage还有处理磁盘吞吐的,相对复杂一些
fdfssharefunc.c/fdfssharefunc.h
tracker和storage共用的一些工具函数,比如根据IP和端口获取tracker的ID诸如此类的
fdfs_trackerd.c:
tracker的入口函数
trackerdump.c/trackerdump.h:
实现了fdfsdumptrackerglobalvarstofile这个函数
当tracker收到了SIGUSR1或者SIGUSR2信号,将启动sigDumpHandler来调用这个函数,将tracker当前的状态dump进FastDFS跟目录的logs/tracker_dump.log中
关于如何根据该dump文件恢复的,目前没看到,后面再补充
trackerfunc.c/trackerfunc.h:
实现了trackerloadfromconffile这个函数
将tracker的一些基本必要信息,从conf_file中导出
trackerglobal.c/trackerglobal.h:
记录了tracker使用的一些全局变量
trackerhttpcheck.c/trackerhttpcheck.h:
这个模块会对tracker所管理的所有group的可用storage做检测,测试所有的http端口是否可用
trackermem.c/trackermem.h:
这个模块维护了内存的所有数据,包括集群运行情况等等,提供了save,change和load的接口对集群的总情况进行修改
trackernio.c/trackernio.h:
nio的模块在common/ioevent和common/ioevent_loop的基础上进行调用
trackerproto.c/trackerproto.h:
定义了tracker通信的协议,有时间可以分析下。
trackerrelationship.c/trackerrelationship.h:
定义了tracker之间通信的方式,并且定义了选出leader,ping leader等功能,有时间可以分析下。
trackerservice.c/trackerservice.h:
tracker的逻辑层处理,各个请求在nio后进入work线程,而后分发到各个模块
trackerstatus.c/trackerstatus.h:
tracker状态的save和load模块
tracker_types.h:
定义了tracker所用到的所有类型
storage文件夹:
fdfs_storage.c:storage的入口函数
storagedio.c/storagedio.h:
使用common/fasttaskqueue实现了异步的磁盘IO,新任务由storagedioqueue_push方法入队
同时包含了trunk模块的处理,trunk模块后面再提
storagediskrecovery.c/storagediskrecovery.h:
storage的单盘恢复算法,用于故障恢复
storagedump.c/storagedump.h:
和tracker_dump原理相同
storagefunc.c/storagefunc.h:
storagefuncinit函数对应着tracker的trackerloadfromconffile函数
除此之外,还提供了根据storage_id或者ip判断是否是本机的函数
还提供了一些数据持久化的接口
storageglobal.c/storageglobal.h:
定义了storage使用的全局变量
storageipchangeddealer.c/storageipchangerdealer.h:
storage实现ip地址改变的模块
-
int storage_get_my_tracker_client_ip(); //获取storage作为tracker客户端的ip
-
int storage_changelog_req(); //接入tracker的changelog
-
int storage_check_ip_changed(); //检查ip是否改变
storagenio.c/storagenio.h:
nio的模块在common/ioevent和common/ioevent_loop的基础上进行调用
storageparamgetter.c/storageparamgetter.h:
storagegetparamsfromtracker函数,顾名思义,从tracker获取自身的参数
storageservice.c/storageservice.h:
storage的逻辑层处理,各个请求在nio后进入work线程,而后分发到各个模块
storagesync.c/storagesync.h:
storage的同步模块,众所周知,FastDFS的同步模块是根据时间戳进行的弱一致性同步
trackerclientthread.c/trackerclientthread.h
tracker_report的前缀提示的很明显,这部分是storage作为tracker的客户端,向tracker发送心跳,汇报自己的状态等等
全部接口如下:
-
int tracker_report_init();
-
int tracker_report_destroy();
-
int tracker_report_thread_start();
-
int kill_tracker_report_threads();
-
int tracker_report_join(ConnectionInfo *pTrackerServer, \
-
const int tracker_index, const bool sync_old_done);
-
int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServer);
-
int tracker_sync_src_req(ConnectionInfo *pTrackerServer, \
-
StorageBinLogReader *pReader);
-
int tracker_sync_diff_servers(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServers, const int server_count);
-
int tracker_deal_changelog_response(ConnectionInfo *pTrackerServer);
trunk_mgr:
这是storage文件的子目录,实现了trunk功能
trunk功能比较零碎,我目前还没搞明白,比如为什么storage和trunk模块交互,storage是作为client出现的,而不是直接调用trunk。
这部分内容应该要单独开一章来分析。
FastDFS源码解析(2)--------trunk模块分析
trunk功能是把大量小文件合并存储,大量的小文件会大量消耗linux文件系统的node,使树变的过于庞大,降低了读写效率
因此小文件合并存储能显著缓解这一压力
我将对上传和下载流程分析来追踪trunk模块的行为。
在storageservice模块中,storageservice.c/storagedealtask对请求安装cmd进行分离逻辑来处理
在storageuploadfile中处理上传逻辑
-
/**
-
1 byte: store path index
-
8 bytes: file size
-
FDFS_FILE_EXT_NAME_MAX_LEN bytes: file ext name, do not include dot (.)
-
file size bytes: file content
-
**/
-
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
DisconnectCleanFunc clean_func;
-
char *p;
-
char filename[128];
-
char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN + 1];
-
int64_t nInPackLen;
-
int64_t file_offset;
-
int64_t file_bytes;
-
int crc32;
-
int store_path_index;
-
int result;
-
int filename_len;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
-
//对包头大小进行验证
-
if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE +
-
FDFS_FILE_EXT_NAME_MAX_LEN)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"cmd=%d, client ip: %s, package size " \
-
INT64_PRINTF_FORMAT" is not correct, " \
-
"expect length >= %d", __LINE__, \
-
STORAGE_PROTO_CMD_UPLOAD_FILE, \
-
pTask->client_ip, nInPackLen, \
-
1 + FDFS_PROTO_PKG_LEN_SIZE + \
-
FDFS_FILE_EXT_NAME_MAX_LEN);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//跳过包头第一段,获得文件路径索引号
-
p = pTask->data + sizeof(TrackerHeader);
-
store_path_index = *p++;
-
if (store_path_index == -1)
-
{
-
if ((result=storage_get_storage_path_index( \
-
&store_path_index)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"get_storage_path_index fail, " \
-
"errno: %d, error info: %s", __LINE__, \
-
result, STRERROR(result));
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
}
-
else if (store_path_index < 0 || store_path_index >= \
-
g_fdfs_store_paths.count)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, store_path_index: %d " \
-
"is invalid", __LINE__, \
-
pTask->client_ip, store_path_index);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//获取文件大小
-
file_bytes = buff2long(p);
-
p += FDFS_PROTO_PKG_LEN_SIZE;
-
if (file_bytes < 0 || file_bytes != nInPackLen - \
-
(1 + FDFS_PROTO_PKG_LEN_SIZE + \
-
FDFS_FILE_EXT_NAME_MAX_LEN))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, pkg length is not correct, " \
-
"invalid file bytes: "INT64_PRINTF_FORMAT \
-
", total body length: "INT64_PRINTF_FORMAT, \
-
__LINE__, pTask->client_ip, file_bytes, nInPackLen);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//获取文件名
-
memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
-
*(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
-
p += FDFS_FILE_EXT_NAME_MAX_LEN;
-
if ((result=fdfs_validate_filename(file_ext_name)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, file_ext_name: %s " \
-
"is invalid!", __LINE__, \
-
pTask->client_ip, file_ext_name);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
pFileContext->calc_crc32 = true;
-
pFileContext->calc_file_hash = g_check_file_duplicate;
-
pFileContext->extra_info.upload.start_time = g_current_time;
-
strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
-
storage_format_ext_name(file_ext_name, \
-
pFileContext->extra_info.upload.formatted_ext_name);
-
pFileContext->extra_info.upload.trunk_info.path. \
-
store_path_index = store_path_index;
-
pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
-
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
-
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
-
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
-
//如果是追加写文件,注目额外的文件追加命令值
-
if (bAppenderFile)
-
{
-
pFileContext->extra_info.upload.file_type |= \
-
_FILE_TYPE_APPENDER;
-
}
-
else
-
{
-
//判断是否开了trunk_file功能,根据大小检查是否需要trunk合并存储
-
if (g_if_use_trunk_file && trunk_check_size( \
-
TRUNK_CALC_SIZE(file_bytes)))
-
{
-
pFileContext->extra_info.upload.file_type |= \
-
_FILE_TYPE_TRUNK;
-
}
-
}
-
//根据上一步的检查需要开启trunk合并存储
-
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
-
{
-
FDFSTrunkFullInfo *pTrunkInfo;
-
pFileContext->extra_info.upload.if_sub_path_alloced = true;
-
pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
-
//为trunk文件名分配空间,并添加到缓存
-
if ((result=trunk_client_trunk_alloc_space( \
-
TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
-
{
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
clean_func = dio_trunk_write_finish_clean_up;
-
file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
-
pFileContext->extra_info.upload.if_gen_filename = true;
-
trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
-
sizeof(pFileContext->filename));
-
//注册trunk操作的回调
-
pFileContext->extra_info.upload.before_open_callback = \
-
dio_check_trunk_file_when_upload;
-
pFileContext->extra_info.upload.before_close_callback = \
-
dio_write_chunk_header;
-
pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
-
}
-
else
-
{
-
//普通文件的方式,略过
-
...
-
}
-
return storage_write_to_file(pTask, file_offset, file_bytes, \
-
p - pTask->data, dio_write_file, \
-
storage_upload_file_done_callback, \
-
clean_func, store_path_index);
-
}
追踪一下trunkclienttrunkallocspace的实现
-
int trunk_client_trunk_alloc_space(const int file_size, \
-
FDFSTrunkFullInfo *pTrunkInfo)
-
{
-
int result;
-
ConnectionInfo trunk_server;
-
ConnectionInfo *pTrunkServer;
-
//如果自己就是trunker,直接操作
-
if (g_if_trunker_self)
-
{
-
return trunk_alloc_space(file_size, pTrunkInfo);
-
}
-
//否则根据trunk_server的ip和port进行连接
-
if (*(g_trunk_server.ip_addr) == '\0')
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"no trunk server", __LINE__);
-
return EAGAIN;
-
}
-
memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo));
-
if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"can't alloc trunk space because connect to trunk " \
-
"server %s:%d fail, errno: %d", __LINE__, \
-
trunk_server.ip_addr, trunk_server.port, result);
-
return result;
-
}
-
//使用client api进行操作
-
result = trunk_client_trunk_do_alloc_space(pTrunkServer, \
-
file_size, pTrunkInfo);
-
tracker_disconnect_server_ex(pTrunkServer, result != 0);
-
return result;
-
}
对直接调用和client_api操作分别追踪
-
nt trunk_alloc_space(const int size, FDFSTrunkFullInfo *pResult)
-
{
-
FDFSTrunkSlot target_slot;
-
FDFSTrunkSlot *pSlot;
-
FDFSTrunkNode *pPreviousNode;
-
FDFSTrunkNode *pTrunkNode;
-
int result;
-
STORAGE_TRUNK_CHECK_STATUS();
-
target_slot.size = (size > g_slot_min_size) ? size : g_slot_min_size;
-
target_slot.head = NULL;
-
pPreviousNode = NULL;
-
pTrunkNode = NULL;
-
//分配trunk需要锁
-
pthread_mutex_lock(&trunk_mem_lock);
-
//寻找可以插入该文件的地方
-
while (1)
-
{
-
pSlot = (FDFSTrunkSlot *)avl_tree_find_ge(tree_info_by_sizes \
-
+ pResult->path.store_path_index, &target_slot);
-
if (pSlot == NULL)
-
{
-
break;
-
}
-
pPreviousNode = NULL;
-
pTrunkNode = pSlot->head;
-
while (pTrunkNode != NULL && \
-
pTrunkNode->trunk.status == FDFS_TRUNK_STATUS_HOLD)
-
{
-
pPreviousNode = pTrunkNode;
-
pTrunkNode = pTrunkNode->next;
-
}
-
if (pTrunkNode != NULL)
-
{
-
break;
-
}
-
target_slot.size = pSlot->size + 1;
-
}
-
//找到了,于是插入
-
if (pTrunkNode != NULL)
-
{
-
if (pPreviousNode == NULL)
-
{
-
pSlot->head = pTrunkNode->next;
-
if (pSlot->head == NULL)
-
{
-
trunk_delete_size_tree_entry(pResult->path. \
-
store_path_index, pSlot);
-
}
-
}
-
else
-
{
-
pPreviousNode->next = pTrunkNode->next;
-
}
-
trunk_free_block_delete(&(pTrunkNode->trunk));
-
}
-
else
-
{
-
//没找到,为他创建一个单独的trunk_file
-
pTrunkNode = trunk_create_trunk_file(pResult->path. \
-
store_path_index, &result);
-
if (pTrunkNode == NULL)
-
{
-
pthread_mutex_unlock(&trunk_mem_lock);
-
return result;
-
}
-
}
-
pthread_mutex_unlock(&trunk_mem_lock);
-
result = trunk_split(pTrunkNode, size);
-
if (result != 0)
-
{
-
return result;
-
}
-
pTrunkNode->trunk.status = FDFS_TRUNK_STATUS_HOLD;
-
result = trunk_add_free_block(pTrunkNode, true);
-
if (result == 0)
-
{
-
memcpy(pResult, &(pTrunkNode->trunk), \
-
sizeof(FDFSTrunkFullInfo));
-
}
-
return result;
-
}
-
static int trunk_client_trunk_do_alloc_space(ConnectionInfo *pTrunkServer, \
-
const int file_size, FDFSTrunkFullInfo *pTrunkInfo)
-
{
-
TrackerHeader *pHeader;
-
//初始化请求包等等数据,略过
-
...
-
pHeader->cmd = STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE;
-
if ((result=tcpsenddata_nb(pTrunkServer->sock, out_buff, \
-
sizeof(out_buff), g_fdfs_network_timeout)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"send data to storage server %s:%d fail, " \
-
"errno: %d, error info: %s", __LINE__, \
-
pTrunkServer->ip_addr, pTrunkServer->port, \
-
result, STRERROR(result));
-
return result;
-
}
-
p = (char *)&trunkBuff;
-
if ((result=fdfs_recv_response(pTrunkServer, \
-
&p, sizeof(FDFSTrunkInfoBuff), &in_bytes)) != 0)
-
{
-
return result;
-
}
-
//设置pTrunckInfo信息,略过
-
...
-
return 0;
-
}
追踪解析STORAGEPROTOCMDTRUNKALLOC_SPACE行为的服务端函数
storageservice.c会将其由storageservertrunkalloc_space函数来解析
-
/**
-
* request package format:
-
* FDFS_GROUP_NAME_MAX_LEN bytes: group_name
-
* 4 bytes: file size
-
* 1 bytes: store_path_index
-
*
-
* response package format:
-
* 1 byte: store_path_index
-
* 1 byte: sub_path_high
-
* 1 byte: sub_path_low
-
* 4 bytes: trunk file id
-
* 4 bytes: trunk offset
-
* 4 bytes: trunk size
-
* **/
-
static int storage_server_trunk_alloc_space(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
FDFSTrunkInfoBuff *pApplyBody;
-
char *in_buff;
-
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
-
FDFSTrunkFullInfo trunkInfo;
-
int64_t nInPackLen;
-
int file_size;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
CHECK_TRUNK_SERVER(pTask)
-
if (nInPackLen != FDFS_GROUP_NAME_MAX_LEN + 5)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"cmd=%d, client ip: %s, package size " \
-
INT64_PRINTF_FORMAT" is not correct, " \
-
"expect length: %d", __LINE__, \
-
STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE, \
-
pTask->client_ip, nInPackLen, \
-
FDFS_GROUP_NAME_MAX_LEN + 5);
-
return EINVAL;
-
}
-
in_buff = pTask->data + sizeof(TrackerHeader);
-
memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN);
-
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
-
if (strcmp(group_name, g_group_name) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip:%s, group_name: %s " \
-
"not correct, should be: %s", \
-
__LINE__, pTask->client_ip, \
-
group_name, g_group_name);
-
return EINVAL;
-
}
-
file_size = buff2int(in_buff + FDFS_GROUP_NAME_MAX_LEN);
-
if (file_size < 0 || !trunk_check_size(file_size))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip:%s, invalid file size: %d", \
-
__LINE__, pTask->client_ip, file_size);
-
return EINVAL;
-
}
-
trunkInfo.path.store_path_index = *(in_buff+FDFS_GROUP_NAME_MAX_LEN+4);
-
//实质还是调用的trunk_alloc_space
-
if ((result=trunk_alloc_space(file_size, &trunkInfo)) != 0)
-
{
-
return result;
-
}
-
pApplyBody = (FDFSTrunkInfoBuff *)(pTask->data+sizeof(TrackerHeader));
-
pApplyBody->store_path_index = trunkInfo.path.store_path_index;
-
pApplyBody->sub_path_high = trunkInfo.path.sub_path_high;
-
pApplyBody->sub_path_low = trunkInfo.path.sub_path_low;
-
int2buff(trunkInfo.file.id, pApplyBody->id);
-
int2buff(trunkInfo.file.offset, pApplyBody->offset);
-
int2buff(trunkInfo.file.size, pApplyBody->size);
-
pClientInfo->total_length = sizeof(TrackerHeader) + \
-
sizeof(FDFSTrunkInfoBuff);
-
return 0;
-
}
trunkclienttrunkallocspace会向同组内唯一的trunk_server申请空间
最终的实现还是trunkallocspace函数
trunk相当于一个KV吧。介个会不会出现单点问题,这台trunk失效以后如何冗余故障,接着往下分析看看
以下这段函数是在trackerclientthread里面的,大致是storage和tracker的一个交互,如果有故障冗余,这里应该存在机制
-
static int tracker_check_response(ConnectionInfo *pTrackerServer, \
-
bool *bServerPortChanged)
-
{
-
int64_t nInPackLen;
-
TrackerHeader resp;
-
int server_count;
-
int result;
-
char in_buff[1 + (2 + FDFS_MAX_SERVERS_EACH_GROUP) * \
-
sizeof(FDFSStorageBrief)];
-
FDFSStorageBrief *pBriefServers;
-
char *pFlags;
-
//解析包
-
...
-
//tracker_leader变化
-
if ((*pFlags) & FDFS_CHANGE_FLAG_TRACKER_LEADER)
-
{
-
...
-
}
-
//trunk_leader变化
-
if ((*pFlags) & FDFS_CHANGE_FLAG_TRUNK_SERVER)
-
{
-
if (server_count < 1)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"tracker server %s:%d, reponse server " \
-
"count: %d < 1", __LINE__, \
-
pTrackerServer->ip_addr, \
-
pTrackerServer->port, server_count);
-
return EINVAL;
-
}
-
//未启动trunk服务,从tracker重新加载
-
if (!g_if_use_trunk_file)
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"reload parameters from tracker server", \
-
__LINE__);
-
storage_get_params_from_tracker();
-
}
-
//还未启动trunk服务,报错
-
if (!g_if_use_trunk_file)
-
{
-
logWarning("file: "__FILE__", line: %d, " \
-
"tracker server %s:%d, " \
-
"my g_if_use_trunk_file is false, " \
-
"can't support trunk server!", \
-
__LINE__, pTrackerServer->ip_addr, \
-
pTrackerServer->port);
-
}
-
else
-
{
-
memcpy(g_trunk_server.ip_addr, pBriefServers->ip_addr, \
-
IP_ADDRESS_SIZE - 1);
-
*(g_trunk_server.ip_addr + (IP_ADDRESS_SIZE - 1)) = '\0';
-
g_trunk_server.port = buff2int(pBriefServers->port);
-
//如果本地的ip端口和trunk_server一致
-
if (is_local_host_ip(g_trunk_server.ip_addr) && \
-
g_trunk_server.port == g_server_port)
-
{
-
//我已经是trunk了,tracker重启把我重新选为trunk了
-
if (g_if_trunker_self)
-
{
-
logWarning("file: "__FILE__", line: %d, " \
-
"I am already the trunk server %s:%d, " \
-
"may be the tracker server restart", \
-
__LINE__, g_trunk_server.ip_addr, \
-
g_trunk_server.port);
-
}
-
else
-
{
-
//我成为了新的trunk
-
logInfo("file: "__FILE__", line: %d, " \
-
"I am the the trunk server %s:%d", __LINE__, \
-
g_trunk_server.ip_addr, g_trunk_server.port);
-
tracker_fetch_trunk_fid(pTrackerServer);
-
g_if_trunker_self = true;
-
if ((result=storage_trunk_init()) != 0)
-
{
-
return result;
-
}
-
if (g_trunk_create_file_advance && \
-
g_trunk_create_file_interval > 0)
-
{
-
ScheduleArray scheduleArray;
-
ScheduleEntry entries[1];
-
entries[0].id = TRUNK_FILE_CREATOR_TASK_ID;
-
entries[0].time_base = g_trunk_create_file_time_base;
-
entries[0].interval = g_trunk_create_file_interval;
-
entries[0].task_func = trunk_create_trunk_file_advance;
-
entries[0].func_args = NULL;
-
scheduleArray.count = 1;
-
scheduleArray.entries = entries;
-
sched_add_entries(&scheduleArray);
-
}
-
trunk_sync_thread_start_all();
-
}
-
}
-
else
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"the trunk server is %s:%d", __LINE__, \
-
g_trunk_server.ip_addr, g_trunk_server.port);
-
//我以前是trunk,我让权
-
if (g_if_trunker_self)
-
{
-
int saved_trunk_sync_thread_count;
-
logWarning("file: "__FILE__", line: %d, " \
-
"I am the old trunk server, " \
-
"the new trunk server is %s:%d", \
-
__LINE__, g_trunk_server.ip_addr, \
-
g_trunk_server.port);
-
tracker_report_trunk_fid(pTrackerServer);
-
g_if_trunker_self = false;
-
saved_trunk_sync_thread_count = \
-
g_trunk_sync_thread_count;
-
if (saved_trunk_sync_thread_count > 0)
-
{
-
logInfo("file: "__FILE__", line: %d, "\
-
"waiting %d trunk sync " \
-
"threads exit ...", __LINE__, \
-
saved_trunk_sync_thread_count);
-
}
-
while (g_trunk_sync_thread_count > 0)
-
{
-
usleep(50000);
-
}
-
if (saved_trunk_sync_thread_count > 0)
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"%d trunk sync threads exited",\
-
__LINE__, \
-
saved_trunk_sync_thread_count);
-
}
-
storage_trunk_destroy_ex(true);
-
if (g_trunk_create_file_advance && \
-
g_trunk_create_file_interval > 0)
-
{
-
sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
-
}
-
}
-
}
-
}
-
pBriefServers += 1;
-
server_count -= 1;
-
}
-
if (!((*pFlags) & FDFS_CHANGE_FLAG_GROUP_SERVER))
-
{
-
return 0;
-
}
-
/*
-
//printf("resp server count=%d\n", server_count);
-
{
-
int i;
-
for (i=0; i<server_count; i++)
-
{
-
//printf("%d. %d:%s\n", i+1, pBriefServers[i].status, \
-
pBriefServers[i].ip_addr);
-
}
-
}
-
*/
-
if (*bServerPortChanged)
-
{
-
if (!g_use_storage_id)
-
{
-
FDFSStorageBrief *pStorageEnd;
-
FDFSStorageBrief *pStorage;
-
*bServerPortChanged = false;
-
pStorageEnd = pBriefServers + server_count;
-
for (pStorage=pBriefServers; pStorage<pStorageEnd;
-
pStorage++)
-
{
-
if (strcmp(pStorage->id, g_my_server_id_str) == 0)
-
{
-
continue;
-
}
-
tracker_rename_mark_files(pStorage->ip_addr, \
-
g_last_server_port, pStorage->ip_addr, \
-
g_server_port);
-
}
-
}
-
if (g_server_port != g_last_server_port)
-
{
-
g_last_server_port = g_server_port;
-
if ((result=storage_write_to_sync_ini_file()) != 0)
-
{
-
return result;
-
}
-
}
-
}
-
return tracker_merge_servers(pTrackerServer, \
-
pBriefServers, server_count);
-
}
可以看到,trunk的失败确实是存在冗余机制,由tracker来选出trunk。
trunk的分析暂告一段落,删除文件后是否存在文件空洞,空洞的利用率如何,都得用数据说话才行哈。
总结:
每个组都有唯一的trunk leader,组内所有trunk文件的信息,由这个trunk leader内部组织的avl树来保存。
上传文件后,storage会向trunk leader发起申请空间的请求,这时trunk leader会使用一个全局的锁,获得了trunk存储的位置后,storage在本地写磁盘。
下载文件时,trunk信息在文件名里面已经包含,只需要直接读即可。
使用trunk方式主要是为了解决node过多造成读写性能下降的问题,但是引入trunk方式本身也会造成一定的性能损耗。
目前感觉我对trunk功能还是hold不住,包括如果trunk出错,怎么样恢复trunk文件的数据,因为没有提供的官方的工具,所以不太敢用。
以后如果有需求在跟进,先告一段落了吧。
FastDFS源码解析(3)--------通信协议分析
就上传和下载进行分析,其他暂时略过
上传:
1 根据ip,port连接上tracker
2 发送一个10字节的包,其中第9个字节为TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE,也就是101
3 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
4 接受的这个包,0-8字节是下面要接收的包的大小,通过以下算法可以还原成数字
-
int64_t buff2long(const char *buff)
-
{
-
unsigned char *p;
-
p = (unsigned char *)buff;
-
return (((int64_t)(*p)) << 56) | \
-
(((int64_t)(*(p+1))) << 48) | \
-
(((int64_t)(*(p+2))) << 40) | \
-
(((int64_t)(*(p+3))) << 32) | \
-
(((int64_t)(*(p+4))) << 24) | \
-
(((int64_t)(*(p+5))) << 16) | \
-
(((int64_t)(*(p+6))) << 8) | \
-
((int64_t)(*(p+7)));
-
}
-
void long2buff(int64_t n, char *buff)
-
{
-
unsigned char *p;
-
p = (unsigned char *)buff;
-
*p++ = (n >> 56) & 0xFF;
-
*p++ = (n >> 48) & 0xFF;
-
*p++ = (n >> 40) & 0xFF;
-
*p++ = (n >> 32) & 0xFF;
-
*p++ = (n >> 24) & 0xFF;
-
*p++ = (n >> 16) & 0xFF;
-
*p++ = (n >> 8) & 0xFF;
-
*p++ = n & 0xFF;
-
}
5 读完这个数字对应的字节数目,这个数字应当有TRACKER_QUERY_STORAGE_STORE_BODY_LEN长,否则出错
-
#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN (FDFS_GROUP_NAME_MAX_LEN \
-
+ IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
也就是16+16-1+8+1 = 40
6 这40个字节,头16字节是组名,接着15字节是IP地址,接着8字节是端口号,还是根据buff2long算法还原成数字,最后1字节是store_path_index
tracker交互完毕,此时进行storage操作
7 根据ip和端口连接storage
8 发送25字节的包
头10字节是TrackerHeader一样的结构,其中1-8字节的内容为filesize+这个包的大小(25)-头的大小(10),也就是file_size+15这个数,通过long2buff,转换的8字节字串,然后其中第9字节的内容是STORAGE_PROTO_CMD_UPLOAD_FILE,也就是11
第11字节是刚才接受的storage_path_index
第12-19字节是file_size,通过long2buff算法转换为8字节字串
19-25字节是ext_name相关,这里设置为0即可
9 发送file_size字节内容,即为文件信息
10 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
11 接受的这个包,0-8字节是下面要接收的包的大小,通过buff2long还原为数字
12 这个数字应该大于FDFS_GROUP_NAME_MAX_LEN,也就是16字节,否则出错
13 头16字节为组名,后面全部的字节为remote_filename
14 上传流程完成
下载:
下载需要上传时rsp返回的文件ID,这里命名为file_id
1 连接tracker
2 切分file_id,第一个/前出现的即为group_name,后面的都是remote_filename
3 发送一个10字节的pHeader,其中1-8字节是FDFS_GROUP_NAME_MAX_LEN(值为16) 加上 remote_filename的长度,通过long2buff转化而成的
第9字节是CMD TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE,即为102
4 发送16字节是group_name
5 发送remote_filename这个字串
6 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
7 接受的这个包,1-8字节是下面要接收的包的大小,通过buff2long可以还原成数字
8 读完这个数字对应的字节数目,这个数字应当有TRACKERQUERYSTORAGEFETCHBODYLEN(TRACKERQUERYSTORAGESTOREBODYLEN - 1,也就是39)长,否则出错
9 这39个字节,头16字节是组名(下载逻辑时可以忽略),接着15字节是IP地址,接着8字节是端口号,还是根据buff2long算法还原成数字
10 和tracker的交互完成,下面是storage
11 根据ip和端口连接storage
12 发送一个pHeader+file_offset+download_bytes+group_name(补全16字节)+filename的数据包
也就是10+8+8+16+filename_size
1-8字节是8+8+16+filename_size的大小根据long2buff转换的字串
9字节是STORAGE_PROTO_CMD_DOWNLOAD_FILE也就是14
11-18字节是file_offset的long2buff字串
19-26是download_bytes的long2buff字串
27-42是group_name
再往后就是finename
13 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
14 接受的这个包,1-8字节是下面要接收的包的大小,通过buff2long可以还原成数字
15 将接收到的包写入文件,一次下载逻辑完毕
上传下载是最经典的逻辑,其他逻辑都可以从这里衍生,不做详细介绍了
FastDFS源码解析(4)--------storage运行流程分析
大致来分析一下fdfs storage是如何提供服务的,以上传文件为例。
从storage的初始化函数来入手
-
int storage_service_init()
-
{
-
int result;
-
int bytes;
-
struct storage_nio_thread_data *pThreadData;
-
struct storage_nio_thread_data *pDataEnd;
-
pthread_t tid;
-
pthread_attr_t thread_attr;
-
//storage任务线程锁
-
if ((result=init_pthread_lock(&g_storage_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//路径索引锁
-
if ((result=init_pthread_lock(&path_index_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//状态计数锁
-
if ((result=init_pthread_lock(&stat_count_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//初始化线程堆栈大小
-
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"init_pthread_attr fail, program exit!", __LINE__);
-
return result;
-
}
-
//建立任务task对象池,复用task类型
-
if ((result=free_queue_init(g_max_connections, g_buff_size, \
-
g_buff_size, sizeof(StorageClientInfo))) != 0)
-
{
-
return result;
-
}
-
bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;
-
g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes);
-
if (g_nio_thread_data == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc %d bytes fail, errno: %d, error info: %s", \
-
__LINE__, bytes, errno, STRERROR(errno));
-
return errno != 0 ? errno : ENOMEM;
-
}
-
memset(g_nio_thread_data, 0, bytes);
-
g_storage_thread_count = 0;
-
pDataEnd = g_nio_thread_data + g_work_threads;
-
for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData++)
-
{
-
if (ioevent_init(&pThreadData->thread_data.ev_puller,
-
g_max_connections + 2, 1000, 0) != 0)
-
{
-
result = errno != 0 ? errno : ENOMEM;
-
logError("file: "__FILE__", line: %d, " \
-
"ioevent_init fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
return result;
-
}
-
result = fast_timer_init(&pThreadData->thread_data.timer,
-
2 * g_fdfs_network_timeout, g_current_time);
-
if (result != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"fast_timer_init fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
return result;
-
}
-
if (pipe(pThreadData->thread_data.pipe_fds) != 0)
-
{
-
result = errno != 0 ? errno : EPERM;
-
logError("file: "__FILE__", line: %d, " \
-
"call pipe fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
break;
-
}
-
#if defined(OS_LINUX)
-
if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
-
O_NONBLOCK | O_NOATIME)) != 0)
-
{
-
break;
-
}
-
#else
-
if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
-
O_NONBLOCK)) != 0)
-
{
-
break;
-
}
-
#endif
-
//创建工作线程
-
if ((result=pthread_create(&tid, &thread_attr, \
-
work_thread_entrance, pThreadData)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"create thread failed, startup threads: %d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, g_storage_thread_count, \
-
result, STRERROR(result));
-
break;
-
}
-
else
-
{
-
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_storage_thread_count++;
-
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
}
-
}
-
pthread_attr_destroy(&thread_attr);
-
last_stat_change_count = g_stat_change_count;
-
//DO NOT support direct IO !!!
-
//g_extra_open_file_flags = g_disk_rw_direct ? O_DIRECT : 0;
-
if (result != 0)
-
{
-
return result;
-
}
-
return result;
-
}
跟进工作线程
-
static void *work_thread_entrance(void* arg)
-
{
-
int result;
-
struct storage_nio_thread_data *pThreadData;
-
pThreadData = (struct storage_nio_thread_data *)arg;
-
if (g_check_file_duplicate)
-
{
-
if ((result=fdht_copy_group_array(&(pThreadData->group_array),\
-
&g_group_array)) != 0)
-
{
-
pthread_mutex_lock(&g_storage_thread_lock);
-
g_storage_thread_count--;
-
pthread_mutex_unlock(&g_storage_thread_lock);
-
return NULL;
-
}
-
}
-
//启动主io主循环,为pThreadData->thread_data对应的pipe_fd注册回调函数
-
//storage_recv_notify_read
-
ioevent_loop(&pThreadData->thread_data, storage_recv_notify_read,
-
task_finish_clean_up, &g_continue_flag);
-
//循环退出,销毁响应数据结构
-
ioevent_destroy(&pThreadData->thread_data.ev_puller);
-
if (g_check_file_duplicate)
-
{
-
if (g_keep_alive)
-
{
-
fdht_disconnect_all_servers(&(pThreadData->group_array));
-
}
-
fdht_free_group_array(&(pThreadData->group_array));
-
}
-
//总线程数目自减
-
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_storage_thread_count--;
-
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
logDebug("file: "__FILE__", line: %d, " \
-
"nio thread exited, thread count: %d", \
-
__LINE__, g_storage_thread_count);
-
return NULL;
-
}
除了workthreadentrance线程,还有一个叫做acceptthreadentrance的线程,专门用来accept请求,防止大量的操作阻塞了accept的性能
-
static void *accept_thread_entrance(void* arg)
-
{
-
int server_sock;
-
int incomesock;
-
struct sockaddr_in inaddr;
-
socklen_t sockaddr_len;
-
in_addr_t client_addr;
-
char szClientIp[IP_ADDRESS_SIZE];
-
long task_addr;
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
struct storage_nio_thread_data *pThreadData;
-
server_sock = (long)arg;
-
while (g_continue_flag)
-
{
-
sockaddr_len = sizeof(inaddr);
-
incomesock = accept(server_sock, (struct sockaddr*)&inaddr, \
-
&sockaddr_len);
-
if (incomesock < 0) //error
-
{
-
if (!(errno == EINTR || errno == EAGAIN))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"accept failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
continue;
-
}
-
client_addr = getPeerIpaddr(incomesock, \
-
szClientIp, IP_ADDRESS_SIZE);
-
if (g_allow_ip_count >= 0)
-
{
-
if (bsearch(&client_addr, g_allow_ip_addrs, \
-
g_allow_ip_count, sizeof(in_addr_t), \
-
cmp_by_ip_addr_t) == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"ip addr %s is not allowed to access", \
-
__LINE__, szClientIp);
-
close(incomesock);
-
continue;
-
}
-
}
-
if (tcpsetnonblockopt(incomesock) != 0)
-
{
-
close(incomesock);
-
continue;
-
}
-
pTask = free_queue_pop();
-
if (pTask == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc task buff failed", \
-
__LINE__);
-
close(incomesock);
-
continue;
-
}
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
//从task对象池里拿出一个task,将fd域填充为incomesock
-
pTask->event.fd = incomesock;
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT;
-
pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;
-
pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
-
strcpy(pTask->client_ip, szClientIp);
-
task_addr = (long)pTask;
-
//通过pThreadData->thread_data.pipe_fds[1]将task传给work_thread
-
//work_thread监视着pThreadData->thread_data.pipe_fds[0]
-
//storage_recv_notify_read将被调用
-
if (write(pThreadData->thread_data.pipe_fds[1], &task_addr, \
-
sizeof(task_addr)) != sizeof(task_addr))
-
{
-
close(incomesock);
-
free_queue_push(pTask);
-
logError("file: "__FILE__", line: %d, " \
-
"call write failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
}
-
return NULL;
-
}
关注一下storagerecvnotify_read函数
-
void storage_recv_notify_read(int sock, short event, void *arg)
-
{
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
long task_addr;
-
int64_t remain_bytes;
-
int bytes;
-
int result;
-
while (1)
-
{
-
//读取这个task结构
-
if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
-
{
-
if (!(errno == EAGAIN || errno == EWOULDBLOCK))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call read failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
break;
-
}
-
else if (bytes == 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call read failed, end of file", __LINE__);
-
break;
-
}
-
pTask = (struct fast_task_info *)task_addr;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
if (pTask->event.fd < 0) //quit flag
-
{
-
return;
-
}
-
/* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
-
pClientInfo->nio_thread_index, pTask->event.fd);
-
*/
-
if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
-
{
-
pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
-
}
-
switch (pClientInfo->stage)
-
{
-
//初始化阶段,进行数据初始化
-
case FDFS_STORAGE_STAGE_NIO_INIT:
-
result = storage_nio_init(pTask);
-
break;
-
//暂时略过,先看storage_nio_init
-
case FDFS_STORAGE_STAGE_NIO_RECV:
-
pTask->offset = 0;
-
remain_bytes = pClientInfo->total_length - \
-
pClientInfo->total_offset;
-
if (remain_bytes > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = remain_bytes;
-
}
-
if (set_recv_event(pTask) == 0)
-
{
-
client_sock_read(pTask->event.fd,
-
IOEVENT_READ, pTask);
-
}
-
result = 0;
-
break;
-
case FDFS_STORAGE_STAGE_NIO_SEND:
-
result = storage_send_add_event(pTask);
-
break;
-
case FDFS_STORAGE_STAGE_NIO_CLOSE:
-
result = EIO; //close this socket
-
break;
-
default:
-
logError("file: "__FILE__", line: %d, " \
-
"invalid stage: %d", __LINE__, \
-
pClientInfo->stage);
-
result = EINVAL;
-
break;
-
}
-
if (result != 0)
-
{
-
add_to_deleted_list(pTask);
-
}
-
}
-
}
初始化实质上是将task对应的fd,注册clientsockread函数同时将task状态设置为FDFSSTORAGESTAGENIORECV
-
static int storage_nio_init(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
struct storage_nio_thread_data *pThreadData;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
-
return ioevent_set(pTask, &pThreadData->thread_data,
-
pTask->event.fd, IOEVENT_READ, client_sock_read,
-
g_fdfs_network_timeout);
-
}
看看这个clientsockread函数
-
static void client_sock_read(int sock, short event, void *arg)
-
{
-
int bytes;
-
int recv_bytes;
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
pTask = (struct fast_task_info *)arg;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
if (pClientInfo->canceled)
-
{
-
return;
-
}
-
if (pClientInfo->stage != FDFS_STORAGE_STAGE_NIO_RECV)
-
{
-
if (event & IOEVENT_TIMEOUT) {
-
pTask->event.timer.expires = g_current_time +
-
g_fdfs_network_timeout;
-
fast_timer_add(&pTask->thread_data->timer,
-
&pTask->event.timer);
-
}
-
return;
-
}
-
//超时了,删除这个task
-
if (event & IOEVENT_TIMEOUT)
-
{
-
if (pClientInfo->total_offset == 0 && pTask->req_count > 0)
-
{
-
pTask->event.timer.expires = g_current_time +
-
g_fdfs_network_timeout;
-
fast_timer_add(&pTask->thread_data->timer,
-
&pTask->event.timer);
-
}
-
else
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv timeout, " \
-
"recv offset: %d, expect length: %d", \
-
__LINE__, pTask->client_ip, \
-
pTask->offset, pTask->length);
-
task_finish_clean_up(pTask);
-
}
-
return;
-
}
-
//io错误,一样删
-
if (event & IOEVENT_ERROR)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv error event: %d, "
-
"close connection", __LINE__, pTask->client_ip, event);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
fast_timer_modify(&pTask->thread_data->timer,
-
&pTask->event.timer, g_current_time +
-
g_fdfs_network_timeout);
-
while (1)
-
{
-
//pClientInfo的total_length域为0,说明头还没接收,接收一个头
-
if (pClientInfo->total_length == 0) //recv header
-
{
-
recv_bytes = sizeof(TrackerHeader) - pTask->offset;
-
}
-
else
-
{
-
recv_bytes = pTask->length - pTask->offset;
-
}
-
/*
-
logInfo("total_length="INT64_PRINTF_FORMAT", recv_bytes=%d, "
-
"pTask->length=%d, pTask->offset=%d",
-
pClientInfo->total_length, recv_bytes,
-
pTask->length, pTask->offset);
-
*/
-
bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
-
if (bytes < 0)
-
{
-
if (errno == EAGAIN || errno == EWOULDBLOCK)
-
{
-
}
-
else
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, pTask->client_ip, \
-
errno, STRERROR(errno));
-
task_finish_clean_up(pTask);
-
}
-
return;
-
}
-
else if (bytes == 0)
-
{
-
logDebug("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv failed, " \
-
"connection disconnected.", \
-
__LINE__, pTask->client_ip);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
//用包头数据对pClientInfo进行初始化
-
if (pClientInfo->total_length == 0) //header
-
{
-
if (pTask->offset + bytes < sizeof(TrackerHeader))
-
{
-
pTask->offset += bytes;
-
return;
-
}
-
pClientInfo->total_length=buff2long(((TrackerHeader *) \
-
pTask->data)->pkg_len);
-
if (pClientInfo->total_length < 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, pkg length: " \
-
INT64_PRINTF_FORMAT" < 0", \
-
__LINE__, pTask->client_ip, \
-
pClientInfo->total_length);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
pClientInfo->total_length += sizeof(TrackerHeader);
-
//如果需要接受的数据总长大于pTask的固定长度阀值,那么暂时只接受那么长
-
if (pClientInfo->total_length > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = pClientInfo->total_length;
-
}
-
}
-
pTask->offset += bytes;
-
//接受完了当前的包
-
if (pTask->offset >= pTask->length) //recv current pkg done
-
{
-
//略过先看下面
-
if (pClientInfo->total_offset + pTask->length >= \
-
pClientInfo->total_length)
-
{
-
/* current req recv done */
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
-
pTask->req_count++;
-
}
-
//刚接受了包头,那么由storage_deal_task分发任务
-
if (pClientInfo->total_offset == 0)
-
{
-
pClientInfo->total_offset = pTask->length;
-
storage_deal_task(pTask);
-
}
-
else
-
{
-
//略过先看下面
-
pClientInfo->total_offset += pTask->length;
-
/* continue write to file */
-
storage_dio_queue_push(pTask);
-
}
-
return;
-
}
-
}
-
return;
-
}
storagedealtask将上传请求分发给storageuploadfile
storageuploadfile注册一些基本的函数而后调用 storagewriteto_file
-
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
-
{
-
//略过
-
...
-
return storage_write_to_file(pTask, file_offset, file_bytes, \
-
p - pTask->data, dio_write_file, \
-
storage_upload_file_done_callback, \
-
clean_func, store_path_index);
-
}
-
static int storage_write_to_file(struct fast_task_info *pTask, \
-
const int64_t file_offset, const int64_t upload_bytes, \
-
const int buff_offset, TaskDealFunc deal_func, \
-
FileDealDoneCallback done_callback, \
-
DisconnectCleanFunc clean_func, const int store_path_index)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
pClientInfo->deal_func = deal_func;
-
pClientInfo->clean_func = clean_func;
-
pFileContext->fd = -1;
-
pFileContext->buff_offset = buff_offset;
-
pFileContext->offset = file_offset;
-
pFileContext->start = file_offset;
-
pFileContext->end = file_offset + upload_bytes;
-
pFileContext->dio_thread_index = storage_dio_get_thread_index( \
-
pTask, store_path_index, pFileContext->op);
-
pFileContext->done_callback = done_callback;
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_XINIT;
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
INIT_HASH_CODES4(pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_init(&pFileContext->md5_context);
-
}
-
}
-
//将任务压入磁盘队列
-
if ((result=storage_dio_queue_push(pTask)) != 0)
-
{
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
return STORAGE_STATUE_DEAL_FILE;
-
}
压入磁盘队列的处理函数
-
int storage_dio_queue_push(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
struct storage_dio_context *pContext;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
pContext = g_dio_contexts + pFileContext->dio_thread_index;
-
//这里为什么要或上这个呢,因为在LT模式的工作下,client_sock_read会被不断的触发
-
//pTask的数据就会被刷掉了,所以改变当前FDFS_STORAGE_STAGE_NIO_RECV的状态,让client_sock_read调用就被返回
-
pClientInfo->stage |= FDFS_STORAGE_STAGE_DIO_THREAD;
-
if ((result=task_queue_push(&(pContext->queue), pTask)) != 0)
-
{
-
add_to_deleted_list(pTask);
-
return result;
-
}
-
if ((result=pthread_cond_signal(&(pContext->cond))) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"pthread_cond_signal fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
add_to_deleted_list(pTask);
-
return result;
-
}
-
return 0;
-
}
下面就是磁盘线程取task了
-
static void *dio_thread_entrance(void* arg)
-
{
-
int result;
-
struct storage_dio_context *pContext;
-
struct fast_task_info *pTask;
-
pContext = (struct storage_dio_context *)arg;
-
pthread_mutex_lock(&(pContext->lock));
-
while (g_continue_flag)
-
{
-
if ((result=pthread_cond_wait(&(pContext->cond), \
-
&(pContext->lock))) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_cond_wait fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
//循环取队列里的任务,执行他的deal_func
-
while ((pTask=task_queue_pop(&(pContext->queue))) != NULL)
-
{
-
((StorageClientInfo *)pTask->arg)->deal_func(pTask);
-
}
-
}
-
pthread_mutex_unlock(&(pContext->lock));
-
if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_dio_thread_count--;
-
if ((result=pthread_mutex_unlock(&g_dio_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
logDebug("file: "__FILE__", line: %d, " \
-
"dio thread exited, thread count: %d", \
-
__LINE__, g_dio_thread_count);
-
return NULL;
-
}
对于上传任务来说,dealtask实际上是dowrite_file
-
int dio_write_file(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
int result;
-
int write_bytes;
-
char *pDataBuff;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
result = 0;
-
do
-
{
-
if (pFileContext->fd < 0)
-
{
-
if (pFileContext->extra_info.upload.before_open_callback!=NULL)
-
{
-
result = pFileContext->extra_info.upload. \
-
before_open_callback(pTask);
-
if (result != 0)
-
{
-
break;
-
}
-
}
-
if ((result=dio_open_file(pFileContext)) != 0)
-
{
-
break;
-
}
-
}
-
pDataBuff = pTask->data + pFileContext->buff_offset;
-
write_bytes = pTask->length - pFileContext->buff_offset;
-
if (write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
-
{
-
result = errno != 0 ? errno : EIO;
-
logError("file: "__FILE__", line: %d, " \
-
"write to file: %s fail, fd=%d, write_bytes=%d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, pFileContext->filename, \
-
pFileContext->fd, write_bytes, \
-
result, STRERROR(result));
-
}
-
pthread_mutex_lock(&g_dio_thread_lock);
-
g_storage_stat.total_file_write_count++;
-
if (result == 0)
-
{
-
g_storage_stat.success_file_write_count++;
-
}
-
pthread_mutex_unlock(&g_dio_thread_lock);
-
if (result != 0)
-
{
-
break;
-
}
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
-
pFileContext->crc32);
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
CALC_HASH_CODES4(pDataBuff, write_bytes, \
-
pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_update(&pFileContext->md5_context, \
-
(unsigned char *)pDataBuff, write_bytes);
-
}
-
}
-
/*
-
logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
-
write_bytes, pTask->length, pFileContext->buff_offset);
-
*/
-
pFileContext->offset += write_bytes;
-
if (pFileContext->offset < pFileContext->end)
-
{
-
pFileContext->buff_offset = 0;
-
storage_nio_notify(pTask); //notify nio to deal
-
}
-
else
-
{
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_FINAL( \
-
pFileContext->crc32);
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
FINISH_HASH_CODES4(pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_final((unsigned char *)(pFileContext-> \
-
file_hash_codes), &pFileContext->md5_context);
-
}
-
}
-
if (pFileContext->extra_info.upload.before_close_callback != NULL)
-
{
-
result = pFileContext->extra_info.upload. \
-
before_close_callback(pTask);
-
}
-
/* file write done, close it */
-
close(pFileContext->fd);
-
pFileContext->fd = -1;
-
if (pFileContext->done_callback != NULL)
-
{
-
pFileContext->done_callback(pTask, result);
-
}
-
}
-
return 0;
-
} while (0);
-
pClientInfo->clean_func(pTask);
-
if (pFileContext->done_callback != NULL)
-
{
-
pFileContext->done_callback(pTask, result);
-
}
-
return result;
-
}
pFileContext->donecallback对应了storageuploadfiledone_callback
-
static void storage_upload_file_done_callback(struct fast_task_info *pTask, \
-
const int err_no)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
TrackerHeader *pHeader;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
-
{
-
result = trunk_client_trunk_alloc_confirm( \
-
&(pFileContext->extra_info.upload.trunk_info), err_no);
-
if (err_no != 0)
-
{
-
result = err_no;
-
}
-
}
-
else
-
{
-
result = err_no;
-
}
-
if (result == 0)
-
{
-
result = storage_service_upload_file_done(pTask);
-
if (result == 0)
-
{
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
result = storage_binlog_write(\
-
pFileContext->timestamp2log, \
-
STORAGE_OP_TYPE_SOURCE_CREATE_FILE, \
-
pFileContext->fname2log);
-
}
-
}
-
}
-
if (result == 0)
-
{
-
int filename_len;
-
char *p;
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
CHECK_AND_WRITE_TO_STAT_FILE3_WITH_BYTES( \
-
g_storage_stat.total_upload_count, \
-
g_storage_stat.success_upload_count, \
-
g_storage_stat.last_source_update, \
-
g_storage_stat.total_upload_bytes, \
-
g_storage_stat.success_upload_bytes, \
-
pFileContext->end - pFileContext->start)
-
}
-
filename_len = strlen(pFileContext->fname2log);
-
pClientInfo->total_length = sizeof(TrackerHeader) + \
-
FDFS_GROUP_NAME_MAX_LEN + filename_len;
-
p = pTask->data + sizeof(TrackerHeader);
-
memcpy(p, pFileContext->extra_info.upload.group_name, \
-
FDFS_GROUP_NAME_MAX_LEN);
-
p += FDFS_GROUP_NAME_MAX_LEN;
-
memcpy(p, pFileContext->fname2log, filename_len);
-
}
-
else
-
{
-
pthread_mutex_lock(&stat_count_thread_lock);
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
g_storage_stat.total_upload_count++;
-
g_storage_stat.total_upload_bytes += \
-
pClientInfo->total_offset;
-
}
-
pthread_mutex_unlock(&stat_count_thread_lock);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
}
-
STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_UPLOAD_FILE, result);
-
pClientInfo->total_offset = 0;
-
pTask->length = pClientInfo->total_length;
-
pHeader = (TrackerHeader *)pTask->data;
-
pHeader->status = result;
-
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
-
long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
-
pHeader->pkg_len);
-
//又看到熟悉的函数了,这完成以后将pTask从磁盘线程压入work线程
-
//work线程调用storage_recv_notify_read函数来做下一步处理
-
storage_nio_notify(pTask);
-
}
-
void storage_recv_notify_read(int sock, short event, void *arg)
-
{
-
//前文已有,略过
-
...
-
//刚从磁盘线程里出来的任务状态依然是dio_thread,去掉dio_thread状态
-
if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
-
{
-
pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
-
}
-
switch (pClientInfo->stage)
-
{
-
//前文已有,略过
-
...
-
case FDFS_STORAGE_STAGE_NIO_RECV:
-
pTask->offset = 0;
-
remain_bytes = pClientInfo->total_length - \
-
pClientInfo->total_offset;
-
if (remain_bytes > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = remain_bytes;
-
}
-
if (set_recv_event(pTask) == 0)
-
{
-
client_sock_read(pTask->event.fd,
-
IOEVENT_READ, pTask);
-
}
-
result = 0;
-
break;
-
case FDFS_STORAGE_STAGE_NIO_SEND:
-
result = storage_send_add_event(pTask);
-
break;
-
case FDFS_STORAGE_STAGE_NIO_CLOSE:
-
result = EIO; //close this socket
-
break;
-
default:
-
logError("file: "__FILE__", line: %d, " \
-
"invalid stage: %d", __LINE__, \
-
pClientInfo->stage);
-
result = EINVAL;
-
break;
-
}
-
if (result != 0)
-
{
-
add_to_deleted_list(pTask);
-
}
-
}
调用了clientsockread函数进行处理
-
static void client_sock_read(int sock, short event, void *arg)
-
{
-
//前文已有,略
-
...
-
pTask->offset += bytes;
-
if (pTask->offset >= pTask->length) //recv current pkg done
-
{
-
//这个req接受完毕,准备反馈rsp
-
if (pClientInfo->total_offset + pTask->length >= \
-
pClientInfo->total_length)
-
{
-
/* current req recv done */
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
-
pTask->req_count++;
-
}
-
if (pClientInfo->total_offset == 0)
-
{
-
pClientInfo->total_offset = pTask->length;
-
storage_deal_task(pTask);
-
}
-
else
-
{
-
//接受的是数据包,压入磁盘线程
-
pClientInfo->total_offset += pTask->length;
-
/* continue write to file */
-
storage_dio_queue_push(pTask);
-
}
-
return;
-
}
-
return;
-
}
数据包的网络接收和磁盘的处理成为一个环,接收完一部分,通过队列压入磁盘队列,磁盘线程处理完以后又通过像工作线程的fd进行写,触发网络线程读取这个task。自此源源不断将数据传过来。
总结:
还是上图吧,整个处理流程如下图
1 client发出请求,accept线程catch到描述符,初始化pTask结构,填入描述符,然后将pTask通过管道给work_entrance
2 进入storagerecvnotify_read函数
3 根据当前的pTask->stage等于FDFSSTORAGESTAGEINIT为fd创建读事件,绑定函数clientsock_read
4 调用storageuploadfile
5 storageuploadfile调用storagewriteto_file
6 storagewritetofile调用压磁盘队列函数storagedioqueuepush
7 storagedioqueuepush将pTask->stage |= FDFSSTORAGESTAGEDIO_THREAD
8 根
开始:
源码在sourceforge,github上都能找到。这里我使用的FastDFS v5.01版本,值得注意的是,这个版本干掉了该死了libevent,直接使用epoll,kqueue,可读性提高了不少,而且0依赖了,赞一个。
源码目录包括了common,test,client,stroage,tracker
按文件夹顺序和首字母进行分析:
common文件夹:
common_define.h:
跳过首字母a的文件先介绍这个,是因为这个文件定义了整个系统的一些环境变量,包括bool类型,全局变量等等。下文中你没见过,我也没提的变量或者宏都取自这里。
avltree.c/avltree.h:
对于avl树的定义和实现,这是FastDFS实现trunk功能和单盘恢复功能所依赖的数据结构
-
typedef struct tagAVLTreeNode {
-
void *data;
-
struct tagAVLTreeNode *left;
-
struct tagAVLTreeNode *right;
-
byte balance;
-
} AVLTreeNode;
-
typedef struct tagAVLTreeInfo {
-
AVLTreeNode *root;
-
FreeDataFunc free_data_func;
-
CompareFunc compare_func;
-
} AVLTreeInfo;
经典的数据结构,没有修改的原汁原味。
base64.c/base64.h:
FastDFS得到文件包含的信息后,用base64算法对其编码生成文件ID。
chain.c/chain.hi:
对于链表的实现。
-
typedef struct tagChainNode
-
{
-
void *data;
-
struct tagChainNode *next;
-
} ChainNode;
-
typedef struct
-
{
-
int type;
-
ChainNode *head;
-
ChainNode *tail;
-
FreeDataFunc freeDataFunc;
-
CompareFunc compareFunc;
-
} ChainList;
type变量是定义链表的使用方式的:
CHAINTYPEINSERT: insert new node before head
CHAINTYPEAPPEND: insert new node after tail
CHAINTYPESORTED: sorted chain
在fast_mblock中#include了它,但是并没有使用,直接注释了这个include也成功编译无报错,可能后续会使用吧?这里会和鱼大咨询下。mark。
connectpool.c/connectpool.h:
连接池的定义与实现
-
typedef struct
-
{
-
int sock;
-
int port;
-
char ip_addr[IP_ADDRESS_SIZE];
-
} ConnectionInfo;
-
struct tagConnectionManager;
-
typedef struct tagConnectionNode {
-
ConnectionInfo *conn;
-
struct tagConnectionManager *manager;
-
struct tagConnectionNode *next;
-
time_t atime; //last access time
-
} ConnectionNode;
-
typedef struct tagConnectionManager {
-
ConnectionNode *head;
-
int total_count; //total connections
-
int free_count; //free connections
-
pthread_mutex_t lock;
-
} ConnectionManager;
-
typedef struct tagConnectionPool {
-
HashArray hash_array; //key is ip:port, value is ConnectionManager
-
pthread_mutex_t lock;
-
int connect_timeout;
-
int max_count_per_entry; //0 means no limit
-
/*
-
connections whose the idle time exceeds this time will be closed
-
*/
-
int max_idle_time;
-
} ConnectionPool;
呃,注释已经一目了然了。
三层结构
pool->manager->node
pool使用哈希来定位manager,因为作为key的ip:port是唯一的,而后用链表来管理该节点的所有连接。
fastmblock.c/fastmblock.h:
链表的一个变种,存储有已分配的对象和已经释放的对象,大致相当于一个对象池,在trunk功能中被使用。
-
/* free node chain */
-
struct fast_mblock_node
-
{
-
struct fast_mblock_node *next;
-
char data[0]; //the data buffer
-
};
-
/* malloc chain */
-
struct fast_mblock_malloc
-
{
-
struct fast_mblock_malloc *next;
-
};
-
struct fast_mblock_man
-
{
-
struct fast_mblock_node *free_chain_head; //free node chain
-
struct fast_mblock_malloc *malloc_chain_head; //malloc chain to be freed
-
int element_size; //element size
-
int alloc_elements_once; //alloc elements once
-
pthread_mutex_t lock; //the lock for read / write free node chain
-
};
fasttaskqueue.c/fasttaskqueue.h:
任务队列,挺重要的一个数据结构
-
typedef struct ioevent_entry
-
{
-
int fd;
-
FastTimerEntry timer;
-
IOEventCallback callback;
-
} IOEventEntry;
-
struct nio_thread_data
-
{
-
struct ioevent_puller ev_puller;
-
struct fast_timer timer;
-
int pipe_fds[2];
-
struct fast_task_info *deleted_list; //链向已被删除的任务指针,复用了已经分配的内存
-
};
-
struct fast_task_info
-
{
-
IOEventEntry event;
-
char client_ip[IP_ADDRESS_SIZE];
-
void *arg; //extra argument pointer
-
char *data; //buffer for write or recv
-
int size; //alloc size
-
int length; //data length
-
int offset; //current offset
-
int req_count; //request count
-
TaskFinishCallBack finish_callback; //任务结束回调
-
struct nio_thread_data *thread_data;
-
struct fast_task_info *next;
-
};
-
struct fast_task_queue
-
{
-
struct fast_task_info *head; //头尾指针都存在,分别用来做队列的出队和入队
-
struct fast_task_info *tail;
-
pthread_mutex_t lock;
-
int max_connections;
-
int min_buff_size;
-
int max_buff_size;
-
int arg_size;
-
bool malloc_whole_block;
-
};
fasttimer.c/fasttimer.h:
时间哈希表,以unix时间戳作为key,用双向链表解决冲突,可以根据当前的使用量进行rehash等操作。
在刚才的fasttaskqueue中被使用
-
typedef struct fast_timer_entry {
-
int64_t expires;
-
void *data;
-
struct fast_timer_entry *prev;
-
struct fast_timer_entry *next;
-
bool rehash;
-
} FastTimerEntry;
-
typedef struct fast_timer_slot {
-
struct fast_timer_entry head;
-
} FastTimerSlot;
-
typedef struct fast_timer {
-
int slot_count; //time wheel slot count
-
int64_t base_time; //base time for slot 0
-
int64_t current_time;
-
FastTimerSlot *slots;
-
} FastTimer;
fdfsglobal.c/fdfsglobal.h:
定义了fdfs系统所使用的全局变量,包括超时,版本号等等
-
int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
-
int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT;
-
char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
-
Version g_fdfs_version = {5, 1};
-
bool g_use_connection_pool = false;
-
ConnectionPool g_connection_pool;
-
int g_connection_pool_max_idle_time = 3600;
fdfshttpshared.c/fdfshttpshare.h:
FastDFS使用token来防盗链和分享图片,这一段我也不确定。回头再来看。
hash.c/hash.h:
经典的哈希结构,在FastDFS中应用的很广
哈希找到域,而后用链表解决冲突
-
typedef struct tagHashData
-
{
-
int key_len;
-
int value_len;
-
int malloc_value_size;
-
#ifdef HASH_STORE_HASH_CODE
-
unsigned int hash_code;
-
#endif
-
char *value;
-
struct tagHashData *next; //解决冲突
-
char key[0];
-
} HashData;
-
typedef struct tagHashArray
-
{
-
HashData **buckets;
-
HashFunc hash_func;
-
int item_count;
-
unsigned int *capacity;
-
double load_factor; //hash的负载因子,在FastDFS中大于1.0进行rehash
-
int64_t max_bytes; //最大占用字节,用于计算负载因子
-
int64_t bytes_used; //已经使用字节,用于计算负载因子
-
bool is_malloc_capacity;
-
bool is_malloc_value;
-
unsigned int lock_count; //锁总数,为了线程安全
-
pthread_mutex_t *locks;
-
} HashArray;
-
typedef struct tagHashStat //所有hash的统计情况
-
{
-
unsigned int capacity;
-
int item_count;
-
int bucket_used;
-
double bucket_avg_length;
-
int bucket_max_length;
-
} HashStat;
httpfunc.c/httpfunc.h:
http功能已经被砍掉了,这个也回头来看。
inifilereader.c/inifilereader.h:
FastDFS用于初始化加载配置文件的函数。
ioevent.c/ioevent.h && ioeventloop.c/ioeventloop.h:
对epoll,kqueue进行简单封装,成为一个有时间和网络的事件库。这部分逻辑应该会开独立的一章来分析
linuxstacktrace.c/linuxstacktrace.h:
-
/**
-
* This source file is used to print out a stack-trace when your program
-
* segfaults. It is relatively reliable and spot-on accurate.
-
*/
这个模块是在程序段错误后输出栈跟踪信息,呃似乎不是鱼大写的
localipfunc.c/localipfunc.h:
基于系统调用getifaddrs来获取本地IP
logger.c/logger.h:
这个太明显了,log模块
md5.c/md5.h:
fdfshttpshared.c中被调用,在fdfshttpgentoken的方法中对secretkey,file_id,timestamp进行md5得到token
mimefileparser.c/mimefileparser.h:
从配置文件中加载mime识别的配置,至于什么是mime。。我也不知道,我问问大神们看看。
osbits.h:
定义了OS的位数
processctrl.c/processctrl.h:
从配置文件中载入pid路径,定义了pid文件的增删查改,并且提供了进程停止,重启等方法
pthreadfunc.c/pthreadfunc.h:
线程相关的操作,包括初始化,创建,杀死线程
schedthread.c/schedthread.h:
定时任务线程的模块,按照hour:minute的期限执行任务
-
typedef struct tagScheduleEntry
-
{
-
int id; //the task id
-
/* the time base to execute task, such as 00:00, interval is 3600,
-
means execute the task every hour as 1:00, 2:00, 3:00 etc. */
-
TimeInfo time_base;
-
int interval; //the interval for execute task, unit is second
-
TaskFunc task_func; //callback function
-
void *func_args; //arguments pass to callback function
-
/* following are internal fields, do not set manually! */
-
time_t next_call_time;
-
struct tagScheduleEntry *next;
-
} ScheduleEntry;
-
typedef struct
-
{
-
ScheduleEntry *entries;
-
int count;
-
} ScheduleArray;
-
typedef struct
-
{
-
ScheduleArray scheduleArray;
-
ScheduleEntry *head; //schedule chain head
-
ScheduleEntry *tail; //schedule chain tail
-
bool *pcontinue_flag;
-
} ScheduleContext;
稍微看了下实现的算法,这是一个变种的链表,实现了一个变种的队列。
但是所有的数据都存在scheduleArray这个数组里面,每次新任务插入后,会对数组按时间进行一次排序
这样可以保证头指针的是最先需要执行的。
而后每次对head进行出队,初始化next域以后重新从tail入队。
总体来看是非常的简单高效的。
sharedfunc.c/sharedfunc.h:
一些工具函数,比如设置随机种子什么的,没必要单独开个文件,所以放在一起了。
sockopt.c/sockopt.h:
socket的一些工具函数,进行了简单的封装。
tracker文件夹:
先分析tracker是因为tracker只集成了网络部分,而storage还有处理磁盘吞吐的,相对复杂一些
fdfssharefunc.c/fdfssharefunc.h
tracker和storage共用的一些工具函数,比如根据IP和端口获取tracker的ID诸如此类的
fdfs_trackerd.c:
tracker的入口函数
trackerdump.c/trackerdump.h:
实现了fdfsdumptrackerglobalvarstofile这个函数
当tracker收到了SIGUSR1或者SIGUSR2信号,将启动sigDumpHandler来调用这个函数,将tracker当前的状态dump进FastDFS跟目录的logs/tracker_dump.log中
关于如何根据该dump文件恢复的,目前没看到,后面再补充
trackerfunc.c/trackerfunc.h:
实现了trackerloadfromconffile这个函数
将tracker的一些基本必要信息,从conf_file中导出
trackerglobal.c/trackerglobal.h:
记录了tracker使用的一些全局变量
trackerhttpcheck.c/trackerhttpcheck.h:
这个模块会对tracker所管理的所有group的可用storage做检测,测试所有的http端口是否可用
trackermem.c/trackermem.h:
这个模块维护了内存的所有数据,包括集群运行情况等等,提供了save,change和load的接口对集群的总情况进行修改
trackernio.c/trackernio.h:
nio的模块在common/ioevent和common/ioevent_loop的基础上进行调用
trackerproto.c/trackerproto.h:
定义了tracker通信的协议,有时间可以分析下。
trackerrelationship.c/trackerrelationship.h:
定义了tracker之间通信的方式,并且定义了选出leader,ping leader等功能,有时间可以分析下。
trackerservice.c/trackerservice.h:
tracker的逻辑层处理,各个请求在nio后进入work线程,而后分发到各个模块
trackerstatus.c/trackerstatus.h:
tracker状态的save和load模块
tracker_types.h:
定义了tracker所用到的所有类型
storage文件夹:
fdfs_storage.c:storage的入口函数
storagedio.c/storagedio.h:
使用common/fasttaskqueue实现了异步的磁盘IO,新任务由storagedioqueue_push方法入队
同时包含了trunk模块的处理,trunk模块后面再提
storagediskrecovery.c/storagediskrecovery.h:
storage的单盘恢复算法,用于故障恢复
storagedump.c/storagedump.h:
和tracker_dump原理相同
storagefunc.c/storagefunc.h:
storagefuncinit函数对应着tracker的trackerloadfromconffile函数
除此之外,还提供了根据storage_id或者ip判断是否是本机的函数
还提供了一些数据持久化的接口
storageglobal.c/storageglobal.h:
定义了storage使用的全局变量
storageipchangeddealer.c/storageipchangerdealer.h:
storage实现ip地址改变的模块
-
int storage_get_my_tracker_client_ip(); //获取storage作为tracker客户端的ip
-
int storage_changelog_req(); //接入tracker的changelog
-
int storage_check_ip_changed(); //检查ip是否改变
storagenio.c/storagenio.h:
nio的模块在common/ioevent和common/ioevent_loop的基础上进行调用
storageparamgetter.c/storageparamgetter.h:
storagegetparamsfromtracker函数,顾名思义,从tracker获取自身的参数
storageservice.c/storageservice.h:
storage的逻辑层处理,各个请求在nio后进入work线程,而后分发到各个模块
storagesync.c/storagesync.h:
storage的同步模块,众所周知,FastDFS的同步模块是根据时间戳进行的弱一致性同步
trackerclientthread.c/trackerclientthread.h
tracker_report的前缀提示的很明显,这部分是storage作为tracker的客户端,向tracker发送心跳,汇报自己的状态等等
全部接口如下:
-
int tracker_report_init();
-
int tracker_report_destroy();
-
int tracker_report_thread_start();
-
int kill_tracker_report_threads();
-
int tracker_report_join(ConnectionInfo *pTrackerServer, \
-
const int tracker_index, const bool sync_old_done);
-
int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServer);
-
int tracker_sync_src_req(ConnectionInfo *pTrackerServer, \
-
StorageBinLogReader *pReader);
-
int tracker_sync_diff_servers(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServers, const int server_count);
-
int tracker_deal_changelog_response(ConnectionInfo *pTrackerServer);
trunk_mgr:
这是storage文件的子目录,实现了trunk功能
trunk功能比较零碎,我目前还没搞明白,比如为什么storage和trunk模块交互,storage是作为client出现的,而不是直接调用trunk。
这部分内容应该要单独开一章来分析。
FastDFS源码解析(2)--------trunk模块分析
trunk功能是把大量小文件合并存储,大量的小文件会大量消耗linux文件系统的node,使树变的过于庞大,降低了读写效率
因此小文件合并存储能显著缓解这一压力
我将对上传和下载流程分析来追踪trunk模块的行为。
在storageservice模块中,storageservice.c/storagedealtask对请求安装cmd进行分离逻辑来处理
在storageuploadfile中处理上传逻辑
-
/**
-
1 byte: store path index
-
8 bytes: file size
-
FDFS_FILE_EXT_NAME_MAX_LEN bytes: file ext name, do not include dot (.)
-
file size bytes: file content
-
**/
-
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
DisconnectCleanFunc clean_func;
-
char *p;
-
char filename[128];
-
char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN + 1];
-
int64_t nInPackLen;
-
int64_t file_offset;
-
int64_t file_bytes;
-
int crc32;
-
int store_path_index;
-
int result;
-
int filename_len;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
-
//对包头大小进行验证
-
if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE +
-
FDFS_FILE_EXT_NAME_MAX_LEN)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"cmd=%d, client ip: %s, package size " \
-
INT64_PRINTF_FORMAT" is not correct, " \
-
"expect length >= %d", __LINE__, \
-
STORAGE_PROTO_CMD_UPLOAD_FILE, \
-
pTask->client_ip, nInPackLen, \
-
1 + FDFS_PROTO_PKG_LEN_SIZE + \
-
FDFS_FILE_EXT_NAME_MAX_LEN);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//跳过包头第一段,获得文件路径索引号
-
p = pTask->data + sizeof(TrackerHeader);
-
store_path_index = *p++;
-
if (store_path_index == -1)
-
{
-
if ((result=storage_get_storage_path_index( \
-
&store_path_index)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"get_storage_path_index fail, " \
-
"errno: %d, error info: %s", __LINE__, \
-
result, STRERROR(result));
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
}
-
else if (store_path_index < 0 || store_path_index >= \
-
g_fdfs_store_paths.count)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, store_path_index: %d " \
-
"is invalid", __LINE__, \
-
pTask->client_ip, store_path_index);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//获取文件大小
-
file_bytes = buff2long(p);
-
p += FDFS_PROTO_PKG_LEN_SIZE;
-
if (file_bytes < 0 || file_bytes != nInPackLen - \
-
(1 + FDFS_PROTO_PKG_LEN_SIZE + \
-
FDFS_FILE_EXT_NAME_MAX_LEN))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, pkg length is not correct, " \
-
"invalid file bytes: "INT64_PRINTF_FORMAT \
-
", total body length: "INT64_PRINTF_FORMAT, \
-
__LINE__, pTask->client_ip, file_bytes, nInPackLen);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return EINVAL;
-
}
-
//获取文件名
-
memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
-
*(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
-
p += FDFS_FILE_EXT_NAME_MAX_LEN;
-
if ((result=fdfs_validate_filename(file_ext_name)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, file_ext_name: %s " \
-
"is invalid!", __LINE__, \
-
pTask->client_ip, file_ext_name);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
pFileContext->calc_crc32 = true;
-
pFileContext->calc_file_hash = g_check_file_duplicate;
-
pFileContext->extra_info.upload.start_time = g_current_time;
-
strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
-
storage_format_ext_name(file_ext_name, \
-
pFileContext->extra_info.upload.formatted_ext_name);
-
pFileContext->extra_info.upload.trunk_info.path. \
-
store_path_index = store_path_index;
-
pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
-
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
-
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
-
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
-
//如果是追加写文件,注目额外的文件追加命令值
-
if (bAppenderFile)
-
{
-
pFileContext->extra_info.upload.file_type |= \
-
_FILE_TYPE_APPENDER;
-
}
-
else
-
{
-
//判断是否开了trunk_file功能,根据大小检查是否需要trunk合并存储
-
if (g_if_use_trunk_file && trunk_check_size( \
-
TRUNK_CALC_SIZE(file_bytes)))
-
{
-
pFileContext->extra_info.upload.file_type |= \
-
_FILE_TYPE_TRUNK;
-
}
-
}
-
//根据上一步的检查需要开启trunk合并存储
-
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
-
{
-
FDFSTrunkFullInfo *pTrunkInfo;
-
pFileContext->extra_info.upload.if_sub_path_alloced = true;
-
pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
-
//为trunk文件名分配空间,并添加到缓存
-
if ((result=trunk_client_trunk_alloc_space( \
-
TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
-
{
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
clean_func = dio_trunk_write_finish_clean_up;
-
file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
-
pFileContext->extra_info.upload.if_gen_filename = true;
-
trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
-
sizeof(pFileContext->filename));
-
//注册trunk操作的回调
-
pFileContext->extra_info.upload.before_open_callback = \
-
dio_check_trunk_file_when_upload;
-
pFileContext->extra_info.upload.before_close_callback = \
-
dio_write_chunk_header;
-
pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
-
}
-
else
-
{
-
//普通文件的方式,略过
-
...
-
}
-
return storage_write_to_file(pTask, file_offset, file_bytes, \
-
p - pTask->data, dio_write_file, \
-
storage_upload_file_done_callback, \
-
clean_func, store_path_index);
-
}
追踪一下trunkclienttrunkallocspace的实现
-
int trunk_client_trunk_alloc_space(const int file_size, \
-
FDFSTrunkFullInfo *pTrunkInfo)
-
{
-
int result;
-
ConnectionInfo trunk_server;
-
ConnectionInfo *pTrunkServer;
-
//如果自己就是trunker,直接操作
-
if (g_if_trunker_self)
-
{
-
return trunk_alloc_space(file_size, pTrunkInfo);
-
}
-
//否则根据trunk_server的ip和port进行连接
-
if (*(g_trunk_server.ip_addr) == '\0')
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"no trunk server", __LINE__);
-
return EAGAIN;
-
}
-
memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo));
-
if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"can't alloc trunk space because connect to trunk " \
-
"server %s:%d fail, errno: %d", __LINE__, \
-
trunk_server.ip_addr, trunk_server.port, result);
-
return result;
-
}
-
//使用client api进行操作
-
result = trunk_client_trunk_do_alloc_space(pTrunkServer, \
-
file_size, pTrunkInfo);
-
tracker_disconnect_server_ex(pTrunkServer, result != 0);
-
return result;
-
}
对直接调用和client_api操作分别追踪
-
nt trunk_alloc_space(const int size, FDFSTrunkFullInfo *pResult)
-
{
-
FDFSTrunkSlot target_slot;
-
FDFSTrunkSlot *pSlot;
-
FDFSTrunkNode *pPreviousNode;
-
FDFSTrunkNode *pTrunkNode;
-
int result;
-
STORAGE_TRUNK_CHECK_STATUS();
-
target_slot.size = (size > g_slot_min_size) ? size : g_slot_min_size;
-
target_slot.head = NULL;
-
pPreviousNode = NULL;
-
pTrunkNode = NULL;
-
//分配trunk需要锁
-
pthread_mutex_lock(&trunk_mem_lock);
-
//寻找可以插入该文件的地方
-
while (1)
-
{
-
pSlot = (FDFSTrunkSlot *)avl_tree_find_ge(tree_info_by_sizes \
-
+ pResult->path.store_path_index, &target_slot);
-
if (pSlot == NULL)
-
{
-
break;
-
}
-
pPreviousNode = NULL;
-
pTrunkNode = pSlot->head;
-
while (pTrunkNode != NULL && \
-
pTrunkNode->trunk.status == FDFS_TRUNK_STATUS_HOLD)
-
{
-
pPreviousNode = pTrunkNode;
-
pTrunkNode = pTrunkNode->next;
-
}
-
if (pTrunkNode != NULL)
-
{
-
break;
-
}
-
target_slot.size = pSlot->size + 1;
-
}
-
//找到了,于是插入
-
if (pTrunkNode != NULL)
-
{
-
if (pPreviousNode == NULL)
-
{
-
pSlot->head = pTrunkNode->next;
-
if (pSlot->head == NULL)
-
{
-
trunk_delete_size_tree_entry(pResult->path. \
-
store_path_index, pSlot);
-
}
-
}
-
else
-
{
-
pPreviousNode->next = pTrunkNode->next;
-
}
-
trunk_free_block_delete(&(pTrunkNode->trunk));
-
}
-
else
-
{
-
//没找到,为他创建一个单独的trunk_file
-
pTrunkNode = trunk_create_trunk_file(pResult->path. \
-
store_path_index, &result);
-
if (pTrunkNode == NULL)
-
{
-
pthread_mutex_unlock(&trunk_mem_lock);
-
return result;
-
}
-
}
-
pthread_mutex_unlock(&trunk_mem_lock);
-
result = trunk_split(pTrunkNode, size);
-
if (result != 0)
-
{
-
return result;
-
}
-
pTrunkNode->trunk.status = FDFS_TRUNK_STATUS_HOLD;
-
result = trunk_add_free_block(pTrunkNode, true);
-
if (result == 0)
-
{
-
memcpy(pResult, &(pTrunkNode->trunk), \
-
sizeof(FDFSTrunkFullInfo));
-
}
-
return result;
-
}
-
static int trunk_client_trunk_do_alloc_space(ConnectionInfo *pTrunkServer, \
-
const int file_size, FDFSTrunkFullInfo *pTrunkInfo)
-
{
-
TrackerHeader *pHeader;
-
//初始化请求包等等数据,略过
-
...
-
pHeader->cmd = STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE;
-
if ((result=tcpsenddata_nb(pTrunkServer->sock, out_buff, \
-
sizeof(out_buff), g_fdfs_network_timeout)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"send data to storage server %s:%d fail, " \
-
"errno: %d, error info: %s", __LINE__, \
-
pTrunkServer->ip_addr, pTrunkServer->port, \
-
result, STRERROR(result));
-
return result;
-
}
-
p = (char *)&trunkBuff;
-
if ((result=fdfs_recv_response(pTrunkServer, \
-
&p, sizeof(FDFSTrunkInfoBuff), &in_bytes)) != 0)
-
{
-
return result;
-
}
-
//设置pTrunckInfo信息,略过
-
...
-
return 0;
-
}
追踪解析STORAGEPROTOCMDTRUNKALLOC_SPACE行为的服务端函数
storageservice.c会将其由storageservertrunkalloc_space函数来解析
-
/**
-
* request package format:
-
* FDFS_GROUP_NAME_MAX_LEN bytes: group_name
-
* 4 bytes: file size
-
* 1 bytes: store_path_index
-
*
-
* response package format:
-
* 1 byte: store_path_index
-
* 1 byte: sub_path_high
-
* 1 byte: sub_path_low
-
* 4 bytes: trunk file id
-
* 4 bytes: trunk offset
-
* 4 bytes: trunk size
-
* **/
-
static int storage_server_trunk_alloc_space(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
FDFSTrunkInfoBuff *pApplyBody;
-
char *in_buff;
-
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
-
FDFSTrunkFullInfo trunkInfo;
-
int64_t nInPackLen;
-
int file_size;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
CHECK_TRUNK_SERVER(pTask)
-
if (nInPackLen != FDFS_GROUP_NAME_MAX_LEN + 5)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"cmd=%d, client ip: %s, package size " \
-
INT64_PRINTF_FORMAT" is not correct, " \
-
"expect length: %d", __LINE__, \
-
STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE, \
-
pTask->client_ip, nInPackLen, \
-
FDFS_GROUP_NAME_MAX_LEN + 5);
-
return EINVAL;
-
}
-
in_buff = pTask->data + sizeof(TrackerHeader);
-
memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN);
-
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
-
if (strcmp(group_name, g_group_name) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip:%s, group_name: %s " \
-
"not correct, should be: %s", \
-
__LINE__, pTask->client_ip, \
-
group_name, g_group_name);
-
return EINVAL;
-
}
-
file_size = buff2int(in_buff + FDFS_GROUP_NAME_MAX_LEN);
-
if (file_size < 0 || !trunk_check_size(file_size))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip:%s, invalid file size: %d", \
-
__LINE__, pTask->client_ip, file_size);
-
return EINVAL;
-
}
-
trunkInfo.path.store_path_index = *(in_buff+FDFS_GROUP_NAME_MAX_LEN+4);
-
//实质还是调用的trunk_alloc_space
-
if ((result=trunk_alloc_space(file_size, &trunkInfo)) != 0)
-
{
-
return result;
-
}
-
pApplyBody = (FDFSTrunkInfoBuff *)(pTask->data+sizeof(TrackerHeader));
-
pApplyBody->store_path_index = trunkInfo.path.store_path_index;
-
pApplyBody->sub_path_high = trunkInfo.path.sub_path_high;
-
pApplyBody->sub_path_low = trunkInfo.path.sub_path_low;
-
int2buff(trunkInfo.file.id, pApplyBody->id);
-
int2buff(trunkInfo.file.offset, pApplyBody->offset);
-
int2buff(trunkInfo.file.size, pApplyBody->size);
-
pClientInfo->total_length = sizeof(TrackerHeader) + \
-
sizeof(FDFSTrunkInfoBuff);
-
return 0;
-
}
trunkclienttrunkallocspace会向同组内唯一的trunk_server申请空间
最终的实现还是trunkallocspace函数
trunk相当于一个KV吧。介个会不会出现单点问题,这台trunk失效以后如何冗余故障,接着往下分析看看
以下这段函数是在trackerclientthread里面的,大致是storage和tracker的一个交互,如果有故障冗余,这里应该存在机制
-
static int tracker_check_response(ConnectionInfo *pTrackerServer, \
-
bool *bServerPortChanged)
-
{
-
int64_t nInPackLen;
-
TrackerHeader resp;
-
int server_count;
-
int result;
-
char in_buff[1 + (2 + FDFS_MAX_SERVERS_EACH_GROUP) * \
-
sizeof(FDFSStorageBrief)];
-
FDFSStorageBrief *pBriefServers;
-
char *pFlags;
-
//解析包
-
...
-
//tracker_leader变化
-
if ((*pFlags) & FDFS_CHANGE_FLAG_TRACKER_LEADER)
-
{
-
...
-
}
-
//trunk_leader变化
-
if ((*pFlags) & FDFS_CHANGE_FLAG_TRUNK_SERVER)
-
{
-
if (server_count < 1)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"tracker server %s:%d, reponse server " \
-
"count: %d < 1", __LINE__, \
-
pTrackerServer->ip_addr, \
-
pTrackerServer->port, server_count);
-
return EINVAL;
-
}
-
//未启动trunk服务,从tracker重新加载
-
if (!g_if_use_trunk_file)
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"reload parameters from tracker server", \
-
__LINE__);
-
storage_get_params_from_tracker();
-
}
-
//还未启动trunk服务,报错
-
if (!g_if_use_trunk_file)
-
{
-
logWarning("file: "__FILE__", line: %d, " \
-
"tracker server %s:%d, " \
-
"my g_if_use_trunk_file is false, " \
-
"can't support trunk server!", \
-
__LINE__, pTrackerServer->ip_addr, \
-
pTrackerServer->port);
-
}
-
else
-
{
-
memcpy(g_trunk_server.ip_addr, pBriefServers->ip_addr, \
-
IP_ADDRESS_SIZE - 1);
-
*(g_trunk_server.ip_addr + (IP_ADDRESS_SIZE - 1)) = '\0';
-
g_trunk_server.port = buff2int(pBriefServers->port);
-
//如果本地的ip端口和trunk_server一致
-
if (is_local_host_ip(g_trunk_server.ip_addr) && \
-
g_trunk_server.port == g_server_port)
-
{
-
//我已经是trunk了,tracker重启把我重新选为trunk了
-
if (g_if_trunker_self)
-
{
-
logWarning("file: "__FILE__", line: %d, " \
-
"I am already the trunk server %s:%d, " \
-
"may be the tracker server restart", \
-
__LINE__, g_trunk_server.ip_addr, \
-
g_trunk_server.port);
-
}
-
else
-
{
-
//我成为了新的trunk
-
logInfo("file: "__FILE__", line: %d, " \
-
"I am the the trunk server %s:%d", __LINE__, \
-
g_trunk_server.ip_addr, g_trunk_server.port);
-
tracker_fetch_trunk_fid(pTrackerServer);
-
g_if_trunker_self = true;
-
if ((result=storage_trunk_init()) != 0)
-
{
-
return result;
-
}
-
if (g_trunk_create_file_advance && \
-
g_trunk_create_file_interval > 0)
-
{
-
ScheduleArray scheduleArray;
-
ScheduleEntry entries[1];
-
entries[0].id = TRUNK_FILE_CREATOR_TASK_ID;
-
entries[0].time_base = g_trunk_create_file_time_base;
-
entries[0].interval = g_trunk_create_file_interval;
-
entries[0].task_func = trunk_create_trunk_file_advance;
-
entries[0].func_args = NULL;
-
scheduleArray.count = 1;
-
scheduleArray.entries = entries;
-
sched_add_entries(&scheduleArray);
-
}
-
trunk_sync_thread_start_all();
-
}
-
}
-
else
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"the trunk server is %s:%d", __LINE__, \
-
g_trunk_server.ip_addr, g_trunk_server.port);
-
//我以前是trunk,我让权
-
if (g_if_trunker_self)
-
{
-
int saved_trunk_sync_thread_count;
-
logWarning("file: "__FILE__", line: %d, " \
-
"I am the old trunk server, " \
-
"the new trunk server is %s:%d", \
-
__LINE__, g_trunk_server.ip_addr, \
-
g_trunk_server.port);
-
tracker_report_trunk_fid(pTrackerServer);
-
g_if_trunker_self = false;
-
saved_trunk_sync_thread_count = \
-
g_trunk_sync_thread_count;
-
if (saved_trunk_sync_thread_count > 0)
-
{
-
logInfo("file: "__FILE__", line: %d, "\
-
"waiting %d trunk sync " \
-
"threads exit ...", __LINE__, \
-
saved_trunk_sync_thread_count);
-
}
-
while (g_trunk_sync_thread_count > 0)
-
{
-
usleep(50000);
-
}
-
if (saved_trunk_sync_thread_count > 0)
-
{
-
logInfo("file: "__FILE__", line: %d, " \
-
"%d trunk sync threads exited",\
-
__LINE__, \
-
saved_trunk_sync_thread_count);
-
}
-
storage_trunk_destroy_ex(true);
-
if (g_trunk_create_file_advance && \
-
g_trunk_create_file_interval > 0)
-
{
-
sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
-
}
-
}
-
}
-
}
-
pBriefServers += 1;
-
server_count -= 1;
-
}
-
if (!((*pFlags) & FDFS_CHANGE_FLAG_GROUP_SERVER))
-
{
-
return 0;
-
}
-
/*
-
//printf("resp server count=%d\n", server_count);
-
{
-
int i;
-
for (i=0; i<server_count; i++)
-
{
-
//printf("%d. %d:%s\n", i+1, pBriefServers[i].status, \
-
pBriefServers[i].ip_addr);
-
}
-
}
-
*/
-
if (*bServerPortChanged)
-
{
-
if (!g_use_storage_id)
-
{
-
FDFSStorageBrief *pStorageEnd;
-
FDFSStorageBrief *pStorage;
-
*bServerPortChanged = false;
-
pStorageEnd = pBriefServers + server_count;
-
for (pStorage=pBriefServers; pStorage<pStorageEnd;
-
pStorage++)
-
{
-
if (strcmp(pStorage->id, g_my_server_id_str) == 0)
-
{
-
continue;
-
}
-
tracker_rename_mark_files(pStorage->ip_addr, \
-
g_last_server_port, pStorage->ip_addr, \
-
g_server_port);
-
}
-
}
-
if (g_server_port != g_last_server_port)
-
{
-
g_last_server_port = g_server_port;
-
if ((result=storage_write_to_sync_ini_file()) != 0)
-
{
-
return result;
-
}
-
}
-
}
-
return tracker_merge_servers(pTrackerServer, \
-
pBriefServers, server_count);
-
}
可以看到,trunk的失败确实是存在冗余机制,由tracker来选出trunk。
trunk的分析暂告一段落,删除文件后是否存在文件空洞,空洞的利用率如何,都得用数据说话才行哈。
总结:
每个组都有唯一的trunk leader,组内所有trunk文件的信息,由这个trunk leader内部组织的avl树来保存。
上传文件后,storage会向trunk leader发起申请空间的请求,这时trunk leader会使用一个全局的锁,获得了trunk存储的位置后,storage在本地写磁盘。
下载文件时,trunk信息在文件名里面已经包含,只需要直接读即可。
使用trunk方式主要是为了解决node过多造成读写性能下降的问题,但是引入trunk方式本身也会造成一定的性能损耗。
目前感觉我对trunk功能还是hold不住,包括如果trunk出错,怎么样恢复trunk文件的数据,因为没有提供的官方的工具,所以不太敢用。
以后如果有需求在跟进,先告一段落了吧。
FastDFS源码解析(3)--------通信协议分析
就上传和下载进行分析,其他暂时略过
上传:
1 根据ip,port连接上tracker
2 发送一个10字节的包,其中第9个字节为TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE,也就是101
3 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
4 接受的这个包,0-8字节是下面要接收的包的大小,通过以下算法可以还原成数字
-
int64_t buff2long(const char *buff)
-
{
-
unsigned char *p;
-
p = (unsigned char *)buff;
-
return (((int64_t)(*p)) << 56) | \
-
(((int64_t)(*(p+1))) << 48) | \
-
(((int64_t)(*(p+2))) << 40) | \
-
(((int64_t)(*(p+3))) << 32) | \
-
(((int64_t)(*(p+4))) << 24) | \
-
(((int64_t)(*(p+5))) << 16) | \
-
(((int64_t)(*(p+6))) << 8) | \
-
((int64_t)(*(p+7)));
-
}
-
void long2buff(int64_t n, char *buff)
-
{
-
unsigned char *p;
-
p = (unsigned char *)buff;
-
*p++ = (n >> 56) & 0xFF;
-
*p++ = (n >> 48) & 0xFF;
-
*p++ = (n >> 40) & 0xFF;
-
*p++ = (n >> 32) & 0xFF;
-
*p++ = (n >> 24) & 0xFF;
-
*p++ = (n >> 16) & 0xFF;
-
*p++ = (n >> 8) & 0xFF;
-
*p++ = n & 0xFF;
-
}
5 读完这个数字对应的字节数目,这个数字应当有TRACKER_QUERY_STORAGE_STORE_BODY_LEN长,否则出错
-
#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN (FDFS_GROUP_NAME_MAX_LEN \
-
+ IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
也就是16+16-1+8+1 = 40
6 这40个字节,头16字节是组名,接着15字节是IP地址,接着8字节是端口号,还是根据buff2long算法还原成数字,最后1字节是store_path_index
tracker交互完毕,此时进行storage操作
7 根据ip和端口连接storage
8 发送25字节的包
头10字节是TrackerHeader一样的结构,其中1-8字节的内容为filesize+这个包的大小(25)-头的大小(10),也就是file_size+15这个数,通过long2buff,转换的8字节字串,然后其中第9字节的内容是STORAGE_PROTO_CMD_UPLOAD_FILE,也就是11
第11字节是刚才接受的storage_path_index
第12-19字节是file_size,通过long2buff算法转换为8字节字串
19-25字节是ext_name相关,这里设置为0即可
9 发送file_size字节内容,即为文件信息
10 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
11 接受的这个包,0-8字节是下面要接收的包的大小,通过buff2long还原为数字
12 这个数字应该大于FDFS_GROUP_NAME_MAX_LEN,也就是16字节,否则出错
13 头16字节为组名,后面全部的字节为remote_filename
14 上传流程完成
下载:
下载需要上传时rsp返回的文件ID,这里命名为file_id
1 连接tracker
2 切分file_id,第一个/前出现的即为group_name,后面的都是remote_filename
3 发送一个10字节的pHeader,其中1-8字节是FDFS_GROUP_NAME_MAX_LEN(值为16) 加上 remote_filename的长度,通过long2buff转化而成的
第9字节是CMD TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE,即为102
4 发送16字节是group_name
5 发送remote_filename这个字串
6 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
7 接受的这个包,1-8字节是下面要接收的包的大小,通过buff2long可以还原成数字
8 读完这个数字对应的字节数目,这个数字应当有TRACKERQUERYSTORAGEFETCHBODYLEN(TRACKERQUERYSTORAGESTOREBODYLEN - 1,也就是39)长,否则出错
9 这39个字节,头16字节是组名(下载逻辑时可以忽略),接着15字节是IP地址,接着8字节是端口号,还是根据buff2long算法还原成数字
10 和tracker的交互完成,下面是storage
11 根据ip和端口连接storage
12 发送一个pHeader+file_offset+download_bytes+group_name(补全16字节)+filename的数据包
也就是10+8+8+16+filename_size
1-8字节是8+8+16+filename_size的大小根据long2buff转换的字串
9字节是STORAGE_PROTO_CMD_DOWNLOAD_FILE也就是14
11-18字节是file_offset的long2buff字串
19-26是download_bytes的long2buff字串
27-42是group_name
再往后就是finename
13 接受一个10字节的包,其中第10个字节为返回状态,如果是0,说明一切正常
14 接受的这个包,1-8字节是下面要接收的包的大小,通过buff2long可以还原成数字
15 将接收到的包写入文件,一次下载逻辑完毕
上传下载是最经典的逻辑,其他逻辑都可以从这里衍生,不做详细介绍了
FastDFS源码解析(4)--------storage运行流程分析
大致来分析一下fdfs storage是如何提供服务的,以上传文件为例。
从storage的初始化函数来入手
-
int storage_service_init()
-
{
-
int result;
-
int bytes;
-
struct storage_nio_thread_data *pThreadData;
-
struct storage_nio_thread_data *pDataEnd;
-
pthread_t tid;
-
pthread_attr_t thread_attr;
-
//storage任务线程锁
-
if ((result=init_pthread_lock(&g_storage_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//路径索引锁
-
if ((result=init_pthread_lock(&path_index_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//状态计数锁
-
if ((result=init_pthread_lock(&stat_count_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//初始化线程堆栈大小
-
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"init_pthread_attr fail, program exit!", __LINE__);
-
return result;
-
}
-
//建立任务task对象池,复用task类型
-
if ((result=free_queue_init(g_max_connections, g_buff_size, \
-
g_buff_size, sizeof(StorageClientInfo))) != 0)
-
{
-
return result;
-
}
-
bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;
-
g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes);
-
if (g_nio_thread_data == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc %d bytes fail, errno: %d, error info: %s", \
-
__LINE__, bytes, errno, STRERROR(errno));
-
return errno != 0 ? errno : ENOMEM;
-
}
-
memset(g_nio_thread_data, 0, bytes);
-
g_storage_thread_count = 0;
-
pDataEnd = g_nio_thread_data + g_work_threads;
-
for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData++)
-
{
-
if (ioevent_init(&pThreadData->thread_data.ev_puller,
-
g_max_connections + 2, 1000, 0) != 0)
-
{
-
result = errno != 0 ? errno : ENOMEM;
-
logError("file: "__FILE__", line: %d, " \
-
"ioevent_init fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
return result;
-
}
-
result = fast_timer_init(&pThreadData->thread_data.timer,
-
2 * g_fdfs_network_timeout, g_current_time);
-
if (result != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"fast_timer_init fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
return result;
-
}
-
if (pipe(pThreadData->thread_data.pipe_fds) != 0)
-
{
-
result = errno != 0 ? errno : EPERM;
-
logError("file: "__FILE__", line: %d, " \
-
"call pipe fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
break;
-
}
-
#if defined(OS_LINUX)
-
if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
-
O_NONBLOCK | O_NOATIME)) != 0)
-
{
-
break;
-
}
-
#else
-
if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
-
O_NONBLOCK)) != 0)
-
{
-
break;
-
}
-
#endif
-
//创建工作线程
-
if ((result=pthread_create(&tid, &thread_attr, \
-
work_thread_entrance, pThreadData)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"create thread failed, startup threads: %d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, g_storage_thread_count, \
-
result, STRERROR(result));
-
break;
-
}
-
else
-
{
-
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_storage_thread_count++;
-
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
}
-
}
-
pthread_attr_destroy(&thread_attr);
-
last_stat_change_count = g_stat_change_count;
-
//DO NOT support direct IO !!!
-
//g_extra_open_file_flags = g_disk_rw_direct ? O_DIRECT : 0;
-
if (result != 0)
-
{
-
return result;
-
}
-
return result;
-
}
跟进工作线程
-
static void *work_thread_entrance(void* arg)
-
{
-
int result;
-
struct storage_nio_thread_data *pThreadData;
-
pThreadData = (struct storage_nio_thread_data *)arg;
-
if (g_check_file_duplicate)
-
{
-
if ((result=fdht_copy_group_array(&(pThreadData->group_array),\
-
&g_group_array)) != 0)
-
{
-
pthread_mutex_lock(&g_storage_thread_lock);
-
g_storage_thread_count--;
-
pthread_mutex_unlock(&g_storage_thread_lock);
-
return NULL;
-
}
-
}
-
//启动主io主循环,为pThreadData->thread_data对应的pipe_fd注册回调函数
-
//storage_recv_notify_read
-
ioevent_loop(&pThreadData->thread_data, storage_recv_notify_read,
-
task_finish_clean_up, &g_continue_flag);
-
//循环退出,销毁响应数据结构
-
ioevent_destroy(&pThreadData->thread_data.ev_puller);
-
if (g_check_file_duplicate)
-
{
-
if (g_keep_alive)
-
{
-
fdht_disconnect_all_servers(&(pThreadData->group_array));
-
}
-
fdht_free_group_array(&(pThreadData->group_array));
-
}
-
//总线程数目自减
-
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_storage_thread_count--;
-
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
logDebug("file: "__FILE__", line: %d, " \
-
"nio thread exited, thread count: %d", \
-
__LINE__, g_storage_thread_count);
-
return NULL;
-
}
除了workthreadentrance线程,还有一个叫做acceptthreadentrance的线程,专门用来accept请求,防止大量的操作阻塞了accept的性能
-
static void *accept_thread_entrance(void* arg)
-
{
-
int server_sock;
-
int incomesock;
-
struct sockaddr_in inaddr;
-
socklen_t sockaddr_len;
-
in_addr_t client_addr;
-
char szClientIp[IP_ADDRESS_SIZE];
-
long task_addr;
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
struct storage_nio_thread_data *pThreadData;
-
server_sock = (long)arg;
-
while (g_continue_flag)
-
{
-
sockaddr_len = sizeof(inaddr);
-
incomesock = accept(server_sock, (struct sockaddr*)&inaddr, \
-
&sockaddr_len);
-
if (incomesock < 0) //error
-
{
-
if (!(errno == EINTR || errno == EAGAIN))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"accept failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
continue;
-
}
-
client_addr = getPeerIpaddr(incomesock, \
-
szClientIp, IP_ADDRESS_SIZE);
-
if (g_allow_ip_count >= 0)
-
{
-
if (bsearch(&client_addr, g_allow_ip_addrs, \
-
g_allow_ip_count, sizeof(in_addr_t), \
-
cmp_by_ip_addr_t) == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"ip addr %s is not allowed to access", \
-
__LINE__, szClientIp);
-
close(incomesock);
-
continue;
-
}
-
}
-
if (tcpsetnonblockopt(incomesock) != 0)
-
{
-
close(incomesock);
-
continue;
-
}
-
pTask = free_queue_pop();
-
if (pTask == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc task buff failed", \
-
__LINE__);
-
close(incomesock);
-
continue;
-
}
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
//从task对象池里拿出一个task,将fd域填充为incomesock
-
pTask->event.fd = incomesock;
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT;
-
pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;
-
pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
-
strcpy(pTask->client_ip, szClientIp);
-
task_addr = (long)pTask;
-
//通过pThreadData->thread_data.pipe_fds[1]将task传给work_thread
-
//work_thread监视着pThreadData->thread_data.pipe_fds[0]
-
//storage_recv_notify_read将被调用
-
if (write(pThreadData->thread_data.pipe_fds[1], &task_addr, \
-
sizeof(task_addr)) != sizeof(task_addr))
-
{
-
close(incomesock);
-
free_queue_push(pTask);
-
logError("file: "__FILE__", line: %d, " \
-
"call write failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
}
-
return NULL;
-
}
关注一下storagerecvnotify_read函数
-
void storage_recv_notify_read(int sock, short event, void *arg)
-
{
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
long task_addr;
-
int64_t remain_bytes;
-
int bytes;
-
int result;
-
while (1)
-
{
-
//读取这个task结构
-
if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
-
{
-
if (!(errno == EAGAIN || errno == EWOULDBLOCK))
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call read failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
}
-
break;
-
}
-
else if (bytes == 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call read failed, end of file", __LINE__);
-
break;
-
}
-
pTask = (struct fast_task_info *)task_addr;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
if (pTask->event.fd < 0) //quit flag
-
{
-
return;
-
}
-
/* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
-
pClientInfo->nio_thread_index, pTask->event.fd);
-
*/
-
if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
-
{
-
pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
-
}
-
switch (pClientInfo->stage)
-
{
-
//初始化阶段,进行数据初始化
-
case FDFS_STORAGE_STAGE_NIO_INIT:
-
result = storage_nio_init(pTask);
-
break;
-
//暂时略过,先看storage_nio_init
-
case FDFS_STORAGE_STAGE_NIO_RECV:
-
pTask->offset = 0;
-
remain_bytes = pClientInfo->total_length - \
-
pClientInfo->total_offset;
-
if (remain_bytes > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = remain_bytes;
-
}
-
if (set_recv_event(pTask) == 0)
-
{
-
client_sock_read(pTask->event.fd,
-
IOEVENT_READ, pTask);
-
}
-
result = 0;
-
break;
-
case FDFS_STORAGE_STAGE_NIO_SEND:
-
result = storage_send_add_event(pTask);
-
break;
-
case FDFS_STORAGE_STAGE_NIO_CLOSE:
-
result = EIO; //close this socket
-
break;
-
default:
-
logError("file: "__FILE__", line: %d, " \
-
"invalid stage: %d", __LINE__, \
-
pClientInfo->stage);
-
result = EINVAL;
-
break;
-
}
-
if (result != 0)
-
{
-
add_to_deleted_list(pTask);
-
}
-
}
-
}
初始化实质上是将task对应的fd,注册clientsockread函数同时将task状态设置为FDFSSTORAGESTAGENIORECV
-
static int storage_nio_init(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
struct storage_nio_thread_data *pThreadData;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
-
return ioevent_set(pTask, &pThreadData->thread_data,
-
pTask->event.fd, IOEVENT_READ, client_sock_read,
-
g_fdfs_network_timeout);
-
}
看看这个clientsockread函数
-
static void client_sock_read(int sock, short event, void *arg)
-
{
-
int bytes;
-
int recv_bytes;
-
struct fast_task_info *pTask;
-
StorageClientInfo *pClientInfo;
-
pTask = (struct fast_task_info *)arg;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
if (pClientInfo->canceled)
-
{
-
return;
-
}
-
if (pClientInfo->stage != FDFS_STORAGE_STAGE_NIO_RECV)
-
{
-
if (event & IOEVENT_TIMEOUT) {
-
pTask->event.timer.expires = g_current_time +
-
g_fdfs_network_timeout;
-
fast_timer_add(&pTask->thread_data->timer,
-
&pTask->event.timer);
-
}
-
return;
-
}
-
//超时了,删除这个task
-
if (event & IOEVENT_TIMEOUT)
-
{
-
if (pClientInfo->total_offset == 0 && pTask->req_count > 0)
-
{
-
pTask->event.timer.expires = g_current_time +
-
g_fdfs_network_timeout;
-
fast_timer_add(&pTask->thread_data->timer,
-
&pTask->event.timer);
-
}
-
else
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv timeout, " \
-
"recv offset: %d, expect length: %d", \
-
__LINE__, pTask->client_ip, \
-
pTask->offset, pTask->length);
-
task_finish_clean_up(pTask);
-
}
-
return;
-
}
-
//io错误,一样删
-
if (event & IOEVENT_ERROR)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv error event: %d, "
-
"close connection", __LINE__, pTask->client_ip, event);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
fast_timer_modify(&pTask->thread_data->timer,
-
&pTask->event.timer, g_current_time +
-
g_fdfs_network_timeout);
-
while (1)
-
{
-
//pClientInfo的total_length域为0,说明头还没接收,接收一个头
-
if (pClientInfo->total_length == 0) //recv header
-
{
-
recv_bytes = sizeof(TrackerHeader) - pTask->offset;
-
}
-
else
-
{
-
recv_bytes = pTask->length - pTask->offset;
-
}
-
/*
-
logInfo("total_length="INT64_PRINTF_FORMAT", recv_bytes=%d, "
-
"pTask->length=%d, pTask->offset=%d",
-
pClientInfo->total_length, recv_bytes,
-
pTask->length, pTask->offset);
-
*/
-
bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
-
if (bytes < 0)
-
{
-
if (errno == EAGAIN || errno == EWOULDBLOCK)
-
{
-
}
-
else
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv failed, " \
-
"errno: %d, error info: %s", \
-
__LINE__, pTask->client_ip, \
-
errno, STRERROR(errno));
-
task_finish_clean_up(pTask);
-
}
-
return;
-
}
-
else if (bytes == 0)
-
{
-
logDebug("file: "__FILE__", line: %d, " \
-
"client ip: %s, recv failed, " \
-
"connection disconnected.", \
-
__LINE__, pTask->client_ip);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
//用包头数据对pClientInfo进行初始化
-
if (pClientInfo->total_length == 0) //header
-
{
-
if (pTask->offset + bytes < sizeof(TrackerHeader))
-
{
-
pTask->offset += bytes;
-
return;
-
}
-
pClientInfo->total_length=buff2long(((TrackerHeader *) \
-
pTask->data)->pkg_len);
-
if (pClientInfo->total_length < 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"client ip: %s, pkg length: " \
-
INT64_PRINTF_FORMAT" < 0", \
-
__LINE__, pTask->client_ip, \
-
pClientInfo->total_length);
-
task_finish_clean_up(pTask);
-
return;
-
}
-
pClientInfo->total_length += sizeof(TrackerHeader);
-
//如果需要接受的数据总长大于pTask的固定长度阀值,那么暂时只接受那么长
-
if (pClientInfo->total_length > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = pClientInfo->total_length;
-
}
-
}
-
pTask->offset += bytes;
-
//接受完了当前的包
-
if (pTask->offset >= pTask->length) //recv current pkg done
-
{
-
//略过先看下面
-
if (pClientInfo->total_offset + pTask->length >= \
-
pClientInfo->total_length)
-
{
-
/* current req recv done */
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
-
pTask->req_count++;
-
}
-
//刚接受了包头,那么由storage_deal_task分发任务
-
if (pClientInfo->total_offset == 0)
-
{
-
pClientInfo->total_offset = pTask->length;
-
storage_deal_task(pTask);
-
}
-
else
-
{
-
//略过先看下面
-
pClientInfo->total_offset += pTask->length;
-
/* continue write to file */
-
storage_dio_queue_push(pTask);
-
}
-
return;
-
}
-
}
-
return;
-
}
storagedealtask将上传请求分发给storageuploadfile
storageuploadfile注册一些基本的函数而后调用 storagewriteto_file
-
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
-
{
-
//略过
-
...
-
return storage_write_to_file(pTask, file_offset, file_bytes, \
-
p - pTask->data, dio_write_file, \
-
storage_upload_file_done_callback, \
-
clean_func, store_path_index);
-
}
-
static int storage_write_to_file(struct fast_task_info *pTask, \
-
const int64_t file_offset, const int64_t upload_bytes, \
-
const int buff_offset, TaskDealFunc deal_func, \
-
FileDealDoneCallback done_callback, \
-
DisconnectCleanFunc clean_func, const int store_path_index)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
pClientInfo->deal_func = deal_func;
-
pClientInfo->clean_func = clean_func;
-
pFileContext->fd = -1;
-
pFileContext->buff_offset = buff_offset;
-
pFileContext->offset = file_offset;
-
pFileContext->start = file_offset;
-
pFileContext->end = file_offset + upload_bytes;
-
pFileContext->dio_thread_index = storage_dio_get_thread_index( \
-
pTask, store_path_index, pFileContext->op);
-
pFileContext->done_callback = done_callback;
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_XINIT;
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
INIT_HASH_CODES4(pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_init(&pFileContext->md5_context);
-
}
-
}
-
//将任务压入磁盘队列
-
if ((result=storage_dio_queue_push(pTask)) != 0)
-
{
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
return result;
-
}
-
return STORAGE_STATUE_DEAL_FILE;
-
}
压入磁盘队列的处理函数
-
int storage_dio_queue_push(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
struct storage_dio_context *pContext;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
pContext = g_dio_contexts + pFileContext->dio_thread_index;
-
//这里为什么要或上这个呢,因为在LT模式的工作下,client_sock_read会被不断的触发
-
//pTask的数据就会被刷掉了,所以改变当前FDFS_STORAGE_STAGE_NIO_RECV的状态,让client_sock_read调用就被返回
-
pClientInfo->stage |= FDFS_STORAGE_STAGE_DIO_THREAD;
-
if ((result=task_queue_push(&(pContext->queue), pTask)) != 0)
-
{
-
add_to_deleted_list(pTask);
-
return result;
-
}
-
if ((result=pthread_cond_signal(&(pContext->cond))) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"pthread_cond_signal fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
add_to_deleted_list(pTask);
-
return result;
-
}
-
return 0;
-
}
下面就是磁盘线程取task了
-
static void *dio_thread_entrance(void* arg)
-
{
-
int result;
-
struct storage_dio_context *pContext;
-
struct fast_task_info *pTask;
-
pContext = (struct storage_dio_context *)arg;
-
pthread_mutex_lock(&(pContext->lock));
-
while (g_continue_flag)
-
{
-
if ((result=pthread_cond_wait(&(pContext->cond), \
-
&(pContext->lock))) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_cond_wait fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
//循环取队列里的任务,执行他的deal_func
-
while ((pTask=task_queue_pop(&(pContext->queue))) != NULL)
-
{
-
((StorageClientInfo *)pTask->arg)->deal_func(pTask);
-
}
-
}
-
pthread_mutex_unlock(&(pContext->lock));
-
if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_dio_thread_count--;
-
if ((result=pthread_mutex_unlock(&g_dio_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
logDebug("file: "__FILE__", line: %d, " \
-
"dio thread exited, thread count: %d", \
-
__LINE__, g_dio_thread_count);
-
return NULL;
-
}
对于上传任务来说,dealtask实际上是dowrite_file
-
int dio_write_file(struct fast_task_info *pTask)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
int result;
-
int write_bytes;
-
char *pDataBuff;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
result = 0;
-
do
-
{
-
if (pFileContext->fd < 0)
-
{
-
if (pFileContext->extra_info.upload.before_open_callback!=NULL)
-
{
-
result = pFileContext->extra_info.upload. \
-
before_open_callback(pTask);
-
if (result != 0)
-
{
-
break;
-
}
-
}
-
if ((result=dio_open_file(pFileContext)) != 0)
-
{
-
break;
-
}
-
}
-
pDataBuff = pTask->data + pFileContext->buff_offset;
-
write_bytes = pTask->length - pFileContext->buff_offset;
-
if (write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
-
{
-
result = errno != 0 ? errno : EIO;
-
logError("file: "__FILE__", line: %d, " \
-
"write to file: %s fail, fd=%d, write_bytes=%d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, pFileContext->filename, \
-
pFileContext->fd, write_bytes, \
-
result, STRERROR(result));
-
}
-
pthread_mutex_lock(&g_dio_thread_lock);
-
g_storage_stat.total_file_write_count++;
-
if (result == 0)
-
{
-
g_storage_stat.success_file_write_count++;
-
}
-
pthread_mutex_unlock(&g_dio_thread_lock);
-
if (result != 0)
-
{
-
break;
-
}
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
-
pFileContext->crc32);
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
CALC_HASH_CODES4(pDataBuff, write_bytes, \
-
pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_update(&pFileContext->md5_context, \
-
(unsigned char *)pDataBuff, write_bytes);
-
}
-
}
-
/*
-
logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
-
write_bytes, pTask->length, pFileContext->buff_offset);
-
*/
-
pFileContext->offset += write_bytes;
-
if (pFileContext->offset < pFileContext->end)
-
{
-
pFileContext->buff_offset = 0;
-
storage_nio_notify(pTask); //notify nio to deal
-
}
-
else
-
{
-
if (pFileContext->calc_crc32)
-
{
-
pFileContext->crc32 = CRC32_FINAL( \
-
pFileContext->crc32);
-
}
-
if (pFileContext->calc_file_hash)
-
{
-
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
-
{
-
FINISH_HASH_CODES4(pFileContext->file_hash_codes)
-
}
-
else
-
{
-
my_md5_final((unsigned char *)(pFileContext-> \
-
file_hash_codes), &pFileContext->md5_context);
-
}
-
}
-
if (pFileContext->extra_info.upload.before_close_callback != NULL)
-
{
-
result = pFileContext->extra_info.upload. \
-
before_close_callback(pTask);
-
}
-
/* file write done, close it */
-
close(pFileContext->fd);
-
pFileContext->fd = -1;
-
if (pFileContext->done_callback != NULL)
-
{
-
pFileContext->done_callback(pTask, result);
-
}
-
}
-
return 0;
-
} while (0);
-
pClientInfo->clean_func(pTask);
-
if (pFileContext->done_callback != NULL)
-
{
-
pFileContext->done_callback(pTask, result);
-
}
-
return result;
-
}
pFileContext->donecallback对应了storageuploadfiledone_callback
-
static void storage_upload_file_done_callback(struct fast_task_info *pTask, \
-
const int err_no)
-
{
-
StorageClientInfo *pClientInfo;
-
StorageFileContext *pFileContext;
-
TrackerHeader *pHeader;
-
int result;
-
pClientInfo = (StorageClientInfo *)pTask->arg;
-
pFileContext = &(pClientInfo->file_context);
-
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
-
{
-
result = trunk_client_trunk_alloc_confirm( \
-
&(pFileContext->extra_info.upload.trunk_info), err_no);
-
if (err_no != 0)
-
{
-
result = err_no;
-
}
-
}
-
else
-
{
-
result = err_no;
-
}
-
if (result == 0)
-
{
-
result = storage_service_upload_file_done(pTask);
-
if (result == 0)
-
{
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
result = storage_binlog_write(\
-
pFileContext->timestamp2log, \
-
STORAGE_OP_TYPE_SOURCE_CREATE_FILE, \
-
pFileContext->fname2log);
-
}
-
}
-
}
-
if (result == 0)
-
{
-
int filename_len;
-
char *p;
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
CHECK_AND_WRITE_TO_STAT_FILE3_WITH_BYTES( \
-
g_storage_stat.total_upload_count, \
-
g_storage_stat.success_upload_count, \
-
g_storage_stat.last_source_update, \
-
g_storage_stat.total_upload_bytes, \
-
g_storage_stat.success_upload_bytes, \
-
pFileContext->end - pFileContext->start)
-
}
-
filename_len = strlen(pFileContext->fname2log);
-
pClientInfo->total_length = sizeof(TrackerHeader) + \
-
FDFS_GROUP_NAME_MAX_LEN + filename_len;
-
p = pTask->data + sizeof(TrackerHeader);
-
memcpy(p, pFileContext->extra_info.upload.group_name, \
-
FDFS_GROUP_NAME_MAX_LEN);
-
p += FDFS_GROUP_NAME_MAX_LEN;
-
memcpy(p, pFileContext->fname2log, filename_len);
-
}
-
else
-
{
-
pthread_mutex_lock(&stat_count_thread_lock);
-
if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
-
{
-
g_storage_stat.total_upload_count++;
-
g_storage_stat.total_upload_bytes += \
-
pClientInfo->total_offset;
-
}
-
pthread_mutex_unlock(&stat_count_thread_lock);
-
pClientInfo->total_length = sizeof(TrackerHeader);
-
}
-
STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_UPLOAD_FILE, result);
-
pClientInfo->total_offset = 0;
-
pTask->length = pClientInfo->total_length;
-
pHeader = (TrackerHeader *)pTask->data;
-
pHeader->status = result;
-
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
-
long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
-
pHeader->pkg_len);
-
//又看到熟悉的函数了,这完成以后将pTask从磁盘线程压入work线程
-
//work线程调用storage_recv_notify_read函数来做下一步处理
-
storage_nio_notify(pTask);
-
}
-
void storage_recv_notify_read(int sock, short event, void *arg)
-
{
-
//前文已有,略过
-
...
-
//刚从磁盘线程里出来的任务状态依然是dio_thread,去掉dio_thread状态
-
if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
-
{
-
pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
-
}
-
switch (pClientInfo->stage)
-
{
-
//前文已有,略过
-
...
-
case FDFS_STORAGE_STAGE_NIO_RECV:
-
pTask->offset = 0;
-
remain_bytes = pClientInfo->total_length - \
-
pClientInfo->total_offset;
-
if (remain_bytes > pTask->size)
-
{
-
pTask->length = pTask->size;
-
}
-
else
-
{
-
pTask->length = remain_bytes;
-
}
-
if (set_recv_event(pTask) == 0)
-
{
-
client_sock_read(pTask->event.fd,
-
IOEVENT_READ, pTask);
-
}
-
result = 0;
-
break;
-
case FDFS_STORAGE_STAGE_NIO_SEND:
-
result = storage_send_add_event(pTask);
-
break;
-
case FDFS_STORAGE_STAGE_NIO_CLOSE:
-
result = EIO; //close this socket
-
break;
-
default:
-
logError("file: "__FILE__", line: %d, " \
-
"invalid stage: %d", __LINE__, \
-
pClientInfo->stage);
-
result = EINVAL;
-
break;
-
}
-
if (result != 0)
-
{
-
add_to_deleted_list(pTask);
-
}
-
}
调用了clientsockread函数进行处理
-
static void client_sock_read(int sock, short event, void *arg)
-
{
-
//前文已有,略
-
...
-
pTask->offset += bytes;
-
if (pTask->offset >= pTask->length) //recv current pkg done
-
{
-
//这个req接受完毕,准备反馈rsp
-
if (pClientInfo->total_offset + pTask->length >= \
-
pClientInfo->total_length)
-
{
-
/* current req recv done */
-
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
-
pTask->req_count++;
-
}
-
if (pClientInfo->total_offset == 0)
-
{
-
pClientInfo->total_offset = pTask->length;
-
storage_deal_task(pTask);
-
}
-
else
-
{
-
//接受的是数据包,压入磁盘线程
-
pClientInfo->total_offset += pTask->length;
-
/* continue write to file */
-
storage_dio_queue_push(pTask);
-
}
-
return;
-
}
-
return;
-
}
数据包的网络接收和磁盘的处理成为一个环,接收完一部分,通过队列压入磁盘队列,磁盘线程处理完以后又通过像工作线程的fd进行写,触发网络线程读取这个task。自此源源不断将数据传过来。
总结:
还是上图吧,整个处理流程如下图
1 client发出请求,accept线程catch到描述符,初始化pTask结构,填入描述符,然后将pTask通过管道给work_entrance
2 进入storagerecvnotify_read函数
3 根据当前的pTask->stage等于FDFSSTORAGESTAGEINIT为fd创建读事件,绑定函数clientsock_read
4 调用storageuploadfile
5 storageuploadfile调用storagewriteto_file
6 storagewritetofile调用压磁盘队列函数storagedioqueuepush
7 storagedioqueuepush将pTask->stage |= FDFSSTORAGESTAGEDIO_THREAD
8 根据事件触发机制,clientsockread将被不断的调用,然而由于pTask->stage != FDFSSTORAGESTAGE_RECV,所以返回
9 磁盘线程通过队列取pTask,调用pTask的处理函数diowritefile
10 调用storageuploadfiledonecallback,调用storagenionotify,通过管道的形式将pTask压入工作进程
11 触发storagerecvnotifyread,将task->stage的FDFSSTORAGESTAGEDIO_THREAD标志去除
12 根据task->stage的FDFSSTORAGESTAGERECV状态,调用函数clientsock_read
13 clientsockread读取完以后调用磁盘队列函数storagedioqueue_push
14 重复7
15 直到结束
一次上传逻辑分析完成
另外pTask的大小是在配置文件里指定的,默认256KB,补充说明一下
每个连接只提供一个pTask来做数据接受和写,猜测是怕大并发占用太多的系统内存吧。
比如1W并发下,256K的pTask大致是存在1W个,也就是2.5G左右内存
我以前自己写的那个分布式文件系统也是这个串行化的逻辑,因为这样开发简单有效。
有一点不足,我以前把数据压入磁盘IO后,我就删除了这个事件,等到磁盘线程读写完毕,我再建立这个事件。
看鱼大是通过判断pTask->stage的状态来暂时忽略回调的,这样在逻辑上比较好,毕竟有事件发生了就要去处理,删掉了始终不是什么好办法。
据事件触发机制,clientsockread将被不断的调用,然而由于pTask->stage != FDFSSTORAGESTAGE_RECV,所以返回
9 磁盘线程通过队列取pTask,调用pTask的处理函数diowritefile
10 调用storageuploadfiledonecallback,调用storagenionotify,通过管道的形式将pTask压入工作进程
11 触发storagerecvnotifyread,将task->stage的FDFSSTORAGESTAGEDIO_THREAD标志去除
12 根据task->stage的FDFSSTORAGESTAGERECV状态,调用函数clientsock_read
13 clientsockread读取完以后调用磁盘队列函数storagedioqueue_push
14 重复7
15 直到结束
一次上传逻辑分析完成
另外pTask的大小是在配置文件里指定的,默认256KB,补充说明一下
每个连接只提供一个pTask来做数据接受和写,猜测是怕大并发占用太多的系统内存吧。
比如1W并发下,256K的pTask大致是存在1W个,也就是2.5G左右内存
我以前自己写的那个分布式文件系统也是这个串行化的逻辑,因为这样开发简单有效。
有一点不足,我以前把数据压入磁盘IO后,我就删除了这个事件,等到磁盘线程读写完毕,我再建立这个事件。
看鱼大是通过判断pTask->stage的状态来暂时忽略回调的,这样在逻辑上比较好,毕竟有事件发生了就要去处理,删掉了始终不是什么好办法。