package Mojo::Reactor::Poll; use Mojo::Base 'Mojo::Reactor'; use IO::Poll qw(POLLERR POLLHUP POLLIN POLLOUT); use List::Util 'min'; use Mojo::Util qw(md5_sum steady_time); use Time::HiRes 'usleep'; sub again { my $timer = shift->{timers}{shift()}; $timer->{time} = steady_time + $timer->{after}; } sub io { my ($self, $handle, $cb) = @_; $self->{io}{fileno $handle} = {cb => $cb}; return $self->watch($handle, 1, 1); } sub is_running { !!shift->{running} } sub one_tick { my $self = shift; # Remember state for later my $running = $self->{running}; $self->{running} = 1; # Wait for one event my $i; my $poll = $self->_poll; until ($i) { # Stop automatically if there is nothing to watch return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}}; # Calculate ideal timeout based on timers my $min = min map { $_->{time} } values %{$self->{timers}}; my $timeout = defined $min ? ($min - steady_time) : 0.5; $timeout = 0 if $timeout < 0; # I/O if (keys %{$self->{io}}) { $poll->poll($timeout); ++$i and $self->_sandbox('Read', $self->{io}{fileno $_}{cb}, 0) for $poll->handles(POLLIN | POLLHUP | POLLERR); ++$i and $self->_sandbox('Write', $self->{io}{fileno $_}{cb}, 1) for $poll->handles(POLLOUT); } # Wait for timeout if poll can't be used elsif ($timeout) { usleep $timeout * 1000000 } # Timers (time should not change in between timers) my $now = steady_time; for my $id (keys %{$self->{timers}}) { next unless my $t = $self->{timers}{$id}; next unless $t->{time} <= $now; # Recurring timer if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} } # Normal timer else { $self->remove($id) } ++$i and $self->_sandbox("Timer $id", $t->{cb}) if $t->{cb}; } } # Restore state if necessary $self->{running} = $running if $self->{running}; } sub recurring { shift->_timer(1, @_) } sub remove { my ($self, $remove) = @_; return !!delete $self->{timers}{$remove} unless ref $remove; $self->_poll->remove($remove); return !!delete $self->{io}{fileno $remove}; } sub start { my $self = shift; return if $self->{running}++; $self->one_tick while $self->{running}; } sub stop { delete shift->{running} } sub timer { shift->_timer(0, @_) } sub watch { my ($self, $handle, $read, $write) = @_; my $poll = $self->_poll; $poll->remove($handle); if ($read && $write) { $poll->mask($handle, POLLIN | POLLOUT) } elsif ($read) { $poll->mask($handle, POLLIN) } elsif ($write) { $poll->mask($handle, POLLOUT) } return $self; } sub _poll { shift->{poll} ||= IO::Poll->new } sub _sandbox { my ($self, $event, $cb) = (shift, shift, shift); eval { $self->$cb(@_); 1 } or $self->emit(error => "$event failed: $@"); } sub _timer { my ($self, $recurring, $after, $cb) = @_; my $timers = $self->{timers} = defined $self->{timers} ? $self->{timers} : {}; my $id; do { $id = md5_sum('t' . steady_time . rand 999) } while $timers->{$id}; my $timer = $timers->{$id} = {cb => $cb, after => $after, time => steady_time + $after}; $timer->{recurring} = $after if $recurring; return $id; } 1; =encoding utf8 =head1 NAME Mojo::Reactor::Poll - Low level event reactor with poll support =head1 SYNOPSIS use Mojo::Reactor::Poll; # Watch if handle becomes readable or writable my $reactor = Mojo::Reactor::Poll->new; $reactor->io($handle => sub { my ($reactor, $writable) = @_; say $writable ? 'Handle is writable' : 'Handle is readable'; }); # Change to watching only if handle becomes writable $reactor->watch($handle, 0, 1); # Add a timer $reactor->timer(15 => sub { my $reactor = shift; $reactor->remove($handle); say 'Timeout!'; }); # Start reactor if necessary $reactor->start unless $reactor->is_running; =head1 DESCRIPTION L is a low level event reactor based on L. =head1 EVENTS L inherits all events from L. =head1 METHODS L inherits all methods from L and implements the following new ones. =head2 again $reactor->again($id); Restart active timer. =head2 io $reactor = $reactor->io($handle => sub {...}); Watch handle for I/O events, invoking the callback whenever handle becomes readable or writable. =head2 is_running my $bool = $reactor->is_running; Check if reactor is running. =head2 one_tick $reactor->one_tick; Run reactor until an event occurs or no events are being watched anymore. Note that this method can recurse back into the reactor, so you need to be careful. =head2 recurring my $id = $reactor->recurring(0.25 => sub {...}); Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds. =head2 remove my $bool = $reactor->remove($handle); my $bool = $reactor->remove($id); Remove handle or timer. =head2 start $reactor->start; Start watching for I/O and timer events, this will block until L is called or no events are being watched anymore. =head2 stop $reactor->stop; Stop watching for I/O and timer events. =head2 timer my $id = $reactor->timer(0.5 => sub {...}); Create a new timer, invoking the callback after a given amount of time in seconds. =head2 watch $reactor = $reactor->watch($handle, $readable, $writable); Change I/O events to watch handle for with true and false values. Note that this method requires an active I/O watcher. =head1 SEE ALSO L, L, L. =cut