Newer Older
607 lines | 16.001kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::IOLoop;
2
use Mojo::Base -base;
3

            
4
# "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5
#             used. Like the death ray."
6
use Carp 'croak';
7
use Mojo::IOLoop::Client;
8
use Mojo::IOLoop::Delay;
9
use Mojo::IOLoop::Server;
10
use Mojo::IOLoop::Stream;
11
use Mojo::Reactor::Poll;
12
use Mojo::Util qw(md5_sum steady_time);
13
use Scalar::Util qw(blessed weaken);
14

            
15
use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
16

            
17
has accept_interval => 0.025;
18
has [qw(lock unlock)];
19
has max_accepts     => 0;
20
has max_connections => 1000;
21
has multi_accept    => 50;
22
has reactor         => sub {
23
  my $class = Mojo::Reactor::Poll->detect;
24
  warn "-- Reactor initialized ($class)\n" if DEBUG;
25
  my $reactor = $class->new;
26
  $reactor->on(error => sub { warn "@{[blessed $_[0]]}: $_[1]" });
27
  return $reactor;
28
};
29

            
30
# Ignore PIPE signal
31
$SIG{PIPE} = 'IGNORE';
32

            
33
# Initialize singleton reactor early
34
__PACKAGE__->singleton->reactor;
35

            
36
sub acceptor {
37
  my ($self, $acceptor) = (_instance(shift), @_);
38

            
39
  # Find acceptor for id
40
  return $self->{acceptors}{$acceptor} unless ref $acceptor;
41

            
42
  # Connect acceptor with reactor
43
  my $id = $self->_id;
44
  $self->{acceptors}{$id} = $acceptor;
45
  weaken $acceptor->reactor($self->reactor)->{reactor};
46
  $self->{accepts} = $self->max_accepts if $self->max_accepts;
47

            
48
  # Allow new acceptor to get picked up
49
  $self->_not_accepting;
50

            
51
  return $id;
52
}
53

            
54
sub client {
55
  my ($self, $cb) = (_instance(shift), pop);
56

            
57
  # Make sure timers are running
58
  $self->_recurring;
59

            
60
  my $id = $self->_id;
61
  my $client = $self->{connections}{$id}{client} = Mojo::IOLoop::Client->new;
62
  weaken $client->reactor($self->reactor)->{reactor};
63

            
64
  weaken $self;
65
  $client->on(
66
    connect => sub {
67
      delete $self->{connections}{$id}{client};
68
      my $stream = Mojo::IOLoop::Stream->new(pop);
69
      $self->_stream($stream => $id);
70
      $self->$cb(undef, $stream);
71
    }
72
  );
73
  $client->on(
74
    error => sub {
75
      $self->_remove($id);
76
      $self->$cb(pop, undef);
77
    }
78
  );
79
  $client->connect(@_);
80

            
81
  return $id;
82
}
83

            
84
sub delay {
85
  my $self = _instance(shift);
86

            
87
  my $delay = Mojo::IOLoop::Delay->new;
88
  weaken $delay->ioloop($self)->{ioloop};
89
  @_ > 1 ? $delay->steps(@_) : $delay->once(finish => shift) if @_;
90

            
91
  return $delay;
92
}
93

            
94
sub generate_port { Mojo::IOLoop::Server->generate_port }
95

            
96
sub is_running { (ref $_[0] ? $_[0] : $_[0]->singleton)->reactor->is_running }
97
sub one_tick   { (ref $_[0] ? $_[0] : $_[0]->singleton)->reactor->one_tick }
98

            
99
sub recurring { shift->_timer(recurring => @_) }
100

            
101
sub remove {
102
  my ($self, $id) = (_instance(shift), @_);
103
  my $c = $self->{connections}{$id};
104
  if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
105
  $self->_remove($id);
106
}
107

            
108
sub server {
109
  my ($self, $cb) = (_instance(shift), pop);
110

            
111
  my $server = Mojo::IOLoop::Server->new;
112
  weaken $self;
113
  $server->on(
114
    accept => sub {
115
      my $stream = Mojo::IOLoop::Stream->new(pop);
116
      $self->$cb($stream, $self->stream($stream));
117
    }
118
  );
119
  $server->listen(@_);
120

            
121
  return $self->acceptor($server);
122
}
123

            
124
sub singleton { state $loop = shift->SUPER::new }
125

            
126
sub start {
127
  my $self = shift;
128
  croak 'Mojo::IOLoop already running' if $self->is_running;
129
  (ref $self ? $self : $self->singleton)->reactor->start;
130
}
131

            
132
sub stop { (ref $_[0] ? $_[0] : $_[0]->singleton)->reactor->stop }
133

            
134
sub stream {
135
  my ($self, $stream) = (_instance(shift), @_);
136

            
137
  # Find stream for id
138
  return ($self->{connections}{$stream} || {})->{stream} unless ref $stream;
139

            
140
  # Release accept mutex
141
  $self->_not_accepting;
142

            
143
  # Enforce connection limit (randomize to improve load balancing)
144
  $self->max_connections(0)
145
    if defined $self->{accepts} && ($self->{accepts} -= int(rand 2) + 1) <= 0;
146

            
147
  return $self->_stream($stream, $self->_id);
148
}
149

            
150
sub timer { shift->_timer(timer => @_) }
151

            
152
sub _accepting {
153
  my $self = shift;
154

            
155
  # Check if we have acceptors
156
  my $acceptors = $self->{acceptors} ||= {};
157
  return $self->_remove(delete $self->{accept}) unless keys %$acceptors;
158

            
159
  # Check connection limit
160
  my $i   = keys %{$self->{connections}};
161
  my $max = $self->max_connections;
162
  return unless $i < $max;
163

            
164
  # Acquire accept mutex
165
  if (my $cb = $self->lock) { return unless $self->$cb(!$i) }
166
  $self->_remove(delete $self->{accept});
167

            
168
  # Check if multi-accept is desirable
169
  my $multi = $self->multi_accept;
170
  $_->multi_accept($max < $multi ? 1 : $multi)->start for values %$acceptors;
171
  $self->{accepting}++;
172
}
173

            
174
sub _id {
175
  my $self = shift;
176
  my $id;
177
  do { $id = md5_sum('c' . steady_time . rand 999) }
178
    while $self->{connections}{$id} || $self->{acceptors}{$id};
179
  return $id;
180
}
181

            
182
sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }
183

            
184
sub _not_accepting {
185
  my $self = shift;
186

            
187
  # Make sure timers are running
188
  $self->_recurring;
189

            
190
  # Release accept mutex
191
  return unless delete $self->{accepting};
192
  return unless my $cb = $self->unlock;
193
  $self->$cb;
194

            
195
  $_->stop for values %{$self->{acceptors} || {}};
196
}
197

            
198
sub _recurring {
199
  my $self = shift;
200
  $self->{accept} ||= $self->recurring($self->accept_interval => \&_accepting);
201
  $self->{stop} ||= $self->recurring(1 => \&_stop);
202
}
203

            
204
sub _remove {
205
  my ($self, $id) = @_;
206

            
207
  # Timer
208
  return unless my $reactor = $self->reactor;
209
  return if $reactor->remove($id);
210

            
211
  # Acceptor
212
  if (delete $self->{acceptors}{$id}) { $self->_not_accepting }
213

            
214
  # Connection
215
  else { delete $self->{connections}{$id} }
216
}
217

            
218
sub _stop {
219
  my $self = shift;
220
  return      if keys %{$self->{connections}};
221
  $self->stop if $self->max_connections == 0;
222
  return      if keys %{$self->{acceptors}};
223
  $self->{$_} && $self->_remove(delete $self->{$_}) for qw(accept stop);
224
}
225

            
226
sub _stream {
227
  my ($self, $stream, $id) = @_;
228

            
229
  # Make sure timers are running
230
  $self->_recurring;
231

            
232
  # Connect stream with reactor
233
  $self->{connections}{$id}{stream} = $stream;
234
  weaken $stream->reactor($self->reactor)->{reactor};
235
  weaken $self;
236
  $stream->on(close => sub { $self && $self->_remove($id) });
237
  $stream->start;
238

            
239
  return $id;
240
}
241

            
242
sub _timer {
243
  my ($self, $method, $after, $cb) = (_instance(shift), @_);
244
  weaken $self;
245
  return $self->reactor->$method($after => sub { $self->$cb });
246
}
247

            
248
1;
249

            
250
=encoding utf8
251

            
252
=head1 NAME
253

            
254
Mojo::IOLoop - Minimalistic event loop
255

            
256
=head1 SYNOPSIS
257

            
258
  use Mojo::IOLoop;
259

            
260
  # Listen on port 3000
261
  Mojo::IOLoop->server({port => 3000} => sub {
262
    my ($loop, $stream) = @_;
263

            
264
    $stream->on(read => sub {
265
      my ($stream, $bytes) = @_;
266

            
267
      # Process input chunk
268
      say $bytes;
269

            
270
      # Write response
271
      $stream->write('HTTP/1.1 200 OK');
272
    });
273
  });
274

            
275
  # Connect to port 3000
276
  my $id = Mojo::IOLoop->client({port => 3000} => sub {
277
    my ($loop, $err, $stream) = @_;
278

            
279
    $stream->on(read => sub {
280
      my ($stream, $bytes) = @_;
281

            
282
      # Process input
283
      say "Input: $bytes";
284
    });
285

            
286
    # Write request
287
    $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
288
  });
289

            
290
  # Add a timer
291
  Mojo::IOLoop->timer(5 => sub {
292
    my $loop = shift;
293
    $loop->remove($id);
294
  });
295

            
296
  # Start event loop if necessary
297
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
298

            
299
=head1 DESCRIPTION
300

            
301
L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>,
302
it has been reduced to the absolute minimal feature set required to build
303
solid and scalable non-blocking TCP clients and servers.
304

            
305
The event loop will be resilient to time jumps if a monotonic clock is
306
available through L<Time::HiRes>. A TLS certificate and key are also built
307
right in, to make writing test servers as easy as possible. Also note that for
308
convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop>
309
is loaded.
310

            
311
For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS
312
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
313
L<IO::Socket::SSL> (1.75+) will be used automatically if they are installed.
314
Individual features can also be disabled with the MOJO_NO_IPV6 and MOJO_NO_TLS
315
environment variables.
316

            
317
See L<Mojolicious::Guides::Cookbook> for more.
318

            
319
=head1 ATTRIBUTES
320

            
321
L<Mojo::IOLoop> implements the following attributes.
322

            
323
=head2 accept_interval
324

            
325
  my $interval = $loop->accept_interval;
326
  $loop        = $loop->accept_interval(0.5);
327

            
328
Interval in seconds for trying to reacquire the accept mutex, defaults to
329
C<0.025>. Note that changing this value can affect performance and idle CPU
330
usage.
331

            
332
=head2 lock
333

            
334
  my $cb = $loop->lock;
335
  $loop  = $loop->lock(sub {...});
336

            
337
A callback for acquiring the accept mutex, used to sync multiple server
338
processes. The callback should return true or false. Note that exceptions in
339
this callback are not captured.
340

            
341
  $loop->lock(sub {
342
    my ($loop, $blocking) = @_;
343

            
344
    # Got the accept mutex, start accepting new connections
345
    return 1;
346
  });
347

            
348
=head2 max_accepts
349

            
350
  my $max = $loop->max_accepts;
351
  $loop   = $loop->max_accepts(1000);
352

            
353
The maximum number of connections this event loop is allowed to accept before
354
shutting down gracefully without interrupting existing connections, defaults
355
to C<0>. Setting the value to C<0> will allow this event loop to accept new
356
connections indefinitely. Note that up to half of this value can be subtracted
357
randomly to improve load balancing between multiple server processes.
358

            
359
=head2 max_connections
360

            
361
  my $max = $loop->max_connections;
362
  $loop   = $loop->max_connections(1000);
363

            
364
The maximum number of parallel connections this event loop is allowed to
365
handle before stopping to accept new incoming connections, defaults to
366
C<1000>. Setting the value to C<0> will make this event loop stop accepting
367
new connections and allow it to shut down gracefully without interrupting
368
existing connections.
369

            
370
=head2 multi_accept
371

            
372
  my $multi = $loop->multi_accept;
373
  $loop     = $loop->multi_accept(100);
374

            
375
Number of connections to accept at once, defaults to C<50>.
376

            
377
=head2 reactor
378

            
379
  my $reactor = $loop->reactor;
380
  $loop       = $loop->reactor(Mojo::Reactor->new);
381

            
382
Low level event reactor, usually a L<Mojo::Reactor::Poll> or
383
L<Mojo::Reactor::EV> object with a default C<error> event.
384

            
385
  # Watch if handle becomes readable or writable
386
  $loop->reactor->io($handle => sub {
387
    my ($reactor, $writable) = @_;
388
    say $writable ? 'Handle is writable' : 'Handle is readable';
389
  });
390

            
391
  # Change to watching only if handle becomes writable
392
  $loop->reactor->watch($handle, 0, 1);
393

            
394
=head2 unlock
395

            
396
  my $cb = $loop->unlock;
397
  $loop  = $loop->unlock(sub {...});
398

            
399
A callback for releasing the accept mutex, used to sync multiple server
400
processes. Note that exceptions in this callback are not captured.
401

            
402
=head1 METHODS
403

            
404
L<Mojo::IOLoop> inherits all methods from L<Mojo::Base> and implements the
405
following new ones.
406

            
407
=head2 acceptor
408

            
409
  my $server = Mojo::IOLoop->acceptor($id);
410
  my $server = $loop->acceptor($id);
411
  my $id     = $loop->acceptor(Mojo::IOLoop::Server->new);
412

            
413
Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
414

            
415
=head2 client
416

            
417
  my $id
418
    = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
419
  my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
420
  my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
421

            
422
Open TCP connection with L<Mojo::IOLoop::Client>, takes the same arguments as
423
L<Mojo::IOLoop::Client/"connect">.
424

            
425
  # Connect to localhost on port 3000
426
  Mojo::IOLoop->client({port => 3000} => sub {
427
    my ($loop, $err, $stream) = @_;
428
    ...
429
  });
430

            
431
=head2 delay
432

            
433
  my $delay = Mojo::IOLoop->delay;
434
  my $delay = $loop->delay;
435
  my $delay = $loop->delay(sub {...});
436
  my $delay = $loop->delay(sub {...}, sub {...});
437

            
438
Get L<Mojo::IOLoop::Delay> object to manage callbacks and control the flow of
439
events. A single callback will be treated as a subscriber to the C<finish>
440
event, and multiple ones as a chain of steps.
441

            
442
  # Synchronize multiple events
443
  my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
444
  for my $i (1 .. 10) {
445
    my $end = $delay->begin;
446
    Mojo::IOLoop->timer($i => sub {
447
      say 10 - $i;
448
      $end->();
449
    });
450
  }
451
  $delay->wait unless Mojo::IOLoop->is_running;
452

            
453
  # Sequentialize multiple events
454
  my $delay = Mojo::IOLoop->delay(
455

            
456
    # First step (simple timer)
457
    sub {
458
      my $delay = shift;
459
      Mojo::IOLoop->timer(2 => $delay->begin);
460
      say 'Second step in 2 seconds.';
461
    },
462

            
463
    # Second step (parallel timers)
464
    sub {
465
      my $delay = shift;
466
      Mojo::IOLoop->timer(1 => $delay->begin);
467
      Mojo::IOLoop->timer(3 => $delay->begin);
468
      say 'Third step in 3 seconds.';
469
    },
470

            
471
    # Third step (the end)
472
    sub { say 'And done after 5 seconds total.' }
473
  );
474
  $delay->wait unless Mojo::IOLoop->is_running;
475

            
476
=head2 generate_port
477

            
478
  my $port = Mojo::IOLoop->generate_port;
479
  my $port = $loop->generate_port;
480

            
481
Find a free TCP port, this is a utility function primarily used for tests.
482

            
483
=head2 is_running
484

            
485
  my $bool = Mojo::IOLoop->is_running;
486
  my $bool = $loop->is_running;
487

            
488
Check if event loop is running.
489

            
490
  exit unless Mojo::IOLoop->is_running;
491

            
492
=head2 one_tick
493

            
494
  Mojo::IOLoop->one_tick;
495
  $loop->one_tick;
496

            
497
Run event loop until an event occurs. Note that this method can recurse back
498
into the reactor, so you need to be careful.
499

            
500
  # Don't block longer than 0.5 seconds
501
  my $id = Mojo::IOLoop->timer(0.5 => sub {});
502
  Mojo::IOLoop->one_tick;
503
  Mojo::IOLoop->remove($id);
504

            
505
=head2 recurring
506

            
507
  my $id = Mojo::IOLoop->recurring(0.5 => sub {...});
508
  my $id = $loop->recurring(3 => sub {...});
509

            
510
Create a new recurring timer, invoking the callback repeatedly after a given
511
amount of time in seconds.
512

            
513
  # Invoke as soon as possible
514
  Mojo::IOLoop->recurring(0 => sub { say 'Reactor tick.' });
515

            
516
=head2 remove
517

            
518
  Mojo::IOLoop->remove($id);
519
  $loop->remove($id);
520

            
521
Remove anything with an id, connections will be dropped gracefully by allowing
522
them to finish writing all data in their write buffers.
523

            
524
=head2 server
525

            
526
  my $id = Mojo::IOLoop->server(port => 3000, sub {...});
527
  my $id = $loop->server(port => 3000, sub {...});
528
  my $id = $loop->server({port => 3000} => sub {...});
529

            
530
Accept TCP connections with L<Mojo::IOLoop::Server>, takes the same arguments
531
as L<Mojo::IOLoop::Server/"listen">.
532

            
533
  # Listen on port 3000
534
  Mojo::IOLoop->server({port => 3000} => sub {
535
    my ($loop, $stream, $id) = @_;
536
    ...
537
  });
538

            
539
=head2 singleton
540

            
541
  my $loop = Mojo::IOLoop->singleton;
542

            
543
The global L<Mojo::IOLoop> singleton, used to access a single shared event
544
loop object from everywhere inside the process.
545

            
546
  # Many methods also allow you to take shortcuts
547
  Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
548
  Mojo::IOLoop->start;
549

            
550
  # Restart active timer
551
  my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
552
  Mojo::IOLoop->singleton->reactor->again($id);
553

            
554
=head2 start
555

            
556
  Mojo::IOLoop->start;
557
  $loop->start;
558

            
559
Start the event loop, this will block until L</"stop"> is called. Note that
560
some reactors stop automatically if there are no events being watched anymore.
561

            
562
  # Start event loop only if it is not running already
563
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
564

            
565
=head2 stop
566

            
567
  Mojo::IOLoop->stop;
568
  $loop->stop;
569

            
570
Stop the event loop, this will not interrupt any existing connections and the
571
event loop can be restarted by running L</"start"> again.
572

            
573
=head2 stream
574

            
575
  my $stream = Mojo::IOLoop->stream($id);
576
  my $stream = $loop->stream($id);
577
  my $id     = $loop->stream(Mojo::IOLoop::Stream->new);
578

            
579
Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
580

            
581
  # Increase inactivity timeout for connection to 300 seconds
582
  Mojo::IOLoop->stream($id)->timeout(300);
583

            
584
=head2 timer
585

            
586
  my $id = Mojo::IOLoop->timer(5 => sub {...});
587
  my $id = $loop->timer(5 => sub {...});
588
  my $id = $loop->timer(0.25 => sub {...});
589

            
590
Create a new timer, invoking the callback after a given amount of time in
591
seconds.
592

            
593
  # Invoke as soon as possible
594
  Mojo::IOLoop->timer(0 => sub { say 'Next tick.' });
595

            
596
=head1 DEBUGGING
597

            
598
You can set the MOJO_IOLOOP_DEBUG environment variable to get some advanced
599
diagnostics information printed to C<STDERR>.
600

            
601
  MOJO_IOLOOP_DEBUG=1
602

            
603
=head1 SEE ALSO
604

            
605
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
606

            
607
=cut