F# - 需要帮助将其转换为使用线程池

F# - 需要帮助将其转换为使用线程池

问题描述:

我是F#的新手,我已经在下面的代码中列出了我在网上找到的各种示例以试图更好地理解如何使用它。目前,下面的代码从文件中读取机器列表并且ping每台机器。我必须将文件中的初始数组分成25个较小的机器数组,以控制并发动作的数量,否则需要花费很长时间才能映射出机器列表。我想能够使用线程池来管理线程,但我还没有找到一种方法来使其工作。任何指导都会很棒。我无法使这项工作:F# - 需要帮助将其转换为使用线程池

let creatework = FileLines|> Seq.map (fun elem -> ThreadPool.QueueUserWorkItem(new WaitCallback(dowork), elem)) 

下面是完整的代码:

open System.Threading 
open System 
open System.IO 

let filePath = "c:\qa\machines.txt" 

let FileLines = File.ReadAllLines(filePath) 

let count = FileLines.Length/25 

type ProcessResult = { exitCode : int; stdout : string; stderr : string } 

let executeProcess (exe,cmdline) = 
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline) 
    psi.UseShellExecute <- false 
    psi.RedirectStandardOutput <- true 
    psi.RedirectStandardError <- true 
    psi.CreateNoWindow <- true 
    let p = System.Diagnostics.Process.Start(psi, EnableRaisingEvents = true) 
    let output = new System.Text.StringBuilder() 
    let error = new System.Text.StringBuilder() 
    p.OutputDataReceived.Add(fun args -> output.AppendLine(args.Data)|> ignore) 
    p.ErrorDataReceived.Add(fun args -> error.AppendLine(args.Data) |> ignore) 
    p.BeginErrorReadLine() 
    p.BeginOutputReadLine() 
    p.WaitForExit() 
    { exitCode = p.ExitCode; stdout = output.ToString(); stderr = error.ToString() } 

let dowork machinename= 
    async{ 
     let exeout = executeProcess(@"c:\windows\system32\ping.exe", "-n 1 " + machinename) 
     let exelines = 
      if exeout.stdout.Contains("Reply from") then Console.WriteLine(machinename + " " + "REPLY") 
      elif exeout.stdout.Contains("Request timed out.") then Console.WriteLine(machinename + " " + "RTO") 
      elif exeout.stdout.Contains("Ping request could not find host") then Console.WriteLine(machinename + " " + "Unknown Host") 
      else Console.WriteLine(machinename + " " + "ERROR") 
     exelines 
     } 

printfn "%A" (System.DateTime.Now.ToString()) 

for i in 0..count do 
    let x = i*25 
    let y = if i = count then FileLines.Length-1 else (i+1)*25 
    printfn "%s %d" "X equals: " x 
    printfn "%s %d" "Y equals: " y 
    let filesection = FileLines.[x..y] 
    let creatework = filesection |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore 
    creatework 

printfn "%A" (System.DateTime.Now.ToString()) 
printfn "finished" 

UPDATE: 以下作品中的代码,并提供了一个框架,我想做的事情。 Tomas Petricek引用的链接确实具有使这项工作成功的代码位。我只需要确定哪个例子是正确的。在用Java编写的重复框架的3秒钟内,所以我认为我正朝着正确的方向前进。我希望下面的例子将是有益的任何人试图拧出F#的各种可执行文件:

open System 
open System.IO 
open System.Diagnostics 

let filePath = "c:\qa\machines.txt" 

let FileLines = File.ReadAllLines(filePath) 

type Process with 
    static member AsyncStart psi = 
     let proc = new Process(StartInfo = psi, EnableRaisingEvents = true) 
     let asyncExit = Async.AwaitEvent proc.Exited 
     async { 
      proc.Start() |> ignore 
      let! args = asyncExit 
      return proc 
     } 

let shellExecute(program : string, args : string) = 
    let startInfo = 
     new ProcessStartInfo(FileName = program, Arguments = args, 
      UseShellExecute = false, 
      CreateNoWindow = true, 
      RedirectStandardError = true, 
      RedirectStandardOutput = true) 
    Process.AsyncStart(startInfo) 

let dowork (machinename : string)= 
    async{ 
     let nonbtstat = "NONE" 
     use! pingout = shellExecute(@"c:\windows\system32\ping.exe", "-n 1 " + machinename) 
     let pingRdToEnd = pingout.StandardOutput.ReadToEnd() 
     let pingresults = 
      if pingRdToEnd.ToString().Contains("Reply from") then (machinename + " " + "REPLY") 
      elif pingRdToEnd.ToString().Contains("Request timed out.") then (machinename + " " + "RTO") 
      elif pingRdToEnd.ToString().Contains("Ping request could not find host") then (machinename + " " + "Unknown Host") 
      else (machinename + " " + "PING_ERROR") 
     if pingresults.ToString().Contains("REPLY") then 
      use! nbtstatout = shellExecute(@"c:\windows\system32\nbtstat.exe", "-a " + machinename) 
      let nbtstatRdToEnd = nbtstatout.StandardOutput.ReadToEnd().Split('\n') 
      let nbtstatline = Array.tryFind(fun elem -> elem.ToString().Contains("<00> UNIQUE  Registered")) nbtstatRdToEnd 
      return Console.WriteLine(pingresults + nbtstatline.Value.ToString()) 
     else return Console.WriteLine(pingresults + " " + nonbtstat) 
     } 

printfn "%A" (System.DateTime.Now.ToString()) 

let creatework = FileLines |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore 
creatework 

printfn "%A" (System.DateTime.Now.ToString()) 
printfn "finished" 
+0

相反炮击了对'的ping.exe'有你使用[Ping类]考虑(http://msdn.microsoft.com/en-us/library/system.net.networkinformation.ping.aspx)而不是?通过一些工作,你可以将'SendAsync'方法和'PingCompleted'事件整合到一个F#异步工作流程中...... – 2012-03-21 21:41:31

+0

我之所以没有这样做是因为运行ping只是为了测试。我对F#的兴趣是通过并行运行线程来利用多个CPU的能力。我工作的环境使用超过15年的无法更新的可执行文件,我想找到一种方法来利用多个核心,同时仍然保持向后兼容。 – 2012-03-22 18:11:49

与您的代码的主要问题是:executeProcess是同步的功能,需要一个很长的时间来运行(它运行ping.exe进程并等待其结果)。一般的规则是线程池中的任务不应该被阻塞很长时间(因为它们会阻塞线程池线程,这意味着线程池不能有效地调度其他工作)。

我认为你可以通过使executeProcess异步来解决这个问题。而不是调用WaitForExit(该块),则可以使用Async.AwaitEvent等待Exitted事件:

let executeProcess (exe,cmdline) = async { 
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline) 
    psi.UseShellExecute <- false 
    // [Lots of stuff omitted] 
    p.BeginOutputReadLine() 
    let! _ = Async.AwaitEvent p.Exited 
    return { exitCode = p.ExitCode 
      stdout = output.ToString(); stderr = error.ToString() } } 

这应该解除线程的线程池,这样你就可以从所有的URL使用Async.Parallel输入数组,无需任何手动调度。

编辑作为@desco在评论中指出,如果达到AwaitEvent前行的进程退出(之前可能会错过的事件)以上是不完全正确。为了解决这个问题,你需要使用Event.guard功能,这在本SO问题讨论:

+1

Async.AwaitEvent p。退出可能会永久挂起,如果进程将很快退出(基本上是在调用AwaitEvent之前) – desco 2012-03-21 17:28:51

+0

我做出了有关更改执行功能的建议更改,但返回似乎并未从命令中捕获结果。搜索“exeout”转换为字符串似乎没有返回。它也运行得如此之快,我不相信它现在运行ping.exe。 – 2012-03-21 19:38:03

+0

@desco - 这是之前讨论过的,但我找不到前面的问题。感谢提醒我 - 我添加了一个链接到答案,这应该解释如何解决这个问题。 – 2012-03-21 23:14:03