add files
|
1 |
package Mojo::Reactor::Poll; |
2 |
use Mojo::Base 'Mojo::Reactor'; |
|
3 | ||
4 |
use IO::Poll qw(POLLERR POLLHUP POLLIN POLLOUT); |
|
5 |
use List::Util 'min'; |
|
6 |
use Mojo::Util qw(md5_sum steady_time); |
|
7 |
use Time::HiRes 'usleep'; |
|
8 | ||
9 |
# Restart an active timer by moving its deadline to "now" plus the delay it
# was originally created with. Takes the timer id; ids that are not (or no
# longer) registered are ignored.
sub again {
  my ($self, $id) = @_;

  # Without this guard an unknown id would be dereferenced as undef, which
  # only updates a throwaway hash and warns about uninitialized values
  return unless my $timer = $self->{timers}{$id};

  $timer->{time} = steady_time() + $timer->{after};
}
|
13 | ||
14 |
# Register a callback for a handle, keyed by its file descriptor, and start
# watching the handle for both readability and writability. Returns the
# result of "watch", which is the reactor itself.
sub io {
  my $self   = shift;
  my $handle = shift;
  my $cb     = shift;

  # A fresh entry replaces any callback previously stored for this descriptor
  my $fd = fileno $handle;
  $self->{io}{$fd} = {cb => $cb};

  return $self->watch($handle, 1, 1);
}
|
19 | ||
20 |
# Report whether the reactor's "running" flag is set, normalized to a plain
# boolean.
sub is_running {
  my $self = shift;
  return !!$self->{running};
}
|
21 | ||
22 |
# Run the reactor until at least one I/O or timer callback has been invoked,
# or stop it right away if there are no timers and no I/O watchers left.
# The previous "running" state is saved on entry and restored on exit unless
# the reactor has been stopped in the meantime.
sub one_tick {
  my $self = shift;

  # Remember state for later
  my $running = $self->{running};
  $self->{running} = 1;

  # Wait for one event
  my $i;
  my $poll = $self->_poll;
  until ($i) {

    # Stop automatically if there is nothing to watch
    return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};

    # Calculate ideal timeout based on timers
    my $min = min map { $_->{time} } values %{$self->{timers}};
    my $timeout = defined $min ? ($min - steady_time()) : 0.5;
    $timeout = 0 if $timeout < 0;

    # I/O
    if (keys %{$self->{io}}) {
      $poll->poll($timeout);

      # Readable (or hung up / failed) handles
      for my $handle ($poll->handles(POLLIN | POLLHUP | POLLERR)) {

        # A callback earlier in this tick may have closed or removed the
        # handle; looking it up blindly would autovivify a bogus watcher and
        # try to call an undefined callback
        next unless defined(my $fd = fileno $handle);
        next unless my $io = $self->{io}{$fd};
        ++$i and $self->_sandbox('Read', $io->{cb}, 0);
      }

      # Writable handles (same guard as above)
      for my $handle ($poll->handles(POLLOUT)) {
        next unless defined(my $fd = fileno $handle);
        next unless my $io = $self->{io}{$fd};
        ++$i and $self->_sandbox('Write', $io->{cb}, 1);
      }
    }

    # Wait for timeout if poll can't be used
    elsif ($timeout) { usleep $timeout * 1000000 }

    # Timers (time should not change in between timers)
    my $now = steady_time();
    for my $id (keys %{$self->{timers}}) {

      # The timer may have been removed by a callback that ran before it
      next unless my $t = $self->{timers}{$id};
      next unless $t->{time} <= $now;

      # Recurring timer
      if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} }

      # Normal timer
      else { $self->remove($id) }

      ++$i and $self->_sandbox("Timer $id", $t->{cb}) if $t->{cb};
    }
  }

  # Restore state if necessary
  $self->{running} = $running if $self->{running};
}
|
73 | ||
74 |
# Create a recurring timer: forwards all arguments to "_timer" with the
# recurring flag set and returns the timer id it produces.
sub recurring {
  my ($self, @args) = @_;
  return $self->_timer(1, @args);
}
|
75 | ||
76 |
# Remove a handle or a timer. References are treated as handles, anything
# else as a timer id. Returns true if a registered entry was deleted and
# false otherwise.
sub remove {
  my ($self, $remove) = @_;

  # Handle: drop it from the poll set and from the I/O registry
  if (ref $remove) {
    $self->_poll->remove($remove);
    return !!delete $self->{io}{fileno $remove};
  }

  # Timer id
  return !!delete $self->{timers}{$remove};
}
|
82 | ||
83 |
# Start the reactor and keep ticking until the "running" flag is cleared.
# If the reactor is already running, only the "running" counter is
# incremented and the call returns immediately.
sub start {
  my $self = shift;

  # Post-increment: the old value tells us whether a loop is already active
  my $already_running = $self->{running}++;
  return if $already_running;

  while ($self->{running}) { $self->one_tick }
}
|
88 | ||
89 |
# Stop the reactor by deleting its "running" flag; returns the value the flag
# had before deletion.
sub stop {
  my $self = shift;
  return delete $self->{running};
}
|
90 | ||
91 |
# Create a one-shot timer: forwards all arguments to "_timer" with the
# recurring flag cleared and returns the timer id it produces.
sub timer {
  my ($self, @args) = @_;
  return $self->_timer(0, @args);
}
|
92 | ||
93 |
# Change which events a handle is polled for. True values for $read and
# $write select POLLIN and POLLOUT respectively; if both are false the handle
# is only removed from the poll set. Returns the reactor.
sub watch {
  my ($self, $handle, $read, $write) = @_;

  # Build the event mask from the two flags
  my $mask = 0;
  $mask |= POLLIN  if $read;
  $mask |= POLLOUT if $write;

  # Always reset the handle first, then re-register it if anything is wanted
  my $poll = $self->_poll;
  $poll->remove($handle);
  $poll->mask($handle, $mask) if $mask;

  return $self;
}
|
104 | ||
105 |
# Return the IO::Poll object stored in the "poll" slot, creating and caching
# one on first use.
sub _poll {
  my $self = shift;
  $self->{poll} ||= IO::Poll->new;
  return $self->{poll};
}
|
106 | ||
107 |
# Invoke a callback as a method on the reactor with the remaining arguments,
# trapping exceptions. On failure an "error" event is emitted with a message
# made of the event description and the exception text.
sub _sandbox {
  my $self  = shift;
  my $event = shift;
  my $cb    = shift;

  # The trailing 1 makes success detectable even if the callback returns false
  return 1 if eval { $self->$cb(@_); 1 };

  return $self->emit(error => "$event failed: $@");
}
|
111 | ||
112 |
# Register a timer and return its id. $recurring selects a recurring timer,
# $after is the delay in seconds and $cb the callback to store with it.
sub _timer {
  my ($self, $recurring, $after, $cb) = @_;

  my $timers = $self->{timers} //= {};

  # Keep generating ids until one is found that is not already in use
  my $id = md5_sum('t' . steady_time() . rand 999);
  $id = md5_sum('t' . steady_time() . rand 999) while $timers->{$id};

  # "after" is kept so the timer can be restarted with the same delay
  my %timer = (cb => $cb, after => $after, time => steady_time() + $after);
  $timer{recurring} = $after if $recurring;
  $timers->{$id} = \%timer;

  return $id;
}
|
124 | ||
125 |
1; |
|
126 | ||
127 |
=encoding utf8 |
|
128 | ||
129 |
=head1 NAME |
|
130 | ||
131 |
Mojo::Reactor::Poll - Low level event reactor with poll support |
|
132 | ||
133 |
=head1 SYNOPSIS |
|
134 | ||
135 |
use Mojo::Reactor::Poll; |
|
136 | ||
137 |
# Watch if handle becomes readable or writable |
|
138 |
my $reactor = Mojo::Reactor::Poll->new; |
|
139 |
$reactor->io($handle => sub { |
|
140 |
my ($reactor, $writable) = @_; |
|
141 |
say $writable ? 'Handle is writable' : 'Handle is readable'; |
|
142 |
}); |
|
143 | ||
144 |
# Change to watching only if handle becomes writable |
|
145 |
$reactor->watch($handle, 0, 1); |
|
146 | ||
147 |
# Add a timer |
|
148 |
$reactor->timer(15 => sub { |
|
149 |
my $reactor = shift; |
|
150 |
$reactor->remove($handle); |
|
151 |
say 'Timeout!'; |
|
152 |
}); |
|
153 | ||
154 |
# Start reactor if necessary |
|
155 |
$reactor->start unless $reactor->is_running; |
|
156 | ||
157 |
=head1 DESCRIPTION |
|
158 | ||
159 |
L<Mojo::Reactor::Poll> is a low level event reactor based on L<IO::Poll>. |
|
160 | ||
161 |
=head1 EVENTS |
|
162 | ||
163 |
L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>. |
|
164 | ||
165 |
=head1 METHODS |
|
166 | ||
167 |
L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and |
|
168 |
implements the following new ones. |
|
169 | ||
170 |
=head2 again |
|
171 | ||
172 |
$reactor->again($id); |
|
173 | ||
174 |
Restart active timer. |
|
175 | ||
176 |
=head2 io |
|
177 | ||
178 |
$reactor = $reactor->io($handle => sub {...}); |
|
179 | ||
180 |
Watch handle for I/O events, invoking the callback whenever handle becomes |
|
181 |
readable or writable. |
|
182 | ||
183 |
=head2 is_running |
|
184 | ||
185 |
my $bool = $reactor->is_running; |
|
186 | ||
187 |
Check if reactor is running. |
|
188 | ||
189 |
=head2 one_tick |
|
190 | ||
191 |
$reactor->one_tick; |
|
192 | ||
193 |
Run reactor until an event occurs or no events are being watched anymore. Note |
|
194 |
that this method can recurse back into the reactor, so you need to be careful. |
|
195 | ||
196 |
=head2 recurring |
|
197 | ||
198 |
my $id = $reactor->recurring(0.25 => sub {...}); |
|
199 | ||
200 |
Create a new recurring timer, invoking the callback repeatedly after a given |
|
201 |
amount of time in seconds. |
|
202 | ||
203 |
=head2 remove |
|
204 | ||
205 |
my $bool = $reactor->remove($handle); |
|
206 |
my $bool = $reactor->remove($id); |
|
207 | ||
208 |
Remove handle or timer. |
|
209 | ||
210 |
=head2 start |
|
211 | ||
212 |
$reactor->start; |
|
213 | ||
214 |
Start watching for I/O and timer events. This will block until L</"stop"> is
called or no events are being watched anymore.
|
216 | ||
217 |
=head2 stop |
|
218 | ||
219 |
$reactor->stop; |
|
220 | ||
221 |
Stop watching for I/O and timer events. |
|
222 | ||
223 |
=head2 timer |
|
224 | ||
225 |
my $id = $reactor->timer(0.5 => sub {...}); |
|
226 | ||
227 |
Create a new timer, invoking the callback after a given amount of time in |
|
228 |
seconds. |
|
229 | ||
230 |
=head2 watch |
|
231 | ||
232 |
$reactor = $reactor->watch($handle, $readable, $writable); |
|
233 | ||
234 |
Change which I/O events the handle is watched for, using true and false
values for readability and writability. Note that this method requires an
active I/O watcher.
|
236 | ||
237 |
=head1 SEE ALSO |
|
238 | ||
239 |
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>. |
|
240 | ||
241 |
=cut |