药剂流量:管道多个参数

问题描述:

我想写一个Flow,看起来像这样:药剂流量:管道多个参数

def perform(id) do 
    id 
    |> Flow.from_stage(max_demand: 10) 
    |> Flow.map(&download(&1)) 
    |> Flow.map(&process(&1)) 
    |> Flow.each(&cleanup(&1, &2)) 
    |> Flow.each(&respond(&1, &2)) 
    |> Flow.run 
end 

其他功能模块是这样的:

def download(id) do 
    Service.get_images(id) 
    id 
end 

def process(id) do 
    %Result{out: sys_out, status: _} = Porcelain.exec("osascript", 
    ["#{File.cwd!}/lib/script/test", "#{id}", "#{Application.get_env(:app, :server_env)}"] 
) 
    {id, sys_out} 
end 

def cleanup(id, files) do 
    for file <- String.split(files, " ") do 
    System.cmd("exiftool", ["#{File.cwd!}/#{file}"]) 
    end 
    {id, files} 
end 

def respond(id, files) do 
    files = String.split(files, " ") 
    System.cmd("curl", [" -d 'dowhatever=#{files[0]}&else=#{files[1]}' -k #{Application.get_env(:app, :api)}what/#{id}/notify"]) 
    :ok 
end 

但我不断收到此错误:

** (FunctionClauseError) no function clause matching in Flow.each/2 
... 
(app) lib/app/processor.ex:18: App.Processor.perform/1 

行18是&cleanup/2行。我在这里做错了什么?这感觉就像我没有返回正确的价值观莫名其妙......

process/1返回一个元组{id, sys_out},而cleanup/2声明具有元数。

遍历元组的列表意味着迭代器必须是元数1.你应该分解你的元组cleanup/1

- def cleanup(id, files) do 
+ def cleanup({id, files}) do 

,并调用它像:

Flow.each(&cleanup/1) 

同样适用到后续的respond

+0

啊,非常感谢你! –