How do I use AnyEvent asynchronously with WWW::Mechanize?

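Problem description:

I've done a fair amount of research on this topic, and although there are some related questions out there, I'm having real trouble understanding how to do asynchronous programming properly with AnyEvent and WWW::Mechanize. I'm trying to stick with Mechanize because it has a clean interface and built-in functions for what I expect to do (such as getting all the images on a site). If there is no reliable way to do what I want, I'll start looking at AnyEvent::HTTP, but I figured I'd ask before moving in that direction.

I'm new to AnyEvent programming, but I've done a fair amount of Perl and JavaScript/jQuery asynchronous calls with callbacks before. Those make sense to me, but AnyEvent + Mech just isn't clicking.

Here is the code I'm working on, which pulls URLs off an upstream queue. Given a URL, I want to do a get that, say, pulls in all the images on a page, and then grab all of those images asynchronously.

So the pseudocode would look something like this: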

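  • grab a URL from the upstream queue
  • get the page
  • get all the img URL links
  • make many async calls on the img URLs (for example, storing the images in a backend)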
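
The steps above could be sketched roughly as follows. This is only an illustration under stated assumptions: it swaps WWW::Mechanize for AnyEvent::HTTP (http_get) plus HTML::LinkExtor for link extraction, and the names img_urls, fetch_page_images, $on_image and $on_done are hypothetical, not taken from the original code.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use HTML::LinkExtor;

# Collect the src URL of every <img> tag in $html.
# Passing $base makes HTML::LinkExtor resolve relative links against it.
sub img_urls {
    my ($html, $base) = @_;
    my @urls;
    my $parser = HTML::LinkExtor->new(sub {
        my ($tag, %attr) = @_;
        push @urls, "$attr{src}" if $tag eq 'img' && defined $attr{src};
    }, $base);
    $parser->parse($html);
    $parser->eof;
    return @urls;
}

# Fetch a page, then fetch all of its images concurrently.
# $on_image is called once per successfully downloaded image,
# $on_done once after every image request has completed.
sub fetch_page_images {
    my ($page_url, $on_image, $on_done) = @_;
    http_get $page_url, sub {
        my ($body, $hdr) = @_;
        return $on_done->() unless defined $body && $hdr->{Status} =~ /^2/;

        my $cv = AnyEvent->condvar;
        $cv->begin(sub { $on_done->() });   # runs when the counter returns to 0
        for my $img (img_urls($body, $hdr->{URL})) {
            $cv->begin;
            http_get $img, sub {
                my ($data, $img_hdr) = @_;
                $on_image->($img, $data) if $img_hdr->{Status} =~ /^2/;
                $cv->end;
            };
        }
        $cv->end;
    };
    return;
}

# Hypothetical usage from top-level code (not from inside a callback):
#   my $quit = AnyEvent->condvar;
#   fetch_page_images('http://example.com/',
#       sub { my ($url, $data) = @_; print "got $url\n" },
#       sub { $quit->send });
#   $quit->recv;
```

Nothing in this sketch blocks inside a callback: each http_get returns immediately, and the condvar counter only tracks how many image requests are still outstanding.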

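I've read (after researching the errors) that I cannot block inside an AnyEvent callback. How do I structure my program to make the async calls without blocking?

AE events can only be processed when an AE-aware function blocks, so I'm using LWP::Protocol::AnyEvent::http. It replaces LWP's normal HTTP backend (Net::HTTP) with AnyEvent::HTTP, which is AE-aware.

The worker gets created like this: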

my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'), run_on_create => 1); 

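The async part is the sub _recv_msg, which calls _proc_msg.

I already have an AnyEvent loop watching the ZeroMQ socket, per the ZeroMQ Perl binding docs...

Any help is much appreciated!

Code: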

package Worker; 

use 5.12.0; 

use Moose; 
use AnyEvent; 
use LWP::Protocol::AnyEvent::http; 

use ZMQ::LibZMQ3; 
use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/; 

use JSON; 
use WWW::Mechanize; 
use Carp; 
use Coro; 


has 'max_children' => (
    is => 'rw', 
    isa => 'Int', 
    required => 1, 
    default => sub { 0 } 
); 

has 'upstream_job_url' => (
    is => 'rw', 
    isa => 'URI', 
    required => 1, 
); 

has ['uri','sink_url'] => (
    is => 'rw', 
    isa => 'URI', 
    required => 0, 
); 

has 'run_on_create' => (
    is => 'rw', 
    isa => 'Bool', 
    required => 1, 
    default => sub { 1 } 
); 

has '_receiver' => (
    is => 'rw', 
    isa => 'ZMQ::LibZMQ3::Socket', 
    required => 0 
); 

sub BUILD { 
    my $self = shift; 
    $self->start if $self->run_on_create; 
} 

sub start 
{ 
    my $self = shift; 
    $self->_init_zmq(); 

    my $fh = zmq_getsockopt($self->_receiver, ZMQ_FD); 
    my $w; $w = AnyEvent->io(fh => $fh, poll => "r", cb => sub { $self->_recv_msg }); 
    AnyEvent->condvar->recv; 
} 

sub _init_zmq 
{ 
    my $self = shift; 
    my $c = zmq_init() or die "zmq_init: $!\n"; 
    my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n"; 
    if(zmq_connect($recv, $self->upstream_job_url) != 0) { 
     croak "zmq_connect: $!\n"; 
    } 
    $self->_receiver($recv); 
} 

sub _recv_msg 
{ 
    my $self = shift; 
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver))) { 
     my $msg = JSON::from_json($message, {utf8 => 1}); 
     $self->uri(URI->new($msg->{url})); 
     $self->_proc_msg; 
    } 
} 

sub _proc_msg 
{ 
    my $self = shift; 
    my $c = async { 
     my $ua = WWW::Mechanize->new; 
     $ua->protocols_allowed(['http']); 
     print "$$ processing " . $self->uri->as_string . "... "; 
     $ua->get($self->uri->as_string); 
     if ($ua->success()) { 
      say $ua->status . " OK"; 
     } else { 
      say $ua->status . " NOT OK"; 
     } 
    }; 
    $c->join; 
} 

1; 

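As you can see, I was trying Coro in _proc_msg. I also tried making the Mech calls directly, but got this error:

AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91. 

because $mech is still blocking inside the callback. I'm not sure how to make the Mech call properly from within the callback.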


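At ikegami's request, I've added the driver program that sends the URLs. For testing purposes, it just reads an RSS feed and sends the links to the workers to process. I was mainly curious about the basic AnyEvent structure, but I'm always happy to get help with the program. Here is the driver code: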

#!/usr/local/bin/perl 

use strict; 
use warnings; 
use v5.12.0; 

use lib './lib'; 

use Config::General; 
use Getopt::Long; 
use Carp; 
use AnyEvent; 
use AnyEvent::Feed; 
use Parallel::ForkManager; 
use ZMQ::LibZMQ3; 
use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL); 
use Worker; 

# Debug 
use Data::Dumper; 
$Data::Dumper::Deparse = 1; 

my $config_file = "feeds.cfg"; 

GetOptions(
    "config|c=s" => \$config_file,   # =s: the option takes a string value 
    "help|h" => sub { usage(); exit(0); } 
); 

sub usage() 
{ 
    say "TODO"; 
} 

$SIG{INT} = sub { croak; }; $SIG{TERM} = sub { croak; }; 
$SIG{CHLD} = 'IGNORE'; 

my $conf = Config::General->new($config_file) or croak "Couldn't open config file '$config_file' $!\n"; 

my %config = $conf->getall(); 
my @readers =(); 
my @feeds = load_feeds(\%config); 

my $mgr = Parallel::ForkManager->new($config{'max_download_children'}) or croak "Can't create fork manager: $!\n"; 
my $context = zmq_init() or croak "zmq_init: $!\n"; 
my $sender = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n"; 

foreach my $feed_cfg (@feeds) { 
    my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg); 
    push(@readers, $reader); # save, don't go out of scope 
} 

# Fork Downloader children. These processes will look for incoming data 
# in the img_queue and download the images, storing them in nosql 
for (1 .. $config{'max_download_children'}) { 
    my $pid = $mgr->start; 
    if (!$pid) { 
     # Child 
     my $worker = Worker->new({ 
      upstream_job_url => URI->new('tcp://127.0.0.1:5555') 
     }); 
     $mgr->finish; 
     say "$$ exiting."; 
     exit(0); 
    } else { 
     # Parent 
     say "[forked child $pid] my pid is $$"; 
    } 
} 

if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) { 
    croak "zmq_bind: $!\n"; 
} 

# Event loop 
AnyEvent->condvar->recv; 

sub load_feeds 
{ 
    my $conf = shift; 
    my @feeds =(); 
    foreach my $feed (keys %{$conf->{'feeds'}}) { 
     my $feed_ref = $conf->{'feeds'}; 
     $feed_ref->{$feed}->{'name'} = $feed; 
     $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb; 
     push(@feeds, $feed_ref->{$feed}); 
    } 
    return @feeds; 
} 

sub fetch_feed_cb 
{ 
    my ($feed_reader, $new_entries, $feed, $error) = @_; 
    if (defined $error) { 
     say "Error fetching feed: $error"; 
     return; 
    } 
    say "$$ checking for new feeds"; 
    for (@$new_entries) { 
     my ($hash, $entry) = @$_; 
     say "$$ sending " . $entry->link; 
     zmq_send($sender, JSON::to_json({ url => $entry->link }, { pretty => 1, utf8 => 1 })); 
    } 
} 

Here is a sample run:

[forked child 40790] my pid is 40789 
[forked child 40791] my pid is 40789 
[forked child 40792] my pid is 40789 
40789 checking for new feeds 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/ 
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/ 
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99. 
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99. 
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99. 
40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231. 

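If I don't explicitly do a "use Coro;" in Worker.pm, the Coro FATAL errors don't show up. I don't know how async was working before without further runtime errors.

Sample config file (feeds.cfg):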

max_download_children = 3 
<feeds> 
    <feed1> 
     url="http://feeds.feedburner.com/PerlNews?format=xml" 
     interval=60 
    </feed1> 
</feeds> 

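So I spent a little more time on this today. The error of my ways was calling $c->join. I shouldn't do that, because I can't block in the callback. Coro will schedule the async block, and it will finish when it finishes. The only thing I need to do is somehow know when all the asyncs are done, which I think I can figure out. The tricky part now is figuring out this little piece of mystery: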

sub _recv_msg 
{ 
    my $self = shift; 
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver))) { 
     my $msg = JSON::from_json($message, {utf8 => 1}); 
     $self->uri(URI->new($msg->{url})); 
     $self->_proc_msg; 
    } 
} 

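This while loop causes the async {} threads in _proc_msg to NOT run. Remove the while loop and handle only the first message, and the coros run. Leave the while loop in place, and they never run. That seems strange to me, and I haven't figured out why yet.


Further update:

zmq_msg_recv was blocking. Also, zmq_send in the parent can block. You have to use ZMQ_NOBLOCK. I split the worker and the main program into completely separate programs.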
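
Regarding the $c->join point in this update, here is a minimal sketch of starting Coro threads without joining them while still learning when they have all finished. It is an illustration under assumptions, not the code that was finally used: spawn_tracked and $all_done are hypothetical names, and Coro::AnyEvent::sleep stands in for the real $mech->get call.

```perl
use strict;
use warnings;
use Coro;              # exports async
use AnyEvent;
use Coro::AnyEvent;    # lets Coro threads and AnyEvent watchers cooperate

# Start $work in its own Coro thread and return immediately.
# $cv counts outstanding jobs; nothing here calls ->join.
sub spawn_tracked {
    my ($cv, $work) = @_;
    $cv->begin;
    async {
        eval { $work->(); 1 } or warn "job failed: $@";
        $cv->end;
    };
    return;
}

my $all_done = AnyEvent->condvar;
my @finished;

$all_done->begin;                      # guard so the counter cannot hit 0 early
for my $id (1 .. 3) {
    spawn_tracked($all_done, sub {
        Coro::AnyEvent::sleep 0.1;     # placeholder for a blocking-style fetch
        push @finished, $id;
    });
}
$all_done->end;

# Top-level code, not an event-loop callback, so waiting here is allowed.
$all_done->recv;
print "finished jobs: ", scalar(@finished), "\n";
```

Inside an AnyEvent watcher callback, spawn_tracked would be called the same way, but the recv would stay at the top level of the program.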
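
A minimal sketch of the non-blocking receive described in this update, assuming ZMQ::Constants exports ZMQ_NOBLOCK as the update implies (libzmq 3.x also names this flag ZMQ_DONTWAIT). The helper name drain_socket and the $on_message callback are hypothetical.

```perl
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_NOBLOCK);

# Read every message that is already queued on $socket, then return.
# With ZMQ_NOBLOCK, zmq_recvmsg returns undef instead of waiting
# when no message is available, so the loop ends on its own.
sub drain_socket {
    my ($socket, $on_message) = @_;
    while (my $msg = zmq_recvmsg($socket, ZMQ_NOBLOCK)) {
        $on_message->(zmq_msg_data($msg));
    }
    return;
}

# Hypothetical use inside the AnyEvent io watcher callback:
#   drain_socket($self->_receiver, sub {
#       my ($json) = @_;
#       # decode $json and hand the URL to a worker thread here
#   });
```

Because the loop returns as soon as the queue is empty, control goes back to the event loop and any scheduled Coro threads get a chance to run.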

L::P::AE::http is my module. I'll have a look later today. Where does 'async {}' come from? – ikegami

The code doesn't demonstrate the problem. Is it missing an input source? – ikegami

Edited the original post to address your questions. Thanks as always. – mikew

You can use https://metacpan.org/pod/AnyEvent::HTTP::LWP::UserAgent to make async calls.

use AnyEvent::HTTP::LWP::UserAgent;
use AnyEvent;

my $ua = AnyEvent::HTTP::LWP::UserAgent->new;
my @urls = (...);
my $cv = AE::cv;
$cv->begin;
foreach my $url (@urls) {
    $cv->begin;
    $ua->get_async($url)->cb(sub {
        my $r = shift->recv;
        print "url $url, content " . $r->content . "\n";
        $cv->end;
    });
}
$cv->end;
$cv->recv;
This interface is definitely more like what I'm used to. If I can't get WWW::Mechanize working, I may have to switch modules. Thanks for the input. – mikew