add files
|
1 |
package Mojo::IOLoop::Stream; |
2 |
use Mojo::Base 'Mojo::EventEmitter'; |
|
3 | ||
4 |
use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK); |
|
5 |
use Scalar::Util 'weaken'; |
|
6 | ||
7 |
# Low level event reactor used by this stream; built lazily from the global
# Mojo::IOLoop singleton the first time it is needed
has reactor => sub {
  require Mojo::IOLoop;
  my $loop = Mojo::IOLoop->singleton;
  return $loop->reactor;
};
|
11 | ||
12 |
# Close the stream when the object is destroyed. Destructors can run at
# arbitrary points (for example while an exception is propagating), and
# closing performs a close() call and emits an event, so the global status
# variables are localized to leave the caller's values untouched
sub DESTROY {
  my $self = shift;
  local ($., $@, $!, $^E, $?);
  $self->close;
}
|
13 | ||
14 |
# Construct a stream around the given handle, starting with an empty write
# buffer and an inactivity timeout of 15 seconds
sub new {
  my ($class, $handle) = @_;
  return $class->SUPER::new(handle => $handle, buffer => '', timeout => 15);
}
|
15 | ||
16 |
# Close the stream immediately: cancel the inactivity timer, detach the handle
# from the reactor, close it and emit "close". Does nothing if there is no
# reactor or the handle is already gone
sub close {
  my ($self) = @_;

  my $reactor = $self->reactor or return;

  # The timer is cancelled even if there turns out to be no handle left
  $self->timeout(0);
  my $handle = delete $self->{handle} or return;

  $reactor->remove($handle);
  close $handle;
  return $self->emit_safe('close');
}
|
25 | ||
26 |
# Close right away when nothing is being written; otherwise only set the
# "graceful" flag, which _write checks once it has nothing left to write
sub close_gracefully {
  my ($self) = @_;
  unless ($self->is_writing) { return $self->close }
  return $self->{graceful} = 1;
}
|
31 | ||
32 |
# Return the handle wrapped by this stream (undef once closed or stolen)
sub handle {
  my ($self) = @_;
  return $self->{handle};
}
|
33 | ||
34 |
# Ask the reactor whether the handle is currently readable. The inactivity
# timer is restarted first; without a handle the false handle value itself
# is returned
sub is_readable {
  my ($self) = @_;

  $self->_again;

  my $handle = $self->{handle};
  return $handle unless $handle;
  return $self->reactor->is_readable($handle);
}
|
39 | ||
40 |
# True while there is buffered data left to write or at least one "drain"
# subscriber is waiting; undef if the stream has no handle
sub is_writing {
  my ($self) = @_;

  return undef unless $self->{handle};

  # Pending output always counts as writing
  return !!1 if length $self->{buffer};

  return $self->has_subscribers('drain');
}
|
45 | ||
46 |
# Start watching the handle for I/O events, or resume watching after "stop".
# Does nothing if the stream no longer has a handle (already closed or the
# handle was stolen), instead of handing an undefined handle to the reactor
sub start {
  my $self = shift;

  # Nothing left to watch
  return unless $self->{handle};

  # Resume
  my $reactor = $self->reactor;
  return $reactor->watch($self->{handle}, 1, $self->is_writing)
    if delete $self->{paused};

  # First start: (re)arm the inactivity timer and register the handle. The
  # callback only holds a weak reference to the stream; a true last argument
  # dispatches to _write, anything else to _read
  weaken $self;
  my $cb = sub { pop() ? $self->_write : $self->_read };
  $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
}
|
58 | ||
59 |
# Stop watching the handle for new data. Every call increments the "paused"
# counter, but only the first one (counter previously false) changes what the
# reactor watches; write interest is kept while the stream is still writing
sub stop {
  my ($self) = @_;

  my $already_paused = $self->{paused}++;
  return $already_paused if $already_paused;

  return $self->reactor->watch($self->{handle}, 0, $self->is_writing);
}
|
64 | ||
65 |
# Detach the handle from the stream and the reactor without closing it, and
# hand it to the caller
sub steal_handle {
  my ($self) = @_;

  my $handle = delete $self->{handle};
  $self->reactor->remove($handle);

  return $handle;
}
|
70 | ||
71 |
# Get or set the inactivity timeout in seconds. Setting a value always drops
# the pending timer; a true value then arms a new one that emits "timeout"
# and closes the stream when it fires. Returns the stream when used as setter
sub timeout {
  my ($self, @args) = @_;

  # Getter
  return $self->{timeout} if !@args;

  # Drop whatever timer is currently pending
  my $reactor = $self->reactor;
  if ($self->{timer}) { $reactor->remove(delete $self->{timer}) }

  # A false value disables the timeout altogether
  my $seconds = $self->{timeout} = $args[0];
  return $self if !$seconds;

  # The timer callback only holds a weak reference to the stream
  weaken $self;
  my $expire = sub { $self->emit_safe('timeout')->close };
  $self->{timer} = $reactor->timer($seconds => $expire);

  return $self;
}
|
85 | ||
86 |
# Append a chunk to the write buffer and ask the reactor to watch the handle
# for writability. An optional callback is subscribed once to "drain".
# Returns the stream
sub write {
  my ($self, $chunk, $cb) = @_;

  $self->{buffer} .= $chunk;
  $self->once(drain => $cb) if $cb;

  # Without callback and without data there is nothing to wait for
  return $self if !$cb && !length $self->{buffer};

  my $handle = $self->{handle};
  $self->reactor->watch($handle, !$self->{paused}, 1) if $handle;

  return $self;
}
|
97 | ||
98 |
# Restart the inactivity timer, if one is armed
sub _again {
  my ($self) = @_;

  my $timer = $self->{timer};
  return $timer unless $timer;

  return $self->reactor->again($timer);
}
|
99 | ||
100 |
# React to a failed read or write based on errno: do nothing for errors that
# just mean "try again", close quietly when the connection is gone, and
# otherwise emit "error" with the error before closing
sub _error {
  my $self = shift;

  # Snapshot errno once. A plain copy keeps both the numeric code and the
  # message, and nothing that runs later can overwrite the value the
  # subscribers receive
  my $err = $!;

  # Retry
  return if $err == EAGAIN || $err == EINTR || $err == EWOULDBLOCK;

  # Closed
  return $self->close if $err == ECONNRESET || $err == EPIPE;

  # Error
  $self->emit(error => $err)->close;
}
|
112 | ||
113 |
# Read up to 131072 bytes (128 KiB) from the handle. A failed read goes to
# _error, a read of zero bytes closes the stream, anything else is emitted
# as "read" and restarts the inactivity timer
sub _read {
  my ($self) = @_;

  my $count = $self->{handle}->sysread(my $chunk, 131072, 0);

  # Must be checked before anything else can change errno
  if (!defined $count) { return $self->_error }

  if ($count == 0) { return $self->close }

  return $self->emit_safe(read => $chunk)->_again;
}
|
120 | ||
121 |
# Flush as much of the write buffer as the handle accepts, emit "write" for
# the bytes written and "drain" once the buffer is empty. When nothing is
# left to write, either finish a pending graceful close or stop watching the
# handle for writability
sub _write {
  my ($self) = @_;

  # Remember the handle, event subscribers may close the stream
  my $handle = $self->{handle};

  if (length $self->{buffer}) {
    my $count = $handle->syswrite($self->{buffer});
    return $self->_error unless defined $count;

    # Cut the written bytes off the front of the buffer
    my $sent = substr $self->{buffer}, 0, $count, '';
    $self->emit_safe(write => $sent);
    $self->_again;
  }

  if (!length $self->{buffer}) { $self->emit_safe('drain') }

  # A "drain" subscriber may have queued more data
  return if $self->is_writing;

  return $self->close if $self->{graceful};

  return $self->{handle}
    && $self->reactor->watch($handle, !$self->{paused}, 0);
}
|
137 | ||
138 |
1; |
|
139 | ||
140 |
=encoding utf8

=head1 NAME

Mojo::IOLoop::Stream - Non-blocking I/O stream
|
145 | ||
146 |
=head1 SYNOPSIS

  use Mojo::IOLoop::Stream;

  # Create stream
  my $stream = Mojo::IOLoop::Stream->new($handle);
  $stream->on(read => sub {
    my ($stream, $bytes) = @_;
    ...
  });
  $stream->on(close => sub {
    my $stream = shift;
    ...
  });
  $stream->on(error => sub {
    my ($stream, $err) = @_;
    ...
  });

  # Start and stop watching for new data
  $stream->start;
  $stream->stop;

  # Start reactor if necessary
  $stream->reactor->start unless $stream->reactor->is_running;
|
171 | ||
172 |
=head1 DESCRIPTION

L<Mojo::IOLoop::Stream> is a container for I/O streams used by
L<Mojo::IOLoop>.
|
176 | ||
177 |
=head1 EVENTS

L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can
emit the following new ones.
|
181 | ||
182 |
=head2 close

  $stream->on(close => sub {
    my $stream = shift;
    ...
  });

Emitted safely if the stream gets closed.
|
190 | ||
191 |
=head2 drain

  $stream->on(drain => sub {
    my $stream = shift;
    ...
  });

Emitted safely once all data has been written.
|
199 | ||
200 |
=head2 error

  $stream->on(error => sub {
    my ($stream, $err) = @_;
    ...
  });

Emitted if an error occurs on the stream, fatal if unhandled.
|
208 | ||
209 |
=head2 read

  $stream->on(read => sub {
    my ($stream, $bytes) = @_;
    ...
  });

Emitted safely if new data arrives on the stream.
|
217 | ||
218 |
=head2 timeout

  $stream->on(timeout => sub {
    my $stream = shift;
    ...
  });

Emitted safely if the stream has been inactive for too long and will get
closed automatically.
|
227 | ||
228 |
=head2 write

  $stream->on(write => sub {
    my ($stream, $bytes) = @_;
    ...
  });

Emitted safely if new data has been written to the stream.
|
236 | ||
237 |
=head1 ATTRIBUTES

L<Mojo::IOLoop::Stream> implements the following attributes.

=head2 reactor

  my $reactor = $stream->reactor;
  $stream     = $stream->reactor(Mojo::Reactor::Poll->new);

Low level event reactor, defaults to the C<reactor> attribute value of the
global L<Mojo::IOLoop> singleton.
|
248 | ||
249 |
=head1 METHODS

L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
|
253 | ||
254 |
=head2 new

  my $stream = Mojo::IOLoop::Stream->new($handle);

Construct a new L<Mojo::IOLoop::Stream> object.
|
259 | ||
260 |
=head2 close

  $stream->close;

Close stream immediately.
|
265 | ||
266 |
=head2 close_gracefully

  $stream->close_gracefully;

Close stream gracefully. If the stream is still writing, closing is deferred
until all data has been written, otherwise the stream is closed immediately.
|
271 | ||
272 |
=head2 handle

  my $handle = $stream->handle;

Get handle for stream.
|
277 | ||
278 |
=head2 is_readable

  my $bool = $stream->is_readable;

Quick non-blocking check if stream is readable, useful for identifying tainted
sockets.
|
284 | ||
285 |
=head2 is_writing

  my $bool = $stream->is_writing;

Check if stream is writing, meaning there is still buffered data to write or
a subscriber is waiting for the L</"drain"> event.
|
290 | ||
291 |
=head2 start

  $stream->start;

Start watching for new data on the stream, or resume watching after
L</"stop">.
|
296 | ||
297 |
=head2 stop

  $stream->stop;

Stop watching for new data on the stream.
|
302 | ||
303 |
=head2 steal_handle

  my $handle = $stream->steal_handle;

Steal handle from stream and prevent it from getting closed automatically.
|
308 | ||
309 |
=head2 timeout

  my $timeout = $stream->timeout;
  $stream     = $stream->timeout(45);

Maximum amount of time in seconds stream can be inactive before getting closed
automatically, defaults to C<15>. Setting the value to C<0> will allow this
stream to be inactive indefinitely.
|
317 | ||
318 |
=head2 write

  $stream = $stream->write($bytes);
  $stream = $stream->write($bytes => sub {...});

Write data to stream, the optional drain callback will be invoked once all
data has been written.
|
325 | ||
326 |
=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut