Newer Older
330 lines | 6.741kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::IOLoop::Stream;
2
use Mojo::Base 'Mojo::EventEmitter';
3

            
4
use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK);
5
use Scalar::Util 'weaken';
6

            
7
# Reactor used by this stream. The default callback loads Mojo::IOLoop on
# demand and borrows the reactor of its global singleton.
has reactor => sub {
  require Mojo::IOLoop;
  my $loop = Mojo::IOLoop->singleton;
  return $loop->reactor;
};
11

            
12
# Close the stream when the object is destroyed.
sub DESTROY {
  my $self = shift;
  $self->close;
}
13

            
14
# Construct a stream around the given handle, starting with an empty write
# buffer and an inactivity timeout of 15.
sub new {
  my ($class, $handle) = @_;
  return $class->SUPER::new(handle => $handle, buffer => '', timeout => 15);
}
15

            
16
# Close the stream: cancel the inactivity timer, detach the handle from the
# reactor, close it and emit "close". Does nothing when there is no reactor
# or the handle is already gone.
sub close {
  my $self = shift;

  my $reactor = $self->reactor or return;

  # timeout(0) drops the timer and returns the stream itself, so the handle
  # can be taken out of the object in the same step
  my $handle = delete $self->timeout(0)->{handle} or return;

  $reactor->remove($handle);

  # The builtin, not this method
  CORE::close($handle);

  return $self->emit_safe('close');
}
25

            
26
# Close the stream right away unless it is still writing; in that case only
# set the "graceful" flag, which _write checks once writing has finished.
sub close_gracefully {
  my $self = shift;

  if ($self->is_writing) {
    return $self->{graceful} = 1;
  }

  return $self->close;
}
31

            
32
# Return the handle currently held by the stream (undef once it is gone).
sub handle {
  my $self = shift;
  return $self->{handle};
}
33

            
34
# Ask the reactor whether the handle is readable. The inactivity timer is
# restarted first; without a handle the (false) handle value is returned.
sub is_readable {
  my $self = shift;

  $self->_again;

  my $handle = $self->{handle};
  return $handle && $self->reactor->is_readable($handle);
}
39

            
40
# True while there is buffered data or a "drain" subscriber waiting; undef
# when the stream has no handle.
sub is_writing {
  my $self = shift;

  return undef unless $self->{handle};

  # Buffered data always counts as writing
  return !!1 if length $self->{buffer};

  return $self->has_subscribers('drain');
}
45

            
46
# Start watching the handle, or resume watching after stop().
#
# Does nothing when the stream has no handle (already closed or stolen), so
# neither the reactor nor the inactivity timer is touched for a dead stream.
sub start {
  my $self = shift;

  # A stream without a handle has nothing to watch
  return unless $self->{handle};

  # Resume
  my $reactor = $self->reactor;
  return $reactor->watch($self->{handle}, 1, $self->is_writing)
    if delete $self->{paused};

  # First start: (re)arm the inactivity timer and register the handle. The
  # callback dispatches to _write when its last argument is true, otherwise
  # to _read; $self is weakened so the callback does not keep the stream alive
  weaken $self;
  my $cb = sub { pop() ? $self->_write : $self->_read };
  $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
}
58

            
59
# Stop watching the handle for new data until start() is called again.
#
# Does nothing when the stream has no handle (already closed or stolen), so
# the reactor is never asked to watch an undefined handle and a dead stream
# is not marked as paused.
sub stop {
  my $self = shift;

  return unless $self->{handle};

  # Only the first stop changes the watcher; repeated calls just bump the
  # paused counter
  $self->reactor->watch($self->{handle}, 0, $self->is_writing)
    unless $self->{paused}++;
}
64

            
65
# Detach the handle from the reactor and hand it to the caller; the stream
# forgets the handle without closing it.
sub steal_handle {
  my $self   = shift;
  my $handle = $self->{handle};
  $self->reactor->remove($handle);
  return delete $self->{handle};
}
70

            
71
# Get or set the inactivity timeout. Without arguments the current value is
# returned; with an argument any existing timer is removed, the value stored,
# and (for a true value) a new timer armed that emits "timeout" and closes
# the stream. The setter returns the stream itself.
sub timeout {
  my $self = shift;

  # Getter
  return $self->{timeout} unless @_;

  # Drop the previous timer, if any
  my $reactor = $self->reactor;
  if ($self->{timer}) {
    my $old = delete $self->{timer};
    $reactor->remove($old);
  }

  # A false value means no timer at all
  my $timeout = $self->{timeout} = shift;
  return $self unless $timeout;

  # Weakened so the timer callback does not keep the stream alive
  weaken $self;
  my $expired = sub { $self->emit_safe('timeout')->close };
  $self->{timer} = $reactor->timer($timeout => $expired);

  return $self;
}
85

            
86
# Append a chunk to the write buffer and ask the reactor to watch the handle
# for writing. An optional callback is subscribed once to "drain". Without a
# callback and with nothing buffered the reactor is left alone. Returns the
# stream itself.
sub write {
  my ($self, $chunk, $cb) = @_;

  $self->{buffer} .= $chunk;

  if    ($cb)                     { $self->once(drain => $cb) }
  elsif (!length $self->{buffer}) { return $self }

  # Keep read watching unless the stream is paused
  my $handle = $self->{handle};
  $self->reactor->watch($handle, !$self->{paused}, 1) if $handle;

  return $self;
}
97

            
98
# Restart the inactivity timer, if one is currently armed.
sub _again {
  my $self  = shift;
  my $timer = $self->{timer};
  return $timer unless $timer;
  return $self->reactor->again($timer);
}
99

            
100
# React to a failed read or write based on the current value of $!: ignore
# errors that just mean "try again", close quietly on a reset connection or
# broken pipe, and otherwise emit "error" before closing.
sub _error {
  my $self = shift;

  # Retry
  my $transient = $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
  return if $transient;

  # Closed
  my $gone = $! == ECONNRESET || $! == EPIPE;
  return $self->close if $gone;

  # Error
  return $self->emit(error => $!)->close;
}
112

            
113
# Read up to 131072 bytes from the handle. A failed read goes to _error, a
# zero-length read closes the stream, and anything else is emitted as "read"
# before the inactivity timer is restarted.
sub _read {
  my $self = shift;

  my $buffer;
  my $count = $self->{handle}->sysread($buffer, 131072, 0);

  if    (!defined $count) { return $self->_error }
  elsif ($count == 0)     { return $self->close }

  return $self->emit_safe(read => $buffer)->_again;
}
120

            
121
# Flush as much of the write buffer as the handle accepts. Written bytes are
# removed from the buffer and emitted as "write"; an empty buffer triggers
# "drain". Once nothing is left to write the stream is either closed (when a
# graceful close was requested) or no longer watched for writing.
sub _write {
  my $self = shift;

  my $handle = $self->{handle};
  if (length $self->{buffer}) {
    my $count = $handle->syswrite($self->{buffer});
    return $self->_error unless defined $count;

    # Cut the written part off the front of the buffer
    my $chunk = substr $self->{buffer}, 0, $count, '';
    $self->emit_safe(write => $chunk);
    $self->_again;
  }

  $self->emit_safe('drain') unless length $self->{buffer};

  # A drain subscriber may have queued more data
  return if $self->is_writing;

  return $self->close if $self->{graceful};

  # The handle may be gone by now, so check the object again
  $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
}
137

            
138
1;
139

            
140
=encoding utf8
141

            
142
=head1 NAME
143

            
144
Mojo::IOLoop::Stream - Non-blocking I/O stream
145

            
146
=head1 SYNOPSIS
147

            
148
  use Mojo::IOLoop::Stream;
149

            
150
  # Create stream
151
  my $stream = Mojo::IOLoop::Stream->new($handle);
152
  $stream->on(read => sub {
153
    my ($stream, $bytes) = @_;
154
    ...
155
  });
156
  $stream->on(close => sub {
157
    my $stream = shift;
158
    ...
159
  });
160
  $stream->on(error => sub {
161
    my ($stream, $err) = @_;
162
    ...
163
  });
164

            
165
  # Start and stop watching for new data
166
  $stream->start;
167
  $stream->stop;
168

            
169
  # Start reactor if necessary
170
  $stream->reactor->start unless $stream->reactor->is_running;
171

            
172
=head1 DESCRIPTION
173

            
174
L<Mojo::IOLoop::Stream> is a container for I/O streams used by
175
L<Mojo::IOLoop>.
176

            
177
=head1 EVENTS
178

            
179
L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can
180
emit the following new ones.
181

            
182
=head2 close
183

            
184
  $stream->on(close => sub {
185
    my $stream = shift;
186
    ...
187
  });
188

            
189
Emitted safely if the stream gets closed.
190

            
191
=head2 drain
192

            
193
  $stream->on(drain => sub {
194
    my $stream = shift;
195
    ...
196
  });
197

            
198
Emitted safely once all data has been written.
199

            
200
=head2 error
201

            
202
  $stream->on(error => sub {
203
    my ($stream, $err) = @_;
204
    ...
205
  });
206

            
207
Emitted if an error occurs on the stream, fatal if unhandled.
208

            
209
=head2 read
210

            
211
  $stream->on(read => sub {
212
    my ($stream, $bytes) = @_;
213
    ...
214
  });
215

            
216
Emitted safely if new data arrives on the stream.
217

            
218
=head2 timeout
219

            
220
  $stream->on(timeout => sub {
221
    my $stream = shift;
222
    ...
223
  });
224

            
225
Emitted safely if the stream has been inactive for too long and will get
226
closed automatically.
227

            
228
=head2 write
229

            
230
  $stream->on(write => sub {
231
    my ($stream, $bytes) = @_;
232
    ...
233
  });
234

            
235
Emitted safely if new data has been written to the stream.
236

            
237
=head1 ATTRIBUTES
238

            
239
L<Mojo::IOLoop::Stream> implements the following attributes.
240

            
241
=head2 reactor
242

            
243
  my $reactor = $stream->reactor;
244
  $stream     = $stream->reactor(Mojo::Reactor::Poll->new);
245

            
246
Low level event reactor, defaults to the C<reactor> attribute value of the
247
global L<Mojo::IOLoop> singleton.
248

            
249
=head1 METHODS
250

            
251
L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and
252
implements the following new ones.
253

            
254
=head2 new
255

            
256
  my $stream = Mojo::IOLoop::Stream->new($handle);
257

            
258
Construct a new L<Mojo::IOLoop::Stream> object.
259

            
260
=head2 close
261

            
262
  $stream->close;
263

            
264
Close stream immediately.
265

            
266
=head2 close_gracefully
267

            
268
  $stream->close_gracefully;
269

            
270
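Close stream gracefully. If the stream is still writing, closing is deferred
until all pending data has been written, otherwise the stream is closed
immediately.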
271

            
272
=head2 handle
273

            
274
  my $handle = $stream->handle;
275

            
276
Get handle for stream.
277

            
278
=head2 is_readable
279

            
280
  my $bool = $stream->is_readable;
281

            
282
Quick non-blocking check if stream is readable, useful for identifying tainted
283
sockets.
284

            
285
=head2 is_writing
286

            
287
  my $bool = $stream->is_writing;
288

            
289
Check if stream is writing.
290

            
291
=head2 start
292

            
293
  $stream->start;
294

            
295
Start watching for new data on the stream.
296

            
297
=head2 stop
298

            
299
  $stream->stop;
300

            
301
Stop watching for new data on the stream.
302

            
303
=head2 steal_handle
304

            
305
  my $handle = $stream->steal_handle;
306

            
307
Steal handle from stream and prevent it from getting closed automatically.
308

            
309
=head2 timeout
310

            
311
  my $timeout = $stream->timeout;
312
  $stream     = $stream->timeout(45);
313

            
314
Maximum amount of time in seconds stream can be inactive before getting closed
315
automatically, defaults to C<15>. Setting the value to C<0> will allow this
316
stream to be inactive indefinitely.
317

            
318
=head2 write
319

            
320
  $stream = $stream->write($bytes);
321
  $stream = $stream->write($bytes => sub {...});
322

            
323
Write data to stream, the optional drain callback will be invoked once all
324
data has been written.
325

            
326
=head1 SEE ALSO
327

            
328
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
329

            
330
=cut