Continuously reading from a named pipe


Problem description:

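I'd like to know what other options there are for continuously reading from a named pipe in Go. My current code relies on an infinite loop running in a goroutine, but that keeps one CPU at 100% usage.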

func main() {
    ....

    var wg sync.WaitGroup
    fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
    defer fpipe.Close()

    f, _ := os.Create("dump.txt")
    defer f.Close()
    var buff bytes.Buffer

    wg.Add(1)
    go func() {
        for {
            io.Copy(&buff, fpipe)
            if buff.Len() > 0 {
                buff.WriteTo(f)
            }
        }
    }()

    wg.Wait()
}

'io.Copy' already reads continuously; don't put it in a loop. – JimB


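@JimB I guess I should have mentioned that the writing side does not write to the named pipe continuously. If I relied solely on io.Copy, the goroutine would terminate as soon as EOF is reached. –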

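A named pipe reader receives EOF when there are no writers. The solution outside this code is to make sure that some writer process always holds the file descriptor open, even if it never needs to write anything.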

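In the Go program, if you want to wait for new writers, you have to poll the io.Reader in a loop. Your current code does this with a busy loop, which takes 100% of one CPU core. Adding a sleep, and a way to return on other errors, fixes this: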

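for {
    // io.Copy returns a nil error on EOF, i.e. when the last writer has closed
    _, err := io.Copy(&buff, fpipe)
    if buff.Len() > 0 {
        buff.WriteTo(f)
    }

    if err != nil {
        // something other than EOF happened
        return
    }

    // wait before checking for a new writer instead of spinning
    time.Sleep(100 * time.Millisecond)
}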

Out of curiosity, why wouldn't runtime.Gosched do the job? –


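@Benjamin.E: You don't need 'runtime.Gosched', because you already yield to other goroutines. Function/method calls and channel operations give the scheduler a chance to run, so 'io.Copy' is enough. The problem is that when there is nothing to read, you spin as fast as you can. Writers aren't going to come and go in sub-millisecond time, so there is no reason to retry that quickly. – JimB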


@JimB Please see my critique of your solution in my answer. –

Introduction

As already stated, the reader of a named pipe receives EOF once no writers are left.

However, I find @JimB's solution less than optimal:

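  1. A named pipe has a limited capacity (64 KiB, i.e. 65,536 bytes, by default on Linux), which may well fill up during the 100 ms sleep. Once the buffer is full, all writers block for no good reason.
  2. If a restart happens, on average 50 ms worth of data is lost. Again, for no good reason.
  3. If you want to copy through a static buffer, io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) would be the better solution, IMHO. But even that isn't necessary, because io.Copy (or rather its underlying implementation) already allocates a 32 KiB buffer.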

My approach

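A better solution is to wait for a write to happen and then immediately copy the contents of the named pipe to the destination file. Most systems provide some kind of notification for file system events. The package github.com/rjeczalik/notify can be used to access the events we are interested in, since write events work across platforms on most major operating systems. The other event of interest is the removal of the named pipe, because then there is nothing left to read from.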

So, here is my solution:

package main

import (
    "flag"
    "io"
    "log"
    "os"

    "github.com/rjeczalik/notify"
)

const (
    MAX_CONCURRENT_WRITERS = 5
)

var (
    pipePath string
    filePath string
)

func init() {
    flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
    flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
    log.SetOutput(os.Stderr)
}

func main() {
    flag.Parse()

    var p, f *os.File
    var err error
    var e notify.EventInfo

    // The usual stuff: checking whether the named pipe exists etc.
    // Note that opening a FIFO for reading blocks until a writer opens it.
    if p, err = os.Open(pipePath); os.IsNotExist(err) {
        log.Fatalf("Named pipe '%s' does not exist", pipePath)
    } else if os.IsPermission(err) {
        log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
    } else if err != nil {
        log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
    }
    // Yep, it's there and readable. Close the file handle on exit.
    defer p.Close()

    // Do the same for the output file
    if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
        log.Fatalf("File '%s' does not exist", filePath)
    } else if os.IsPermission(err) {
        log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
    } else if err != nil {
        log.Fatalf("Error while opening file '%s' for writing: %s", filePath, err)
    }
    // Again, close the file handle on exit
    defer f.Close()

    // Here is where it happens. We create a buffered channel for the events
    // that may occur on the file. We size the buffer to the number of expected
    // concurrent writers because, if all writers wrote at (or very close to)
    // the same time, we might otherwise lose events: notify does not block
    // when delivering an event and drops it if the receiver is not ready.
    c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)

    // Tell notify to watch our named pipe for Write and Remove events
    // specifically. We watch for Remove events too, because once the pipe
    // is removed no new writer can open it, so there is nothing left to read.
    if err = notify.Watch(pipePath, c, notify.Write|notify.Remove); err != nil {
        log.Fatalf("Error while watching named pipe '%s': %s", pipePath, err)
    }
    // Stop relaying events to c on exit
    defer notify.Stop(c)

    // We start an infinite loop...
    for {
        // ...waiting for an event to be passed.
        e = <-c

        switch e.Event() {

        case notify.Write:
            // If it is a write event, we copy the contents of the named pipe
            // to our output file and wait for the next event.
            // Note that this is idempotent: even with large writes by multiple
            // writers on the named pipe, the first call to Copy copies the contents.
            // Copying that data may well take longer than generating the events,
            // so subsequent calls may copy nothing, but that does no harm.
            if _, err = io.Copy(f, p); err != nil {
                log.Printf("Error while copying from named pipe '%s': %s", pipePath, err)
            }

        case notify.Remove:
            // Some user or process has removed the named pipe,
            // so we have nothing left to read from.
            // We inform the user and quit.
            log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
        }
    }
}

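Just to reiterate: the pipe's buffer is just a buffer, and it accepts data regardless of what the reading process is doing. There is no guarantee that the buffer is completely read in the event of a failure, and telling yourself otherwise is just tempting fate. Without acknowledgements, you have to accept that data loss is always possible. That said, this does make it more responsive (at the cost of added complexity). – JimB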


@JimB Granted, you can still lose data, but it is less likely, and this solution is less likely to block the writers. –


I was also thinking about relying on a notification mechanism to avoid messing up the data. –