Newer Older
181 lines | 3.922kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::Reactor::EV;
2
use Mojo::Base 'Mojo::Reactor::Poll';
3

            
4
use EV 4.0;
5
use Scalar::Util 'weaken';
6

            
7
my $EV;
8

            
9
sub CLONE { die "EV does not work with ithreads.\n" }
10

            
11
sub DESTROY { undef $EV }
12

            
13
# We have to fall back to Mojo::Reactor::Poll, since EV is unique
14
sub new { $EV++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }
15

            
16
sub again { shift->{timers}{shift()}{watcher}->again }
17

            
18
sub is_running { !!EV::depth }
19

            
20
sub one_tick { EV::run(EV::RUN_ONCE) }
21

            
22
sub recurring { shift->_timer(1, @_) }
23

            
24
sub start {EV::run}
25

            
26
sub stop { EV::break(EV::BREAK_ALL) }
27

            
28
sub timer { shift->_timer(0, @_) }
29

            
30
sub watch {
31
  my ($self, $handle, $read, $write) = @_;
32

            
33
  my $fd = fileno $handle;
34
  my $io = $self->{io}{$fd};
35
  my $mode;
36
  if ($read && $write) { $mode = EV::READ | EV::WRITE }
37
  elsif ($read)  { $mode = EV::READ }
38
  elsif ($write) { $mode = EV::WRITE }
39
  else           { delete $io->{watcher} }
40
  if (my $w = $io->{watcher}) { $w->set($fd, $mode) }
41
  elsif ($mode) {
42
    weaken $self;
43
    $io->{watcher} = EV::io($fd, $mode, sub { $self->_io($fd, @_) });
44
  }
45

            
46
  return $self;
47
}
48

            
49
sub _io {
50
  my ($self, $fd, $w, $revents) = @_;
51
  my $io = $self->{io}{$fd};
52
  $self->_sandbox('Read', $io->{cb}, 0) if EV::READ &$revents;
53
  $self->_sandbox('Write', $io->{cb}, 1)
54
    if EV::WRITE &$revents && $self->{io}{$fd};
55
}
56

            
57
sub _timer {
58
  my ($self, $recurring, $after, $cb) = @_;
59
  $after ||= '0.0001';
60

            
61
  my $id = $self->SUPER::_timer(0, 0, $cb);
62
  weaken $self;
63
  $self->{timers}{$id}{watcher} = EV::timer(
64
    $after => $after => sub {
65
      $self->_sandbox("Timer $id", $self->{timers}{$id}{cb});
66
      delete $self->{timers}{$id} unless $recurring;
67
    }
68
  );
69

            
70
  return $id;
71
}
72

            
73
1;
74

            
75
=encoding utf8
76

            
77
=head1 NAME
78

            
79
Mojo::Reactor::EV - Low level event reactor with libev support
80

            
81
=head1 SYNOPSIS
82

            
83
  use Mojo::Reactor::EV;
84

            
85
  # Watch if handle becomes readable or writable
86
  my $reactor = Mojo::Reactor::EV->new;
87
  $reactor->io($handle => sub {
88
    my ($reactor, $writable) = @_;
89
    say $writable ? 'Handle is writable' : 'Handle is readable';
90
  });
91

            
92
  # Change to watching only if handle becomes writable
93
  $reactor->watch($handle, 0, 1);
94

            
95
  # Add a timer
96
  $reactor->timer(15 => sub {
97
    my $reactor = shift;
98
    $reactor->remove($handle);
99
    say 'Timeout!';
100
  });
101

            
102
  # Start reactor if necessary
103
  $reactor->start unless $reactor->is_running;
104

            
105
=head1 DESCRIPTION
106

            
107
L<Mojo::Reactor::EV> is a low level event reactor based on L<EV> (4.0+).
108

            
109
=head1 EVENTS
110

            
111
L<Mojo::Reactor::EV> inherits all events from L<Mojo::Reactor::Poll>.
112

            
113
=head1 METHODS
114

            
115
L<Mojo::Reactor::EV> inherits all methods from L<Mojo::Reactor::Poll> and
116
implements the following new ones.
117

            
118
=head2 new
119

            
120
  my $reactor = Mojo::Reactor::EV->new;
121

            
122
Construct a new L<Mojo::Reactor::EV> object.
123

            
124
=head2 again
125

            
126
  $reactor->again($id);
127

            
128
Restart active timer.
129

            
130
=head2 is_running
131

            
132
  my $bool = $reactor->is_running;
133

            
134
Check if reactor is running.
135

            
136
=head2 one_tick
137

            
138
  $reactor->one_tick;
139

            
140
Run reactor until an event occurs or no events are being watched anymore. Note
141
that this method can recurse back into the reactor, so you need to be careful.
142

            
143
=head2 recurring
144

            
145
  my $id = $reactor->recurring(0.25 => sub {...});
146

            
147
Create a new recurring timer, invoking the callback repeatedly after a given
148
amount of time in seconds.
149

            
150
=head2 start
151

            
152
  $reactor->start;
153

            
154
Start watching for I/O and timer events, this will block until L</"stop"> is
155
called or no events are being watched anymore.
156

            
157
=head2 stop
158

            
159
  $reactor->stop;
160

            
161
Stop watching for I/O and timer events.
162

            
163
=head2 timer
164

            
165
  my $id = $reactor->timer(0.5 => sub {...});
166

            
167
Create a new timer, invoking the callback after a given amount of time in
168
seconds.
169

            
170
=head2 watch
171

            
172
  $reactor = $reactor->watch($handle, $readable, $writable);
173

            
174
Change I/O events to watch handle for with true and false values. Note that
175
this method requires an active I/O watcher.
176

            
177
=head1 SEE ALSO
178

            
179
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
180

            
181
=cut