基于线程调度的同步机制

 基于线程调度的同步机制

1  线程进入等待

当一个线程的控制流到达一个等待函数时,若等待的条件不满足,则线程调度器会将处理器的执行权交给其他处于备用或就绪状态的线程。这些被等待的对象可以用来协调线程之间的行为,它们被称为同步对象或者分发器对象。头部以DISPATCH_HEADER开始的对象都市分发器对象,定义如下:

typedef struct _DISPATCHER_HEADER {

    union {

        struct {

            UCHAR Type;

            union {

                UCHAR Absolute;

                UCHAR NpxIrql;

            };

            union {

                UCHAR Size;

                UCHAR Hand;

            };

            union {

                UCHAR Inserted;

                BOOLEAN DebugActive;

            };

        };

        volatile LONG Lock;

    };

    LONG SignalState;

    LIST_ENTRY WaitListHead;

} DISPATCHER_HEADER;

DISPATCHER_HEADER 的第一个成员,要么是一个锁Lock,要么是Type 代表的类型。

SignalState : 代表该分发器对象的信号状态

WaitListHead : 是链表头 连接了所有等待此对象的线程的KWAIT_BLOCK,将所有等在该锁上的线程全部记录下来,等锁被释放后就从这个列表中挑选一个(或者多个)线程唤醒。

线程和分发器对象之间的关系。首先,线程对象的KTHREAD 包含一个WaitBlockList ,它指向一个链表,链表上的每个节点代表了该线程正在等待的一个分发器对象。其次,分发器对象也包含一个链表,即DISPATCHER_HEADER的WaitListHead,链表上的每个节点代表了正在等待该对象的一个线程。两个链表节点都是等待块对象,数据类型为KWAIT_BLOCK:

typedef struct _KWAIT_BLOCK {

    LIST_ENTRY WaitListEntry; //挂入WaitListHead,一个对象所有的等待块   struct _KTHREAD *Thread;

    PVOID Object; //指要等待的对象

    struct _KWAIT_BLOCK *NextWaitBlock; //挂入线程链表,这个链表组成的线程等待的所有对象,单循环链表,头是KTHREAD.WaitBlockList

    USHORT WaitKey;//当Thead等待Object 成功从而被解除等待时的完成状态值

    UCHAR WaitType; //WaitAny 任意对象满足后唤醒 WaitAll要等到所有都满足

    UCHAR SpareByte;

} KWAIT_BLOCK, *PKWAIT_BLOCK, *PRKWAIT_BLOCK;

线程、分发器对象和等待块之间的关系:

 基于线程调度的同步机制

当一个线程通过某个Wait 函数进入等待状态时,线程对象的WaitBlockList 指向一个非空链表,链表上的每个等待块对象描述了它所等待的一个分发器对象。只要链表节点上的分发器对象时无信号状态,则该线程始终处于等待状态。

线程进入等待的状况,以KeWaitForSingleObject 为例。

NTSTATUS

KeWaitForSingleObject (

    __in PVOID Object,

    __in KWAIT_REASON WaitReason,

    __in KPROCESSOR_MODE WaitMode,

    __in BOOLEAN Alertable,

    __in_opt PLARGE_INTEGER Timeout

    )

--*/

{

    Hand = 0;

    Thread = KeGetCurrentThread();

    Objectx = (PKMUTANT)Object;   //object 是参数,指定了要等待的对象

    OriginalTime = Timeout;   //Timeout 是指针参数,指定了超时设置

    Timer = &Thread->Timer;  //Timer指向线程内置的定时器对象,已初始化

    WaitBlock = &Thread->WaitBlock[0];  //WaitBlock 和WaitTimer是局部指针

    WaitTimer = &Thread->WaitBlock[TIMER_WAIT_BLOCK]; //最后一个用于超时,TIMER_WAIT_BLOCK=3

    if (ReadForWriteAccess(&Thread->WaitNext) == FALSE) {

        goto WaitStart;

    }

    Thread->WaitNext = FALSE;

    /*InitializeWaitSingle();//以下代码系C宏InitializeWaitSingle 展开所得

#define InitializeWaitSingle() 宏展开*/                                           

    Thread->WaitBlockList = WaitBlock;     //初始化等待块                                

    WaitBlock->Object = Object;                                            

    WaitBlock->WaitKey = (CSHORT)(STATUS_SUCCESS);                         

    WaitBlock->WaitType = WaitAny;                                         

    Thread->WaitStatus = 0;                                                

    if (ARGUMENT_PRESENT(Timeout)) {     //如果有到期时间 ,初始化一个定时器                                

        KiSetDueTime(Timer, *Timeout, &Hand);                              

        DueTime.QuadPart = Timer->DueTime.QuadPart;                        

        WaitBlock->NextWaitBlock = WaitTimer;    //定时器等待块和要等待的等待块构成循环链表                            

        WaitTimer->NextWaitBlock = WaitBlock;                              

        Timer->Header.WaitListHead.Flink = &WaitTimer->WaitListEntry;      

        Timer->Header.WaitListHead.Blink = &WaitTimer->WaitListEntry;      

    } else {                                                               

        WaitBlock->NextWaitBlock = WaitBlock;                               

    }                                                                      

    Thread->Alertable = Alertable;                                         

    Thread->WaitMode = WaitMode;                                           

    Thread->WaitReason = (UCHAR)WaitReason;                                

    Thread->WaitListEntry.Flink = NULL;                                    

    StackSwappable = KiIsKernelStackSwappable(WaitMode, Thread);           

    Thread->WaitTime = KiQueryLowTickCount()

    do {

   //这个循环是为了能够在等待过程中处理APC

        Thread->Preempted = FALSE;

        if (Thread->ApcState.KernelApcPending &&

            (Thread->SpecialApcDisable == 0) &&

            (Thread->WaitIrql < APC_LEVEL)) {

    //irql提升到DISPATCH_LEVEL,其他cpu投递了一个apc,dispatcher数据库仍在锁定的情况

    //这样会请求一个中断派发掉apc

            KiUnlockDispatcherDatabase(Thread->WaitIrql);

        } else {  

    //MutantObject要特别对待 Mutant对应的是R3的mutex概念

            if (Objectx->Header.Type == MutantObject) {

                if ((Objectx->Header.SignalState > 0) ||

                    (Thread == Objectx->OwnerThread)) {

                    if (Objectx->Header.SignalState != MINLONG) {

                        KiWaitSatisfyMutant(Objectx, Thread);

                        WaitStatus = (NTSTATUS)(Thread->WaitStatus);

                        goto NoWait;

                    } else {

                        KiUnlockDispatcherDatabase(Thread->WaitIrql);

                        ExRaiseStatus(STATUS_MUTANT_LIMIT_EXCEEDED);

                    }

                }

            } else if (Objectx->Header.SignalState > 0) {//条件已经满足

                KiWaitSatisfyOther(Objectx);

                WaitStatus = (NTSTATUS)(0);

                goto NoWait;

            }

            TestForAlertPending(Alertable);   //检测是否可以被叫醒(用户态APC神马的) 更改等待状态 WaitStatus

    /* 以下是宏定义 如果被叫醒,则提前结束循环

    #define TestForAlertPending(Alertable) \

    if (Alertable) { \

        if (Thread->Alerted[WaitMode] != FALSE) { \

            Thread->Alerted[WaitMode] = FALSE; \

            WaitStatus = STATUS_ALERTED; \

            break; \

        } else if ((WaitMode != KernelMode) && \

                  (IsListEmpty(&Thread->ApcState.ApcListHead[UserMode])) == FALSE) { \

            Thread->ApcState.UserApcPending = TRUE; \

            WaitStatus = STATUS_USER_APC; \

            break; \

        } else if (Thread->Alerted[KernelMode] != FALSE) { \

            Thread->Alerted[KernelMode] = FALSE; \

            WaitStatus = STATUS_ALERTED; \

            break; \

        } \

    } else if (Thread->ApcState.UserApcPending & WaitMode) { \

        WaitStatus = STATUS_USER_APC; \

        break; \

    }

    */

            if (ARGUMENT_PRESENT(Timeout)) {// 检查定时是否到期

                if (KiCheckDueTime(Timer) == FALSE) {

                    WaitStatus = (NTSTATUS)STATUS_TIMEOUT;

                    goto NoWait;

                }

            }

            InsertTailList(&Objectx->Header.WaitListHead, &WaitBlock->WaitListEntry);// 等待块插入对象的派发头

  

            Queue = Thread->Queue;

            if (Queue != NULL) {

                KiActivateWaiterQueue(Queue);

            }

            Thread->State = Waiting;    //设置线程进入等待状态

            CurrentPrcb = KeGetCurrentPrcb();

            if (StackSwappable != FALSE) {

                InsertTailList(&CurrentPrcb->WaitListHead, &Thread->WaitListEntry);

            }

            ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);

            KiSetContextSwapBusy(Thread);

            if (ARGUMENT_PRESENT(Timeout)) {

                KiInsertOrSignalTimer(Timer, Hand);

            } else {

                KiUnlockDispatcherDatabaseFromSynchLevel();

            }

            WaitStatus = (NTSTATUS)KiSwapThread(Thread, CurrentPrcb); //放弃执行,切换线程,当从这里返回之后意味着

                   //条件已经被满足,或者超时,或者有内核APC插入

            if (WaitStatus != STATUS_KERNEL_APC) {//只要不是内核apc插入,就可以返回了

                return WaitStatus;

            }

            if (ARGUMENT_PRESENT(Timeout)) {//否则这不是一次真的等到了,而是被内核apc搞醒的 重新计算时间 重新构造一个等待 循环

                Timeout = KiComputeWaitInterval(OriginalTime, //减少时间

                                                &DueTime,

                                                &NewTime);

            }

        }

WaitStart:

        Thread->WaitIrql = KeRaiseIrqlToSynchLevel();

        InitializeWaitSingle();

        KiLockDispatcherDatabaseAtSynchLevel();

    } while (TRUE);

    KiUnlockDispatcherDatabase(Thread->WaitIrql);

    return WaitStatus;

NoWait:

    KiUnlockDispatcherDatabaseFromSynchLevel();

    KiAdjustQuantumThread(Thread);

    return WaitStatus;

}

KeWaitForSingleObject 使用当前线程得WaitBlock 数组的第0项作为所等待对象的KWAIT_BLOCK;如果Timeout 不为NULL,则用WaitBlock的第3项作为定时器对象的KWAIT_BLOCK。While 循环在每次执行时都会检查自己等待的同步对象是否满足等待条件。

针对WaitAny的情况,当发现某一个同步对象有信号时就会直接结束,而针对WaitAll的情况,就需要所有对象都就绪。当线程满足等待条件时,会执行相应的Satisfy函数调整相应的信号量。即,当对象为事件(自动重置)时设置信号为0,阻止其他线程获取这个事件;当对象为信号量时,执行信号量的P操作。

如果线程不满足条件,则会准备好超时计时器,调度到等待状态,直到被唤醒。

注意到唤醒时的判断条件,STATUS_KERNEL_APC是一个特殊的条件,此时线程会重新检查自己等待的对象是否满足条件,否则会直接返回等待结果,结束等待。

因为如果不考虑等待多个对象,单个对象就绪就可以通知线程等待的结果,而需要等待多个对象时,单一对象的就绪并不能代表最终结果,这就要求线程重新检查自己等待的每个对象是否满足要求。此时通过STATUS_KERNEL_APC这个状态码就起到这样一个通知的作用。

线程解除等待:

它们调用KiWaitTest、KiWaitTestSynchronizationObject、KiWaitTestWithoutSideEffects 来测试一个等待是否已经满足。这些函数从分发器对象的WaitListHead出发,遍历KWAIT_BLOCK链表,逐个检查。然后,调用KiUnwaitThread和KiReadyThread 函数,使线程进入延迟的就绪状态。在KiReadThread 函数之前调用KiUnlinkThread ,将一个满足等待条件的线程从它的等待块链表中移除处理。

FORCEINLINE VOID FASTCALL KiUnlinkThread (IN PRKTHREAD Thread,IN LONG_PTR WaitStatus )

{

    PKQUEUE Queue;

    PKTIMER Timer;

    PRKWAIT_BLOCK WaitBlock;

    //设置线程的等待完成状态,将线程的等待块从相应对象的等待块链表中移除

    //同时也将线程从处理器的全局等待链表中移除

    Thread->WaitStatus |= WaitStatus;

    WaitBlock = Thread->WaitBlockList;

    do {

        RemoveEntryList(&WaitBlock->WaitListEntry);

        WaitBlock = WaitBlock->NextWaitBlock;

    } while (WaitBlock != Thread->WaitBlockList);

 

    if (Thread->WaitListEntry.Flink != NULL) {

        RemoveEntryList(&Thread->WaitListEntry);

    }

    //如果用于超时控制的线程定时器还在起作用,则取消线程定时器

    Timer = &Thread->Timer;

    if (Timer->Header.Inserted != FALSE) {

        KiRemoveTreeTimer(Timer);

    }

    //如果线程正在处理一个队列项,则递增该队列的当前活动线程的技术

    Queue = Thread->Queue;

    if (Queue != NULL) {

        Queue->CurrentCount += 1;

    }

    return;

}

进入等待和解除等待的典型过程,如图:

 基于线程调度的同步机制

WaitForSingleObject在拿到Event后会直接结束循环返回WaitStatus。

WaitForMultipleObjects在拿到Event后不会直接退出,而是去检查是否满足所有被等待对象的条件。

2  分发器对象

Windows中的分发器对象一览表

名称

KOBJECTS的类型定义

数据结构定义

变成有信号状态的条件

事件

EventNotificationObject或

EventSynchroizationObject

KEVENT

调用KeSetEvent设置状态

突变体

MutantObject

KMUTANT

线程释放了突变体对象

信号量

SemaphoreObject

KSEMAPHORE

信号量计数减一

进程

ProcessObject

KPROCESS

最后一个线程终止

线程

ThreadObject

KTHREAD

线程被终止

队列

QueueObject

KQUEUE

再队列中插入一项

GateObject

KGATE

调用

KeSignalGateBoostPriority

定时器

TimerNotificationObject或

TimerSynchronizationObject

KTIMER

设定的时间到期

事件对象和定时器对象分别由同步和通知两种类型。

(1)事件对象,分为事件通知对象和事件同步对象。

KEVENT只有一个DISPATCHER_HEADER对象,与事件对象有关的操作包括:

  1. KeInitializeEvent,初始化事件对象的类型和信号状态,以及头部的WaitListHead.
  2. KeClearEvent,清除事件对象的信号状态,使它变成无信号状态。
  3. KePulseEvent,设置事件对象为有信号状态,并调用KiWaitTest 以满足那些正在等待该对象的线程,然后设置对象的状态为无信号状态。
  4. KeResetEvent,设置事件对象为无信号状态,返回原来的状态。
  5. KeSetEvent,设置事件对象为有信号状态,若为事件通知对象,则调用KiWaitTestWithoutSideEffects,唤醒所以正在等待该对象的线程;若为同步对象,则调用KiWaitTestSynchronizationObject,检查它的KWAIT_BLOCK链表,唤醒一个正在等待该对象的线程,并将该对象设置为无信号状态。
  6. KeSetEventBoostPriority,仅适用于事件同步对象。若它的KWAIT_BLOCK链表第一个等待块为WaitAll,则设置对象为有信号状态,并调用KiWaitTestSynchronizationObject;若第一个等待块为WaitAny,则直接解除所对应线程的等待,不改变事件对象的状态。

(2)突变体对象,互斥体(mutex)概念的具体实现。

typedef struct _KMUTANT {

    DISPATCHER_HEADER Header;

    LIST_ENTRY MutantListEntry;

    struct _KTHREAD *OwnerThread;

    BOOLEAN Abandoned;

    UCHAR ApcDisable;

} KMUTANT, *PKMUTANT, *PRKMUTANT, KMUTEX, *PKMUTEX, *PRKMUTEX;

MutantListEntry:是个链表头,连着所有突变体

OwnerThread:正在拥有突变体的线程

Abandoned:该突变体对象是否已经被放弃不用

ApcDisable:是否禁用内核APC

与突变体对象有关的操作包括:

1.KeInitializeMutant,初始化一个KMUTANT对象,并指定当前线程释放为它的所有者

2.KeInitializeMUtex,初始化一个KMUTANT对象为有信号状态,所以它没有所有者线程。另外,设置ApcDisable 为1,不同于KeInitializeMutant。

3.KeReleaseMutant,释放一个KMUTANT对象,递增它的信号计数(即头部的SignalState)。

4.KeReleaseMutex,转发给KeReleaseMutant,其Abandoned参数为FALSE。

(3)信号量对象,Semaphore 具体实现

KSEMAPHORE再DISPATCHER_HEADER后加了Limit 整数,用于控制最多有多少个线程共享一个资源,即信号量计数器的最大值。

与信号量对象有关的操作包括:

  1. KeInitializeSemaphore ,初始化一个KSEMAPHORE对象,主要是它的计数值(SignalState)和最大计数值(Limit)。
  2. KeReaseSemaphore,释放一个KSEMAPHORE对象,并指定在它的计数器上增加多少量。

(4)队列对象,用于支持内存池的机制。

typedef struct _KQUEUE {

    DISPATCHER_HEADER Header;

    LIST_ENTRY EntryListHead;//链表头,包含了队列对象中待处理的项

    ULONG CurrentCount;//当前有多少个活动线程

    ULONG MaximumCount;//最多可以有多少个活动线程

    LIST_ENTRY ThreadListHead;//链表头,包含了当前已加入该队列对象的线程,每个节点都为线程THREAD对象的QueueListEntry成员

} KQUEUE, *PKQUEUE, *PRKQUEUE;

与队列对象有关的操作包括:

  1. KeInitializeQueue,初始化一个内核队列对象。
  2. KiInsertQueue,插入一个队列项。
  3. KeInsertQueue,插入一个队列项。其做法是,先提升IRQL至SYNCH_LEVEL,并锁住调度器数据库,然后调用KiInsertQueue,将参数插入到队列尾部。
  4. KeInsertHeadQueue,将参数项插入到队列头部。
  5. KeRemoveQueue,从队列对象中移除一项,如果队列对象的EntryListHead链表为空,则线程进入等待。
  6. KeRundownQueue,清除一个队列对象,它会移除掉所有与该队列对象相关联的线程。
  7. KiActivateWaiterQueue,当线程池中一个线程进入等待状态时,其它由于活动线程数量受限制而不得不等待的线程将有机会被执行。

(5)进程对象

在进程对象被创建之初,其状态为无信号;当进程结束时,其状态为有信号。

KeSetProcess 被用于设置进程对象的信号状态。

(6)线程对象

在线程对象被创建之初,其状态为无信号;当线程结束时,其状态为有信号。

(7)定时器对象

定时器对象分为定时器通知对象和定时器同步对象。相关操作包括:

  1. KeInitializeTimer/KeInitializeTimerEx ,初始化一个定时器对象。KeInitializeTimer调用KeInitializeTimerEx初始化一个通知类型的定时器。
  2. KeClearTimer ,清除一个定时器对象的信号状态。
  3. KeCancelTimer ,取消一个已经设定的定时器。
  4. KeSetTimer/KeSetTimerEx,设定一个定时器。
  5. KiSignalTimer ,定时器对象变为有信号状态。
  6. KiInsertOrSignalTimer,如果定时器对象尚未到期,则将它插入到系统的定时器链表中,否则,调用KiCompleteTimer ,KiSignalTimer完成定时器对象的到期处理。
  7. KiTimerExpiration ,定时器到期处理函数。