tcp段重组--suricata实现

在TCP协议上进行应用层协议检测时,需要先做段重组(reassembly),本文分析Suricata的TCP段重组源码。

1. 数据结构

1.1 TcpSession结构体

TcpSession结构体保存一条TCP会话的全部状态,由Flow结构体的成员protoctx指向。

 typedef struct TcpSession_ {
        PoolThreadReserved res; /**< id of the PoolThread this session was taken from */
        uint8_t state; 		/**< session state: TCP_NONE, TCP_LISTEN, TCP_SYN_SENT, TCP_SYN_RECV, TCP_ESTABLISHED, TCP_FIN_WAIT1, TCP_FIN_WAIT2, TCP_TIME_WAIT, TCP_LAST_ACK, TCP_CLOSE_WAIT, TCP_CLOSING, TCP_CLOSED */
        uint8_t queue_len;                      /**< length of queue list below */
        int8_t data_first_seen_dir;			/**< direction in which payload data was first seen: STREAM_TOSERVER or STREAM_TOCLIENT */
        /** track all the tcp flags we've seen */
        uint8_t tcp_packet_flags;
        /* flags -- STREAMTCP_FLAG_* values:
        STREAMTCP_FLAG_MIDSTREAM
		STREAMTCP_FLAG_MIDSTREAM_ESTABLISHED
		STREAMTCP_FLAG_MIDSTREAM_SYNACK
		STREAMTCP_FLAG_TIMESTAMP
		STREAMTCP_FLAG_SERVER_WSCALE
		the server side supports WSCALE (the window scaling option)
		STREAMTCP_FLAG_ASYNC
		marks an asynchronous session in which packets are seen in one direction only
		STREAMTCP_FLAG_4WHS
		marks a session set up by a four-way handshake: SYN, SYN, SYN/ACK, ACK
		STREAMTCP_FLAG_DETECTION_EVASION_ATTEMPT
		STREAMTCP_FLAG_CLIENT_SACKOK
		the client advertised SACK support
		STREAMTCP_FLAG_SACKOK
		both sides support SACK
		STREAMTCP_FLAG_3WHS_CONFIRMED
		set once the server has sent an ACK after the 3-way handshake, marking the handshake as definitively complete
		STREAMTCP_FLAG_APP_LAYER_DISABLED
		STREAMTCP_FLAG_BYPASS
	
		*/
        uint16_t flags;		
        uint32_t reassembly_depth;      /**< maximum number of bytes reassembly will handle */
        TcpStream server;
        TcpStream client;
        TcpStateQueue *queue;                   /**< list of SYN/ACK candidates */
    } TcpSession;

TcpSession内有client和server两个成员,类型为TcpStream,这个结构体保存了一个方向数据流的全部信息,包括tcp头中体现的stream的状态和数据的缓存。TcpStream中数据缓存保存在StreamingBuffer结构体中,接收到的数据segment的序号和长度保存在TcpSegment结构体中。

1.2 TcpStream结构体

  typedef struct TcpStream_ {
        uint16_t flags:12;              /**< Flag specific to the stream e.g. Timestamp */
        /* coccinelle: TcpStream:flags:STREAMTCP_STREAM_FLAG_ */
        uint16_t wscale:4;              /**< wscale setting in this direction, 4 bits as max val is 15 */
        uint8_t os_policy;              /**< target based OS policy used for reassembly and handling packets*/
        uint8_t tcp_flags;              /**< TCP flags seen */
    
        uint32_t isn;                   /**< initial sequence number */
        uint32_t next_seq;              /**< next expected sequence number */
        uint32_t last_ack;              /**< last ack'd sequence number in this stream */
        uint32_t next_win;              /**< next max seq within window */
        uint32_t window;                /**< current window setting, after wscale is applied */
    
        uint32_t last_ts;               /**< Time stamp (TSVAL) of the last seen packet for this stream*/
        uint32_t last_pkt_ts;           /**< Time of last seen packet for this stream (needed for PAWS update)
                                             This will be used to validate the last_ts, when connection has been idle for
                                             longer time.(RFC 1323)*/
        /* reassembly */
        uint32_t base_seq;              /**< seq where we are left with reassembly. Matches STREAM_BASE_OFFSET below. */
    
        uint32_t app_progress_rel;      /**< app-layer progress relative to STREAM_BASE_OFFSET */
        uint32_t raw_progress_rel;      /**< raw reassembly progress relative to STREAM_BASE_OFFSET */
        uint32_t log_progress_rel;      /**< streaming logger progress relative to STREAM_BASE_OFFSET */
    
        StreamingBuffer sb;             /**< buffer holding this direction's reassembled data */
    
        TcpSegment *seg_list;           /**< list of TCP segments that are not yet (fully) used in reassembly */
        TcpSegment *seg_list_tail;      /**< Last segment in the reassembled stream seg list*/
    
        StreamTcpSackRecord *sack_head; /**< head of list of SACK records */
        StreamTcpSackRecord *sack_tail; /**< tail of list of SACK records */
    } TcpStream;

tcp段重组--suricata实现
上图展示了TcpStream的数据结构。

/**
 *  \brief block of continuous data
 */
typedef struct StreamingBufferBlock_ {
    // offset of the start of this block's data, relative to the whole stream
    uint64_t offset;
    // length of the data covered by this block
    uint32_t len;
    struct StreamingBufferBlock_ *next;
} StreamingBufferBlock;

typedef struct StreamingBuffer_ {
    const StreamingBufferConfig *cfg;

    // Offset of buf's data relative to the whole stream. AutoSlide may be called
    // when there is not enough room for new data: buf then keeps only its trailing
    // cfg->buf_slide bytes and slides left by (buf_offset - cfg->buf_slide), so
    // stream_offset grows by the slide distance and buf_offset becomes
    // cfg->buf_slide. (NOTE(review): if AutoSlide ran, stream_offset would change
    // while app_progress_rel etc. stay unchanged, which looks like a bug; in the
    // code paths actually exercised AutoSlide is never invoked, so it does not
    // trigger -- confirm against the current Suricata sources.)
    // In addition, during tcp segment cleanup, when unneeded segments are removed,
    // the data in buf is slid and buf_offset/stream_offset are adjusted; the
    // stream members base_seq, app_progress_rel etc. are adjusted accordingly.
    uint64_t stream_offset; /**< offset of the start of the memory block */

    // cached stream data
    uint8_t *buf;           /**< memory block for reassembly */
    // length of buf
    uint32_t buf_size;      /**< size of memory block */
    // fill level of buf, i.e. how many bytes are in use
    uint32_t buf_offset;    /**< how far we are in buf_size */

    // StreamingBufferBlock exists to mark holes in the received stream data,
    // e.g. holes caused by packet loss.
    // Blocks record the data that is actually present; the ranges between
    // blocks are the data holes.
    StreamingBufferBlock *block_list;
    StreamingBufferBlock *block_list_tail;
#ifdef DEBUG
    uint32_t buf_size_max;
#endif
} StreamingBuffer;
typedef struct StreamingBufferSegment_ {
    // offset relative to the whole stream
    uint64_t stream_offset;
    // length of this segment's data inside the StreamingBuffer
    uint32_t segment_len;
} __attribute__((__packed__)) StreamingBufferSegment;

typedef struct TcpSegment_ {
    PoolThreadReserved res;     /**< id of the PoolThread this segment was taken from */
    uint16_t payload_len;       /**< actual size of the payload */
    uint32_t seq;               /**< TCP sequence number of this segment */
    StreamingBufferSegment sbseg; /**< offset/length of this segment's data in the stream's StreamingBuffer */
    struct TcpSegment_ *next;   /**< next segment in the seq-ordered list */
    struct TcpSegment_ *prev;   /**< previous segment in the seq-ordered list */
} TcpSegment;

2. tcp段重组流程

tcp处理的相关配置初始化位于main -> PostConfLoadedSetup -> PreRunInit -> StreamTcpInitConfig。
tcp段重组--suricata实现
tcp reassembly的主体逻辑在FlowWorker -> StreamTcp中。(上图11-12,画圈的地方)

2.1 FlowWorker -> StreamTcp -> StreamTcpPacket

    /* flow is and stays locked */
    int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
                         PacketQueue *pq)
    {
		...     
		
        TcpSession *ssn = (TcpSession *)p->flow->protoctx;
        
        ...
        
        if (ssn == NULL || ssn->state == TCP_NONE) {
        	/*
        	1. The SYN packet is the first packet of a TCP connection, sent by the
        	client to the server. The flow structure has just been allocated and no
        	session exists yet, so StreamTcpPacketStateNone is entered. Ignoring the
        	midstream and async-oneside cases here and considering only a normal
        	3-way handshake, this takes the branch for a packet carrying only SYN.
        	It first calls StreamTcpNewSession to obtain a new session; its members
        	are merely initialized, which is straightforward.
        	Since a SYN has now been sent, the session state is set to TCP_SYN_SENT.
        	The TCP-header related fields of the session's client and server members
        	are updated, e.g. isn, next_seq, base_seq, last_ts, last_pkt_ts, flags,
        	window, wscale. This is plain TCP header bookkeeping and is not covered
        	further.

			*/
            if (StreamTcpPacketStateNone(tv, p, stt, ssn, &stt->pseudo_queue) == -1) {
                goto error;
            }
            
            ...
            
            switch (ssn->state) {
                case TCP_SYN_SENT:
                	/*
                	2. The SYN/ACK packet is the second packet of the connection,
                	sent by the server to the client.
                	The session state is TCP_SYN_SENT at this point
                	and is updated to TCP_SYN_RECV.
                	The TCP-header related fields of the session's client and
                	server members are updated.
                	*/
                    if(StreamTcpPacketStateSynSent(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_SYN_RECV:
                	/*
                	3. From the third packet on, all packets are ACK packets (RST
                	and FIN are not considered here); what differs is the session
                	state. When the third packet is received the state is TCP_SYN_RECV.
                	3.1. Update the TCP-header related fields of the session and its
                	client/server members.
                	3.2. Update the session state to TCP_ESTABLISHED.
                	3.3. Call StreamTcpReassembleHandleSegment which, as the name
                	suggests, starts the real processing of any payload carried by
                	the packet and performs the reassembly.
					*/
                    if(StreamTcpPacketStateSynRecv(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_ESTABLISHED:
                	/*
                	4. From the fourth packet on, the session state is TCP_ESTABLISHED.
                	Update the TCP-header related fields of the session and its
                	client/server members,
                	then call StreamTcpReassembleHandleSegment.
					*/
                    if(StreamTcpPacketStateEstablished(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_FIN_WAIT1:
                    if(StreamTcpPacketStateFinWait1(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_FIN_WAIT2:
                    if(StreamTcpPacketStateFinWait2(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_CLOSING:
                    if(StreamTcpPacketStateClosing(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_CLOSE_WAIT:
                    if(StreamTcpPacketStateCloseWait(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_LAST_ACK:
                    if(StreamTcpPacketStateLastAck(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_TIME_WAIT:
                    if(StreamTcpPacketStateTimeWait(tv, p, stt, ssn, &stt->pseudo_queue)) {
                        goto error;
                    }
                    break;
                case TCP_CLOSED:
                    /* TCP session memory is not returned to pool until timeout. */
                    SCLogDebug("packet received on closed state");
                    break;
                default:
                    SCLogDebug("packet received on default state");
                    break;
            }

2.2 FlowWorker -> StreamTcp -> StreamTcpPacket->StreamTcpReassembleHandleSegment

StreamTcpReassembleHandleSegment函数用来处理tcp stream中的数据,这里的处理逻辑区分了suricata的当前模式是IDS还是IPS。

IDS模式

  1. 首先处理对端stream已经缓存的数据包,进行应用层协议识别。函数栈为StreamTcpReassembleHandleSegmentUpdateACK -> StreamTcpReassembleAppLayer -> ReassembleUpdateAppLayer。
  2. 然后处理当前packet的数据,加入到本端stream的缓存中。函数为StreamTcpReassembleHandleSegmentHandleData。

IPS模式

IPS模式下,或当前packet是PKT_PSEUDO_STREAM_END、RST或进入连接关闭流程。

  1. 首先处理当前packet的数据,加入到本端stream的缓存中。函数为StreamTcpReassembleHandleSegmentHandleData。
  2. 然后处理本端stream已经缓存的数据包,这里包含了刚刚加入缓存的数据,进行应用层协议识别。函数栈为StreamTcpReassembleAppLayer -> ReassembleUpdateAppLayer。
下面结合源码看tcp segment数据重组的主体函数StreamTcpReassembleHandleSegment。
/** \brief Feed one packet into the reassembly engine.
 *
 *  In the default (IDS) mode the packet's ACK drives an update of the
 *  opposing stream; in inline/stream-end/RST/late-FIN cases the packet's
 *  own direction is updated instead. The packet payload, if any, is then
 *  stored in this direction's stream.
 *
 *  \retval 0 ok
 *  \retval -1 error
 */
int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
                                     TcpSession *ssn, TcpStream *stream,
                                     Packet *p, PacketQueue *pq)
{
    /* ACK handling applies to the other direction, so resolve it up front
     * for StreamTcpReassembleHandleSegmentUpdateACK */
    TcpStream *opposing_stream =
            (stream == &ssn->client) ? &ssn->server : &ssn->client;

    /* default IDS: update opposing side (triggered by ACK) */
    enum StreamUpdateDir dir = UPDATE_DIR_OPPOSING;
    /* inline mode, pseudo stream-end, an accepted RST, and a FIN past
     * TCP_TIME_WAIT all trigger same-direction handling */
    if (StreamTcpInlineMode() ||
            (p->flags & PKT_PSEUDO_STREAM_END) ||
            (p->tcph && (p->tcph->th_flags & TH_RST)) ||
            (p->tcph && (p->tcph->th_flags & TH_FIN) && ssn->state > TCP_TIME_WAIT)) {
        dir = UPDATE_DIR_PACKET;
    }

    /* handle ack received */
    if (dir == UPDATE_DIR_OPPOSING) {
        if (StreamTcpReassembleHandleSegmentUpdateACK(tv, ra_ctx, ssn, opposing_stream, p) != 0) {
            SCLogDebug("StreamTcpReassembleHandleSegmentUpdateACK error");
            SCReturnInt(-1);
        }
    }

    /* if this segment contains data, insert it */
    if (p->payload_len == 0 || (stream->flags & STREAMTCP_STREAM_FLAG_NOREASSEMBLY)) {
        SCLogDebug("ssn %p / stream %p: not calling StreamTcpReassembleHandleSegmentHandleData:"
                " p->payload_len %u, STREAMTCP_STREAM_FLAG_NOREASSEMBLY %s",
                ssn, stream, p->payload_len,
                (stream->flags & STREAMTCP_STREAM_FLAG_NOREASSEMBLY) ? "true" : "false");
    } else {
        SCLogDebug("calling StreamTcpReassembleHandleSegmentHandleData");

        if (StreamTcpReassembleHandleSegmentHandleData(tv, ra_ctx, ssn, stream, p) != 0) {
            SCLogDebug("StreamTcpReassembleHandleSegmentHandleData error");
            SCReturnInt(-1);
        }

        SCLogDebug("packet %"PRIu64" set PKT_STREAM_ADD", p->pcap_cnt);
        p->flags |= PKT_STREAM_ADD;
    }

    /* in stream inline mode even if we have no data we call the reassembly
     * functions to handle EOF */
    if (dir == UPDATE_DIR_PACKET) {
        SCLogDebug("inline (%s) or PKT_PSEUDO_STREAM_END (%s)",
                StreamTcpInlineMode()?"true":"false",
                (p->flags & PKT_PSEUDO_STREAM_END) ?"true":"false");
        if (StreamTcpReassembleAppLayer(tv, ra_ctx, ssn, stream, p, dir) < 0) {
            SCReturnInt(-1);
        }
    }

    SCReturnInt(0);
}

2.3 FlowWorker -> StreamTcp -> StreamTcpPacket->StreamTcpReassembleHandleSegment->StreamTcpReassembleHandleSegmentHandleData

数据重组操作位于函数StreamTcpReassembleHandleSegmentHandleData。

/**
 *  \brief Insert a packets TCP data into the stream reassembly engine.
 *
 *  \retval 0 good segment, as far as we checked.
 *  \retval -1 badness, reason to drop in inline mode
 *
 *  If the retval is 0 the segment is inserted correctly, or overlap is handled,
 *  or it wasn't added because of reassembly depth.
 *
 */
int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
                                TcpSession *ssn, TcpStream *stream, Packet *p)
{
    SCEnter();

    /* remember which direction carried payload data first */
    if (ssn->data_first_seen_dir == 0) {
        if (PKT_IS_TOSERVER(p)) {
            ssn->data_first_seen_dir = STREAM_TOSERVER;
        } else {
            ssn->data_first_seen_dir = STREAM_TOCLIENT;
        }
    }

    /* If the OS policy is not set then set the OS policy for this stream */
    if (stream->os_policy == 0) {
        StreamTcpSetOSPolicy(stream, p);
    }

	/*
	Check the session flag STREAMTCP_FLAG_APP_LAYER_DISABLED
	and the stream flag STREAMTCP_STREAM_FLAG_NEW_RAW_DISABLED:
	if both are set, there is no point in reassembling any further.
	*/
    if ((ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED) &&
        (stream->flags & STREAMTCP_STREAM_FLAG_NEW_RAW_DISABLED)) {
        SCLogDebug("ssn %p: both app and raw reassembly disabled, not reassembling", ssn);
        SCReturnInt(0);
    }

    /* If we have reached the defined depth for either of the stream, then stop
       reassembling the TCP session 
	Checks the stream flag STREAMTCP_STREAM_FLAG_DEPTH_REACHED, and whether
	the current packet would exceed the reassembly depth; if it would, the
	flag is set.
	*/
    uint32_t size = StreamTcpReassembleCheckDepth(ssn, stream, TCP_GET_SEQ(p), p->payload_len);
    SCLogDebug("ssn %p: check depth returned %"PRIu32, ssn, size);

    if (stream->flags & STREAMTCP_STREAM_FLAG_DEPTH_REACHED) {
        /* increment stream depth counter */
        StatsIncr(tv, ra_ctx->counter_tcp_stream_depth);
    }
    if (size == 0) {
        SCLogDebug("ssn %p: depth reached, not reassembling", ssn);
        SCReturnInt(0);
    }

    DEBUG_VALIDATE_BUG_ON(size > p->payload_len);
    if (size > p->payload_len)
        size = p->payload_len;

	/*
	Grab a TcpSegment (allocated via the PoolThread structure, described
	later) and set its seq and payload_len.
	*/
    TcpSegment *seg = StreamTcpGetSegment(tv, ra_ctx);
    if (seg == NULL) {
        SCLogDebug("segment_pool is empty");
        StreamTcpSetEvent(p, STREAM_REASSEMBLY_NO_SEGMENT);
        SCReturnInt(-1);
    }

    TCP_SEG_LEN(seg) = size;
    seg->seq = TCP_GET_SEQ(p);

    /* proto detection skipped, but now we do get data. Set event. */
    if (stream->seg_list == NULL &&
        stream->flags & STREAMTCP_STREAM_FLAG_APPPROTO_DETECTION_SKIPPED) {

        AppLayerDecoderEventsSetEventRaw(&p->app_layer_events,
                APPLAYER_PROTO_DETECTION_SKIPPED);
    }

    if (StreamTcpReassembleInsertSegment(tv, ra_ctx, stream, seg, p, TCP_GET_SEQ(p), p->payload, p->payload_len) != 0) {
        SCLogDebug("StreamTcpReassembleInsertSegment failed");
        SCReturnInt(-1);
    }
    SCReturnInt(0);
}

2.4 FlowWorker -> StreamTcp -> StreamTcpPacket->StreamTcpReassembleHandleSegment->StreamTcpReassembleHandleSegmentHandleData->StreamTcpReassembleInsertSegment

/**
 *  \retval -1 segment not inserted
 *
 *  \param seg segment, this function takes total ownership
 *
 *  In case of error, this function returns the segment to the pool
 */
int StreamTcpReassembleInsertSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
        TcpStream *stream, TcpSegment *seg, Packet *p, uint32_t pkt_seq, uint8_t *pkt_data, uint16_t pkt_datalen)
{
    /* Link the segment into the seq-ordered list; the data itself is not
     * handled here. The return value distinguishes whether the segment
     * overlaps existing ones: <0 list failure, 0 clean insert, 1 overlap. */
    int list_res = DoInsertSegment(stream, seg, p);
    if (list_res < 0) {
        /* could not be placed into the list */
        StatsIncr(tv, ra_ctx->counter_tcp_reass_list_fail);
        StreamTcpSegmentReturntoPool(seg);
        SCReturnInt(-1);
    }

    if (list_res == 1) {
        /* XXX should we exclude 'retransmissions' here? */
        StatsIncr(tv, ra_ctx->counter_tcp_reass_overlap);

        /* overlap: let the overlap handling decide what data wins */
        if (DoHandleData(tv, ra_ctx, stream, seg, p) < 0) {
            StatsIncr(tv, ra_ctx->counter_tcp_reass_data_overlap_fail);
            StreamTcpRemoveSegmentFromStream(stream, seg);
            StreamTcpSegmentReturntoPool(seg);
            SCReturnInt(-1);
        }
    } else if (list_res == 0) {
        /* no overlap: straight data insert into the streaming buffer */
        if (InsertSegmentDataCustom(stream, seg, pkt_data, pkt_datalen) < 0) {
            StatsIncr(tv, ra_ctx->counter_tcp_reass_data_normal_fail);
            StreamTcpRemoveSegmentFromStream(stream, seg);
            StreamTcpSegmentReturntoPool(seg);
            SCReturnInt(-1);
        }
    }

    SCReturnInt(0);
}

2.5 FlowWorker -> StreamTcp -> StreamTcpPacket->StreamTcpReassembleHandleSegment->StreamTcpReassembleHandleSegmentHandleData->StreamTcpReassembleInsertSegment->InsertSegmentDataCustom

/** \internal
 *  \brief insert segment data into the streaming buffer
 *  \param seg segment to store stream offset in
 *  \param data segment data after overlap handling (if any)
 *  \param data_len data length
 */
static inline int InsertSegmentDataCustom(TcpStream *stream, TcpSegment *seg, uint8_t *data, uint16_t data_len)
{
    uint64_t stream_offset;
    uint16_t data_offset;

	/* Compare the segment's seq against stream->base_seq to determine the
	   write offset in the StreamingBuffer and how much of the data to keep. */
    if (likely(SEQ_GEQ(seg->seq, stream->base_seq))) {
        stream_offset = STREAM_BASE_OFFSET(stream) + (seg->seq - stream->base_seq);
        data_offset = 0;
    } else {
        /* segment is partly before base_seq */
        data_offset = stream->base_seq - seg->seq;
        stream_offset = STREAM_BASE_OFFSET(stream);
    }

    SCLogDebug("stream %p buffer %p, stream_offset %"PRIu64", "
               "data_offset %"PRIu16", SEQ %u BASE %u, data_len %u",
               stream, &stream->sb, stream_offset,
               data_offset, seg->seq, stream->base_seq, data_len);
    BUG_ON(data_offset > data_len);
    if (data_len == data_offset) {
        SCReturnInt(0);
    }

	/* Copy the payload into the buffer and record its location in seg->sbseg. */
    if (StreamingBufferInsertAt(&stream->sb, &seg->sbseg,
                data + data_offset,
                data_len - data_offset,
                stream_offset) != 0) {
        SCReturnInt(-1);
    }
#ifdef DEBUG
    {
        const uint8_t *mydata;
        uint32_t mydata_len;
        uint64_t mydata_offset;
        StreamingBufferGetData(&stream->sb, &mydata, &mydata_len, &mydata_offset);

        SCLogDebug("stream %p seg %p data in buffer %p of len %u and offset %"PRIu64,
                stream, seg, &stream->sb, mydata_len, mydata_offset);
        //PrintRawDataFp(stdout, mydata, mydata_len);
    }
#endif
    SCReturnInt(0);
}

2.6 FlowWorker -> StreamTcp -> StreamTcpPacket->StreamTcpReassembleHandleSegment->StreamTcpReassembleHandleSegmentHandleData->StreamTcpReassembleInsertSegment->InsertSegmentDataCustom->StreamingBufferInsertAt

/** \brief copy data into the streaming buffer at an absolute stream offset,
 *         growing (or sliding) the buffer as needed and keeping the block
 *         list up to date so data holes remain tracked.
 *  \retval 0 ok, -1 error (offset before buffer start, or no room)
 */
int StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg,
                            const uint8_t *data, uint32_t data_len,
                            uint64_t offset)
{
    BUG_ON(seg == NULL);

    if (offset < sb->stream_offset)
        return -1;

    if (sb->buf == NULL) {
        if (InitBuffer(sb) == -1)
            return -1;
    }

    uint32_t rel_offset = offset - sb->stream_offset;
	/* Make sure buf can hold data_len bytes at rel_offset: slide first if
	   configured, then grow; fail if it still does not fit. */    
    if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) {
        if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) {
            AutoSlide(sb);
            rel_offset = offset - sb->stream_offset;
        }
        if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) {
            if (GrowToSize(sb, (rel_offset + data_len)) != 0)
                return -1;
        }
    }
    if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) {
        return -1;
    }

    memcpy(sb->buf + rel_offset, data, data_len);
    seg->stream_offset = offset;
    seg->segment_len = data_len;

    SCLogDebug("rel_offset %u sb->stream_offset %"PRIu64", buf_offset %u",
            rel_offset, sb->stream_offset, sb->buf_offset);

	/* Update block_list/block_list_tail so gaps between the stored data
	   regions stay tracked. */
    if (sb->block_list == NULL) {
        SCLogDebug("empty sbb list");

        if (sb->stream_offset == offset) {
            SCLogDebug("empty sbb list: block exactly what was expected, fall through");
            /* empty list, data is exactly what is expected (append),
             * so do nothing */
        } else if ((rel_offset + data_len) <= sb->buf_offset) {
            SCLogDebug("empty sbb list: block is within existing region");
        } else {
            if (sb->buf_offset && rel_offset == sb->buf_offset) {
                // nothing to do
            } else if (rel_offset < sb->buf_offset) {
                // nothing to do
            } else if (sb->buf_offset) {
                /* existing data, but there is a gap between us */
                SBBInit(sb, rel_offset, data_len);
            } else {
                /* gap before data in empty list */
                SCLogDebug("empty sbb list: invoking SBBInitLeadingGap");
                SBBInitLeadingGap(sb, offset, data_len);
            }
        }
    } else {
        /* already have blocks, so append new block based on new data */
        SBBUpdate(sb, rel_offset, data_len);
    }

    if (rel_offset + data_len > sb->buf_offset)
        sb->buf_offset = rel_offset + data_len;

    return 0;
}

参考:http://www.hyuuhit.com/2018/05/28/suricata-4-0-3-tcp-reassembly/