Newer Older
241 lines | 5.615kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::Reactor::Poll;
2
use Mojo::Base 'Mojo::Reactor';
3

            
4
use IO::Poll qw(POLLERR POLLHUP POLLIN POLLOUT);
5
use List::Util 'min';
6
use Mojo::Util qw(md5_sum steady_time);
7
use Time::HiRes 'usleep';
8

            
9
# Restart the timer identified by $id so that it fires again "after" seconds
# from now. Returns the new expiry time, or nothing if no such timer exists.
sub again {
  my ($self, $id) = @_;

  # The timer may already have fired or been removed; in that case there is
  # nothing to reschedule, and dereferencing the missing entry would only
  # produce "uninitialized" warnings
  return unless my $timer = ($self->{timers} // {})->{$id};

  $timer->{time} = steady_time() + $timer->{after};
}
13

            
14
# Register $cb as the I/O callback for $handle and start watching the handle
# for both readable and writable events. Returns whatever "watch" returns.
sub io {
  my $self   = shift;
  my $handle = shift;
  my $cb     = shift;

  # Watchers are indexed by the file descriptor number of the handle
  my $fd = fileno $handle;
  $self->{io}{$fd} = {cb => $cb};

  return $self->watch($handle, 1, 1);
}
19

            
20
# Report as a plain boolean whether the "running" flag is currently set.
sub is_running {
  my $self = shift;
  return $self->{running} ? !!1 : !!0;
}
21

            
22
# Run the reactor until at least one I/O or timer callback has been invoked,
# or stop it right away if there are neither timers nor I/O watchers left.
# All callbacks are invoked through "_sandbox". The previous value of the
# "running" flag is restored afterwards unless the reactor was stopped from
# within a callback.
sub one_tick {
  my $self = shift;

  # Remember state for later
  my $running = $self->{running};
  $self->{running} = 1;

  # Wait for one event ($i counts the callbacks invoked during this tick)
  my $i;
  my $poll = $self->_poll;
  until ($i) {

    # Stop automatically if there is nothing to watch
    return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};

    # Calculate ideal timeout based on timers (half a second without timers)
    my $min = min map { $_->{time} } values %{$self->{timers}};
    my $timeout = defined $min ? ($min - steady_time()) : 0.5;
    $timeout = 0 if $timeout < 0;

    # I/O
    if (keys %{$self->{io}}) {
      $poll->poll($timeout);

      # The list of ready handles is built before any callback runs, so an
      # earlier callback may have removed or closed a handle that is still in
      # the list. Such handles are skipped; looking up their callback directly
      # would autovivify an empty watcher entry that keeps the reactor alive
      # forever and would invoke an undefined callback.
      for my $handle ($poll->handles(POLLIN | POLLHUP | POLLERR)) {
        next unless defined(my $fd = fileno $handle);
        next unless my $io = $self->{io}{$fd};
        ++$i and $self->_sandbox('Read', $io->{cb}, 0);
      }
      for my $handle ($poll->handles(POLLOUT)) {
        next unless defined(my $fd = fileno $handle);
        next unless my $io = $self->{io}{$fd};
        ++$i and $self->_sandbox('Write', $io->{cb}, 1);
      }
    }

    # Wait for timeout if poll can't be used
    elsif ($timeout) { usleep $timeout * 1000000 }

    # Timers (time should not change in between timers)
    my $now = steady_time();
    for my $id (keys %{$self->{timers}}) {

      # A previous timer callback may have removed this timer already
      next unless my $t = $self->{timers}{$id};
      next unless $t->{time} <= $now;

      # Recurring timer
      if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} }

      # Normal timer
      else { $self->remove($id) }

      ++$i and $self->_sandbox("Timer $id", $t->{cb}) if $t->{cb};
    }
  }

  # Restore state if necessary
  $self->{running} = $running if $self->{running};
}
73

            
74
# Create a recurring timer by delegating to "_timer" with the recurring flag
# set; all remaining arguments are passed through unchanged.
sub recurring {
  my $self = shift;
  return $self->_timer(1, @_);
}
75

            
76
# Remove a handle or a timer. References are treated as handles, everything
# else as a timer id. Returns a boolean telling whether an entry was deleted.
sub remove {
  my $self   = shift;
  my $remove = shift;

  # Handle: drop it from the poll object and from the watcher table
  if (ref $remove) {
    $self->_poll->remove($remove);
    return !!delete $self->{io}{fileno $remove};
  }

  # Timer id
  return !!delete $self->{timers}{$remove};
}
82

            
83
# Run "one_tick" repeatedly for as long as the "running" flag stays set.
# Does nothing except bump the flag if the reactor is already running.
sub start {
  my $self = shift;

  # The flag is incremented in any case; only the first caller enters the loop
  my $already_running = $self->{running}++;
  return if $already_running;

  while ($self->{running}) { $self->one_tick }
}
88

            
89
# Clear the "running" flag and return its previous value.
sub stop {
  my $self = shift;
  return delete $self->{running};
}
90

            
91
# Create a one-shot timer by delegating to "_timer" with the recurring flag
# cleared; all remaining arguments are passed through unchanged.
sub timer {
  my $self = shift;
  return $self->_timer(0, @_);
}
92

            
93
# Change which events $handle is polled for: $read selects POLLIN, $write
# selects POLLOUT. With both flags false the handle is only removed from the
# poll object. Returns the reactor itself.
sub watch {
  my ($self, $handle, $read, $write) = @_;

  # Translate the two flags into a poll event mask
  my $mask = 0;
  $mask |= POLLIN  if $read;
  $mask |= POLLOUT if $write;

  # Always drop the previous registration first, then register again only if
  # at least one kind of event is wanted
  my $poll = $self->_poll;
  $poll->remove($handle);
  $poll->mask($handle, $mask) if $mask;

  return $self;
}
104

            
105
# Return the IO::Poll object of this reactor, creating and caching it in
# the "poll" slot on first use.
sub _poll {
  my $self = shift;
  $self->{poll} = IO::Poll->new unless $self->{poll};
  return $self->{poll};
}
106

            
107
# Invoke callback $cb as a method on the reactor with the remaining
# arguments. If the callback dies, an "error" event carrying the event name
# and the exception is emitted instead of letting the exception propagate.
sub _sandbox {
  my $self  = shift;
  my $event = shift;
  my $cb    = shift;

  # The trailing 1 makes the eval true even if the callback returns false
  return 1 if eval { $self->$cb(@_); 1 };

  return $self->emit(error => "$event failed: $@");
}
111

            
112
# Create a timer that invokes $cb after $after seconds and return its id.
# With a true $recurring flag the timer also records $after as its interval,
# which marks it as recurring.
sub _timer {
  my ($self, $recurring, $after, $cb) = @_;

  my $timers = $self->{timers} //= {};

  # Draw random ids until one is found that is not in use yet
  my $id = md5_sum('t' . steady_time() . rand 999);
  $id = md5_sum('t' . steady_time() . rand 999) while $timers->{$id};

  # "after" is kept so the timer can be restarted later
  my %timer = (cb => $cb, after => $after, time => steady_time() + $after);
  $timer{recurring} = $after if $recurring;
  $timers->{$id} = \%timer;

  return $id;
}
124

            
125
1;
126

            
127
=encoding utf8
128

            
129
=head1 NAME
130

            
131
Mojo::Reactor::Poll - Low level event reactor with poll support
132

            
133
=head1 SYNOPSIS
134

            
135
  use Mojo::Reactor::Poll;
136

            
137
  # Watch if handle becomes readable or writable
138
  my $reactor = Mojo::Reactor::Poll->new;
139
  $reactor->io($handle => sub {
140
    my ($reactor, $writable) = @_;
141
    say $writable ? 'Handle is writable' : 'Handle is readable';
142
  });
143

            
144
  # Change to watching only if handle becomes writable
145
  $reactor->watch($handle, 0, 1);
146

            
147
  # Add a timer
148
  $reactor->timer(15 => sub {
149
    my $reactor = shift;
150
    $reactor->remove($handle);
151
    say 'Timeout!';
152
  });
153

            
154
  # Start reactor if necessary
155
  $reactor->start unless $reactor->is_running;
156

            
157
=head1 DESCRIPTION
158

            
159
L<Mojo::Reactor::Poll> is a low level event reactor based on L<IO::Poll>.
160

            
161
=head1 EVENTS
162

            
163
L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>.
164

            
165
=head1 METHODS
166

            
167
L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and
168
implements the following new ones.
169

            
170
=head2 again
171

            
172
  $reactor->again($id);
173

            
174
Restart active timer.
175

            
176
=head2 io
177

            
178
  $reactor = $reactor->io($handle => sub {...});
179

            
180
Watch handle for I/O events, invoking the callback whenever handle becomes
181
readable or writable.
182

            
183
=head2 is_running
184

            
185
  my $bool = $reactor->is_running;
186

            
187
Check if reactor is running.
188

            
189
=head2 one_tick
190

            
191
  $reactor->one_tick;
192

            
193
Run reactor until an event occurs or no events are being watched anymore. Note
194
that this method can recurse back into the reactor, so you need to be careful.
195

            
196
=head2 recurring
197

            
198
  my $id = $reactor->recurring(0.25 => sub {...});
199

            
200
Create a new recurring timer, invoking the callback repeatedly after a given
201
amount of time in seconds.
202

            
203
=head2 remove
204

            
205
  my $bool = $reactor->remove($handle);
206
  my $bool = $reactor->remove($id);
207

            
208
Remove handle or timer.
209

            
210
=head2 start
211

            
212
  $reactor->start;
213

            
214
Start watching for I/O and timer events. This will block until L</"stop"> is
called or no events are being watched anymore.
216

            
217
=head2 stop
218

            
219
  $reactor->stop;
220

            
221
Stop watching for I/O and timer events.
222

            
223
=head2 timer
224

            
225
  my $id = $reactor->timer(0.5 => sub {...});
226

            
227
Create a new timer, invoking the callback after a given amount of time in
228
seconds.
229

            
230
=head2 watch
231

            
232
  $reactor = $reactor->watch($handle, $readable, $writable);
233

            
234
Change which I/O events the handle is watched for, using true and false values
for C<$readable> and C<$writable>. Note that this method requires an active
I/O watcher.
236

            
237
=head1 SEE ALSO
238

            
239
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
240

            
241
=cut