12.互斥量(mutex)

    当信号量(Semaphore)的计数功能不再需要,信号量简化之后就成为一种新的变量互斥量(mutex)。互斥量在处理共享资源和代码之间的互斥访问方面非常有用。互斥量实现起来简单高效,这一点对于用户空间的线程库非常有用。

    互斥是那种只有两种状态,但每次只能处在其中一种状态的变量。这两种状态分别是锁定和非锁定状态。因此,只需要一个比特就能表示互斥量的两种状态,例如,0代表非锁定和1表示锁定。互斥量有两个相关的操作:mutex_lock和mutex_unlock。

    mutex_lock:当线程需要进入临界区时,它调用mutex_lock操作,若当前mutex未锁定,则mutex_lock调用成功,若mutex已经锁定,则调用线程会阻塞,直到当前获得mutex锁的那个线程离开临界区,使得mutex解锁。

    mutex_unlock:当线程离开临界区,需要解除对mutex的锁定,这时候线程调用mutex_unlock。mutex很简单,因此在用户空间可以很简单的通过TSL和XCHG指令实现,其原理的代码如下:


  1. mutex_lock: 
  2.     TSL REGISTER,MUTEX 
  3.     CMP REGISTER,#0 
  4.     JZE ok          | 如果mutex为0,则证明mutex目前未锁定,于是返回 
  5.     CALL thread_yield 
  6.     JMP mutex_lock      | 再次尝试 
  7. ok: RET 
  8.  
  9. mutex_unlock: 
  10.     MOVE MUTEX,#0       | 解锁:将mutex设置为0 
  11.     RET 

    这个代码和前面讲到的enter_region的代码非常类似,但是却有一个很关键的区别:在enter_region中,如果进程暂时不能进入临界区,那么他一直测试条件,看能否进入,直到时间片用完为止。

 

    而在用户线程中,根本没有时钟中断来使得运行过长的线程停下来,结果就是一个使用上述的忙等待方式企图获取锁的线程会时钟循环,因为别的获得锁的线程根本没机会运行。

    而在mutex_lock中则不同,如果当前mutex锁定,他就阻塞,让调度机调度其他的线程运行,从而更加充分的利用CPU资源。

    由于thread_yield仅仅是在用户空间对线程调度器的调用,因此,不需要内核支持,这样实现起来是很简单的。

    除了mutex_lock和mutex_unlock之外,可能还需要其他的一些特性,例如mutex_trylock,该调用会尝试获得锁,但是如果不成功就以失败状态返回,然后该干嘛干嘛,至少并不阻塞。

    由于线程之间共享一个内存空间,所以对于多个线程来说,共享mutex不是什么难事。但是之前提到的Peterson方案、信号量方案等主要是对于进程来说的。他们之间并不共享一份内存空间。那怎么保证多个进程之间共享变量,例如之前提到的turn呢?至少有这么几种办法:

 

  • 将共享数据类型,例如信号量,存储在内核,然后只允许通过系统调用来访问
  • 大部分现代操作系统(Windows和Unix)都支持使多个进程共享一部分内存空间
  • 实在不行多个进程还能共享文件吧

    如果说多个进程共享大部分内存空间,那么进程和线程的区别就不是那么的明显,但还是存在的。比如说,两个进程还是有自己单独的打开的文件等独有的属性,但这些属性对于多个线程来说是共享的。

 

    13.Pthread库中的互斥量

    Pthread库中同步的方法的原理还是互斥量。如果一个线程需要进入临界区,那么它需要尝试锁住相关的互斥量。如果互斥量未锁定,它进入临界区并且立马锁定互斥量,并借此防止别的线程进入临界区。

    Pthread的几个主要的调用如下所示,他们的作用都在注释中显示:


  1. Pthread_mutex_init      //创建mutex 
  2. Pthread_mutex_destroy       //销毁mutex 
  3. Pthread_mutex_lock      //锁定mutex 
  4. Pthread_mutex_trylock       //尝试锁定mutex 
  5. Pthread_mutex_unlock        //解锁mutex 

    Pthread库在实现同步方面还需要使用到另外一种机制:条件变量(Condition Variable)。条件变量往往和互斥量一起使用来确保同步。回想一下生产者-消费者问题。当生产者先检查缓冲区是否已经满了,这可以通过mutex实现,而不需要其他线程的干扰,但是如果发现已经满了,那么就需要一种机制让它阻塞以及之后被唤醒。这就需要通过条件变量来实现了。

 

    与条件变量相关的库调用如下所示:


  1. Pthread_cond_init       //创建条件变量 
  2. Pthread_cond_destroy        //销毁条件变量 
  3. Pthread_cond_wait       //阻塞并且等待信号 
  4. Pthread_cond_signal     //发信号唤醒另一个线程 
  5. Pthread_cond_broadcast      //广播唤醒多个线程 

    互斥量和条件变量总是一起被使用。使用的基本模式如下:一个线程先锁定mutex,然后等待条件变量(换句话说,就是它需要的资源)知道另一个线程接下来可以给它发信号以便继续。

 

    还是以只有一个单元的缓冲区生产者-消费者问题为例看看Pthread的使用:


  1. #include <stdio.h> 
  2. #include <pthread.h> 
  3. #define MAX 1000000000 
  4. pthread_mutex_t the_mutex; 
  5. pthread_cond_t condc,condp; 
  6. int buffer = 0; 
  7.  
  8. void *producer(void *ptr) { 
  9.     int i; 
  10.     for(i = 0; i <= MAX; i++) { 
  11.         pthread_mutex_lock(&the_mutex); 
  12.         while(buffer != 0) { 
  13.             pthread_cond_wait(&condp,&the_mutex); 
  14.         } 
  15.         buffer = 1; 
  16.         pthread_cond_signal(&condc); 
  17.         pthread_mutex_unlock(&the_mutex); 
  18.     } 
  19.     pthread_exit(0); 
  20.  
  21. void *consumer(coid *ptr) { 
  22.     int i; 
  23.     for(i = 0; i <= MAX; i++) { 
  24.         pthread_mutex_lock(&the_mutex); 
  25.         while(buffer == 0) { 
  26.             pthread_cond_wait(&condc,&the_mutex); 
  27.         } 
  28.         buffer = 0; 
  29.         pthread_cond_signal(&condp); 
  30.         pthread_mutex_unlock(&the_mutex); 
  31.     } 
  32.     pthread_exit(0); 
  33.  
  34. int main(int argc,char *argv) { 
  35.     pthread_t pro,con; 
  36.     pthread_mutex_init(&the_mutex,0); 
  37.     pthread_cond_init(&condc); 
  38.     pthread_cond_init(&condp); 
  39.     pthread_create(&con,0,consumer,0); 
  40.     pthread_create(&pro,0,producer,0); 
  41.     pthread_join(pro,0); 
  42.     pthread_join(con,0); 
  43.     pthread_cond_destroy(&condc); 
  44.     pthread_cond_destroy(&condp); 
  45.     pthread_mutex_destroy(&the_mutex); 

    14.监视器(Monitor)

      在上一篇博客中提到了那个信号量的例子。对于生产者来说,如果将两个down操作的顺序颠倒一下,后果就会很严重。我们先把那个代码回顾一遍:


  1. #defone N 100   /*buffer的大小*/ 
  2. typedef int semaphore; 
  3. semaphore mutex = 1; 
  4. semaphore full = 0; 
  5. semaphore empty = N; 
  6.  
  7. void producer(void) { 
  8.     int item; 
  9.     while(TRUE) { 
  10.         item = produce_item(); 
  11.         down(&empty);       //看这里 
  12.         down(&mutex);       //看这里 
  13.         insert_item(item); 
  14.         up(&mutex);     //看这里 
  15.         up(&full); 
  16.     } 
  17.  
  18. void consumer(void) { 
  19.     int item; 
  20.     wile(TRUE) { 
  21.         down(&full); 
  22.         down(&mutex); 
  23.         item = remove_item(); 
  24.         up(&mutex); 
  25.         up(&empty); 
  26.         consume_item(item); 
  27.     } 

    如果这两个操作颠倒了,换句话说生产者其实还没生产出来就跑去通知消费者了。假设这时候buffer已经满了,那么生产者会阻塞,下面的up(&mutex)操作根本就没执行。好了,轮到消费者了,消费者上来先down(&full),也就是消费掉一个消息,接着要down(&mutex)。但是这时候一看,mutex已经是0,然后一直等,等到晕倒了(阻塞)。于是乎,大家就一直等下去。这就是死锁。

 

    所以说,在使用信号量解决类似的问题上,程序猿必须非常小心。一不注意,就会出事。鉴于这样的情况,又有人提出了新的办法,他们是Brinch Hansen和Hoare。他们提出了一种高层的同步原语——监视器(monitor)。监视器是一组过程(procedures)、变量(variables)、数据结构打包在一个特殊的模块或者包中。进程可以随时调用里面的过程,但是不能直接访问过程暴露给进程之外的内部数据结构。下面是一个监视器的例子,它是用一种假设的语言写的。


  1. monitor example 
  2.     integer i; 
  3.     condition c; 
  4.     procedure producer() 
  5.          
  6.     end
  7.  
  8.     procedure consumer() 
  9.  
  10.     end
  11. end monitor; 

    监视器有一个明显的特点就是同一时刻只允许一个进程进入其中。由于监视器属于语言本身的一部分,因此监视器保证互斥往往是由语言的编译器来实现的。这样的话程序员可以基本不关注监视器如何安排互斥。但是还有一个问题就是,当进程无法继续时如何使它阻塞。这个问题通过条件变量的引进来解决。条件变量有两个相关的操作:wait和signal。当一个监视器过程发现自己无法运行,那么他对某个条件变量进行一次wait操作,例如,对full这个条件变量进行wait。wait操作会使得调用线程阻塞,从而允许别的进程进入监视器。

 

    而另外一个进程,也就是消费者,可以通过给生产者需要的那个条件变量发信号来使得生产者从阻塞中醒来。当执行完signal之后该怎么继续下去呢?有三种方案:

 

  • 让被唤醒的进程执行;
  • 执行了signal操作的进程必须立刻离开监视器
  • 让执行signal操作的进程继续进程,等该进程离开监视器后再由被唤醒的进程进入监视器。

    这里的条件变量并不是计数器,它不会保存数值以便后面使用,这一点与信号量是不同的。如果对一个没有被wait的条件变量执行signal操作,那么这个signal将会永远消失。因此必须保证wait必须在signal之前。

 

    使用监视器处理生产者、消费者问题的框架如下面的代码:


  1. monitor ProducerConsumer 
  2.     condition full,empty; 
  3.     integer count
  4.  
  5.     procedure insert(item:integer
  6.     begin 
  7.         if count = N then wait(full); 
  8.         insert_item(item); 
  9.         count:=count+1; 
  10.         if count = 1 then signal(empty); 
  11.     end
  12.  
  13.     function remove:integer 
  14.     begin  
  15.         if count = 0 then wait(empty) 
  16.         remove = remove_item; 
  17.         count = count - 1; 
  18.         if count = N - 1 then signal(full); 
  19.     end
  20.  
  21.     count := 0; 
  22. end monitor; 
  23.  
  24. procedure producer 
  25. begin  
  26.     while true do 
  27.     begin 
  28.         item = produce_item; 
  29.         ProducerConsumer.insert(item); 
  30.     end
  31. end
  32.  
  33. procedure consumer 
  34. begin 
  35.     while true do 
  36.         item = ProducerConsumer.remove; 
  37.         consume_item(item) 
  38. end

    实际上,上面是采用一种假象的语言来模拟的。实际上Java语言采用了类似的设计思路,实现同步synchronized。一旦某个线程正在执行synchronized方法,此时,该对象中的其他线程不允许调用其中的任何同步方法。下面看看Java中如何解决生产者消费者问题:


  1. package net.jerryblog.concurrent; 
  2.  
  3. public class 
  4. ProducerConsumer { 
  5.     static final int N = 100;              // constant giving the buffer size 
  6.     static producer p = new producer();    // instantiate a new producer thread 
  7.     static consumer c = new consumer();    // instantiate a new consumer thread 
  8.     static our_monitor mon = new our_monitor(); // instantiate a new monitor 
  9.   
  10.     public static void main(String args[ ]) { 
  11.         p.start();      // start the producer thread 
  12.         c.start();      // start the consumer thread 
  13.     } 
  14.   
  15.     static class producer extends Thread { 
  16.         public void run( ) {   // run method contains the thread code 
  17.              int item; 
  18.              while(true) {     // producer loop 
  19.                  item = produce_item(); 
  20.                  mon.insert(item); 
  21.             } 
  22.         } 
  23.         private int produce_item(){ 
  24.              
  25.              
  26.         }  // actually produce 
  27.     } 
  28.   
  29.     static class consumer extends Thread { 
  30.         public void run() {    // run method contains the thread code 
  31.              int item; 
  32.              while(true) {     // consumer loop 
  33.                  item = mon.remove(); 
  34.                  consume_item (item); 
  35.              } 
  36.         } 
  37.         private void consume_item (int item) {  }     // actually consume 
  38.     } 
  39.   
  40.     static class our_monitor {                 // this is a monitor 
  41.         private int buffer[ ] = new int[N]; 
  42.         private int count = 0, lo = 0, hi = 0// counters and indices 
  43.   
  44.         public synchronized void insert (int val) { 
  45.              if(count == N) go_to_sleep();     //if the buffer is full, go to sleep 
  46.              buffer [hi] = val;                // insert an item into the buffer 
  47.              hi = (hi + 1) % N;                // slot to place next item in 
  48.              count = count + 1;                // one more item in the buffer now 
  49.              if(count == 1) notify( );         // if consumer was sleeping, wake it up 
  50.         } 
  51.   
  52.         public synchronized int remove( ) { 
  53.              int val; 
  54.              if(count == 0) go_to_sleep( );    // if the buffer is empty, go to sleep 
  55.              val = buffer [lo];                // fetch an item from the buffer 
  56.              lo = (lo + 1) % N;                // slot to fetch next item from 
  57.              count = count - 1;                // one few items in the buffer 
  58.              if(count == N - 1) notify();      // if producer was sleeping, wake it up 
  59.              return val; 
  60.         } 
  61.         private void go_to_sleep() {  
  62.             try
  63.                 wait( ); 
  64.             }catch (InterruptedException exc) { 
  65.                  
  66.             } 
  67.          
  68.         } 
  69.     } 

    15.消息传递机制

    消息传递是另外一种进程间通信的方式。消息传递机制使用两个原语:send和receive。这一点和信号量很类似。send和receive是系统调用,而不是语言层次的设计。


  1. send(destination,&message); 
  2. receive(source,&message); 

    前面一个调用将消息发送给指定的接收者,后者从一个发送者处接收消息。如果没有消息则阻塞或者说返回错误代码。

 

    消息传递系统再设计上有一些信号量或者监视器等所不曾碰到的问题,尤其是当通信进程处在不同的机器上。比如,当消息在网络传输中丢失了。因此一旦接受者接收到了消息,必须返回一个回执(Acknowledgement)。如果说发送者没有收到这个回执消息,那么他会重新发送。

    那么这又有一个新的问题,怎么区别发送者发送的两次消息是一个消息,只不过由于没有收到回执从新发送了一次?因此给消息一个唯一的***,如果接受者接收到的两次消息是一个***,那证明是从新发送。
    使用消息传递机制实现的生产者消费者问题代码如下:


  1. #define N 100     /* number of slots in the buffer */ 
  2. void producer(void
  3.     int item; 
  4.     message m;    /* message buffer */ 
  5.   
  6.     while (TRUE) { 
  7.         item = produce_item( );       /* generate something to put in buffer */ 
  8.         receive(consumer, &m);        /* wait for an empty to arrive */ 
  9.         build_message (&m, item);     /* construct a message to send */ 
  10.         send(consumer, &m);           /* send item to consumer */ 
  11.     } 
  12.   
  13. void consumer(void) { 
  14.     int item, i; 
  15.     message m; 
  16.   
  17.     for (i = 0; i < N; i++) send(producer, &m);  /* send N empties */ 
  18.     while (TRUE) { 
  19.         receive(producer, &m);         /* get message containing item */ 
  20.         item = extract_item(&m);       /* extract item from message */ 
  21.         send(producer, &m);            /* send back empty reply */ 
  22.         consume_item(tem);             /* do something with the item */ 
  23.     } 

    16.屏障(Barrier)
    最后要说的一种同步机制是针对一组进程设计的。假设这样的一组情况:有的应用程序会被分成很多不同的阶段(Phase),并且有一个规定,必须所有的进程都执行完一个阶段才能进入下一个阶段。这样就可以通过在每个阶段的末尾放置一个屏障来实现。如图:

《现代操作系统》读书笔记之——进程间通信3

    举个例子:假设需要一个时间段测量一次一根铁轨的不同段的温度,存储在一个数组中,每个一段时间用来做科学计算一次。假设这根铁轨特别长,因此,分成若干段,每段的温度由一个进程收集记录。所以,每次提交所有温度之前必须所有进程都统计完这阶段的温度。这时候急需要屏障啦。

 

~~~~~完~~~~~