Newer Older
491 lines | 12.445kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::Server::Prefork;
2
use Mojo::Base 'Mojo::Server::Daemon';
3

            
4
use Fcntl ':flock';
5
use File::Spec::Functions qw(catfile tmpdir);
6
use IO::Poll 'POLLIN';
7
use List::Util 'shuffle';
8
use Mojo::Util 'steady_time';
9
use POSIX 'WNOHANG';
10
use Scalar::Util 'weaken';
11
use Time::HiRes ();
12

            
13
has accepts         => 1000;
14
has accept_interval => 0.025;
15
has [qw(graceful_timeout heartbeat_timeout)] => 20;
16
has heartbeat_interval => 5;
17
has lock_file          => sub { catfile tmpdir, 'prefork.lock' };
18
has lock_timeout       => 1;
19
has multi_accept       => 50;
20
has pid_file           => sub { catfile tmpdir, 'prefork.pid' };
21
has workers            => 4;
22

            
23
sub DESTROY {
24
  my $self = shift;
25

            
26
  # Worker
27
  return unless $self->{finished};
28

            
29
  # Manager
30
  if (my $file = $self->{lock_file}) { unlink $file if -w $file }
31
  if (my $file = $self->pid_file)    { unlink $file if -w $file }
32
}
33

            
34
sub check_pid {
35
  my $file = shift->pid_file;
36
  return undef unless open my $handle, '<', $file;
37
  my $pid = <$handle>;
38
  chomp $pid;
39

            
40
  # Running
41
  return $pid if $pid && kill 0, $pid;
42

            
43
  # Not running
44
  unlink $file if -w $file;
45
  return undef;
46
}
47

            
48
sub run {
49
  my $self = shift;
50

            
51
  # No Windows support
52
  say 'Preforking not available for Windows.' and exit 0 if $^O eq 'MSWin32';
53

            
54
  # Prepare lock file and event loop
55
  $self->{lock_file} = $self->lock_file . ".$$";
56
  my $loop = $self->ioloop->max_accepts($self->accepts);
57
  $loop->$_($self->$_) for qw(accept_interval multi_accept);
58

            
59
  # Pipe for worker communication
60
  pipe($self->{reader}, $self->{writer}) or die "Can't create pipe: $!";
61
  $self->{poll} = IO::Poll->new;
62
  $self->{poll}->mask($self->{reader}, POLLIN);
63

            
64
  # Clean manager environment
65
  local $SIG{INT} = local $SIG{TERM} = sub { $self->_term };
66
  local $SIG{CHLD} = sub {
67
    while ((my $pid = waitpid -1, WNOHANG) > 0) {
68
      $self->app->log->debug("Worker $pid stopped.")
69
        if delete $self->emit(reap => $pid)->{pool}{$pid};
70
    }
71
  };
72
  local $SIG{QUIT} = sub { $self->_term(1) };
73
  local $SIG{TTIN} = sub { $self->workers($self->workers + 1) };
74
  local $SIG{TTOU} = sub {
75
    $self->workers($self->workers - 1) if $self->workers > 0;
76
    return unless $self->workers;
77
    $self->{pool}{shuffle keys %{$self->{pool}}}{graceful} ||= steady_time;
78
  };
79

            
80
  # Preload application before starting workers
81
  $self->start->app->log->info("Manager $$ started.");
82
  $self->{running} = 1;
83
  $self->_manage while $self->{running};
84
}
85

            
86
sub _heartbeat {
87
  my $self = shift;
88

            
89
  # Poll for heartbeats
90
  my $poll = $self->{poll};
91
  $poll->poll(1);
92
  return unless $poll->handles(POLLIN);
93
  return unless $self->{reader}->sysread(my $chunk, 4194304);
94

            
95
  # Update heartbeats
96
  my $time = steady_time;
97
  $self->{pool}{$1} and $self->emit(heartbeat => $1)->{pool}{$1}{time} = $time
98
    while $chunk =~ /(\d+)\n/g;
99
}
100

            
101
sub _manage {
102
  my $self = shift;
103

            
104
  # Spawn more workers and check PID file
105
  if (!$self->{finished}) {
106
    $self->_spawn while keys %{$self->{pool}} < $self->workers;
107
    $self->_pid_file;
108
  }
109

            
110
  # Shutdown
111
  elsif (!keys %{$self->{pool}}) { return delete $self->{running} }
112

            
113
  # Manage workers
114
  $self->emit('wait')->_heartbeat;
115
  my $log = $self->app->log;
116
  for my $pid (keys %{$self->{pool}}) {
117
    next unless my $w = $self->{pool}{$pid};
118

            
119
    # No heartbeat (graceful stop)
120
    my $interval = $self->heartbeat_interval;
121
    my $timeout  = $self->heartbeat_timeout;
122
    my $time     = steady_time;
123
    if (!$w->{graceful} && ($w->{time} + $interval + $timeout <= $time)) {
124
      $log->info("Worker $pid has no heartbeat, restarting.");
125
      $w->{graceful} = $time;
126
    }
127

            
128
    # Graceful stop with timeout
129
    $w->{graceful} ||= $time if $self->{graceful};
130
    if ($w->{graceful}) {
131
      $log->debug("Trying to stop worker $pid gracefully.");
132
      kill 'QUIT', $pid;
133
      $w->{force} = 1 if $w->{graceful} + $self->graceful_timeout <= $time;
134
    }
135

            
136
    # Normal stop
137
    if (($self->{finished} && !$self->{graceful}) || $w->{force}) {
138
      $log->debug("Stopping worker $pid.");
139
      kill 'KILL', $pid;
140
    }
141
  }
142
}
143

            
144
sub _pid_file {
145
  my $self = shift;
146

            
147
  # Check if PID file already exists
148
  return if -e (my $file = $self->pid_file);
149

            
150
  # Create PID file
151
  $self->app->log->info(qq{Creating process id file "$file".});
152
  die qq{Can't create process id file "$file": $!}
153
    unless open my $handle, '>', $file;
154
  chmod 0644, $handle;
155
  print $handle $$;
156
}
157

            
158
sub _spawn {
159
  my $self = shift;
160

            
161
  # Manager
162
  die "Can't fork: $!" unless defined(my $pid = fork);
163
  return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time}
164
    if $pid;
165

            
166
  # Prepare lock file
167
  my $file = $self->{lock_file};
168
  die qq{Can't open lock file "$file": $!} unless open my $handle, '>', $file;
169

            
170
  # Change user/group
171
  my $loop = $self->setuidgid->ioloop;
172

            
173
  # Accept mutex
174
  $loop->lock(
175
    sub {
176

            
177
      # Blocking ("ualarm" can't be imported on Windows)
178
      my $lock;
179
      if ($_[1]) {
180
        eval {
181
          local $SIG{ALRM} = sub { die "alarm\n" };
182
          my $old = Time::HiRes::ualarm $self->lock_timeout * 1000000;
183
          $lock = flock $handle, LOCK_EX;
184
          Time::HiRes::ualarm $old;
185
        };
186
        if ($@) { $lock = $@ eq "alarm\n" ? 0 : die($@) }
187
      }
188

            
189
      # Non blocking
190
      else { $lock = flock $handle, LOCK_EX | LOCK_NB }
191

            
192
      return $lock;
193
    }
194
  );
195
  $loop->unlock(sub { flock $handle, LOCK_UN });
196

            
197
  # Heartbeat messages (stop sending during graceful stop)
198
  weaken $self;
199
  $loop->recurring(
200
    $self->heartbeat_interval => sub {
201
      return unless shift->max_connections;
202
      $self->{writer}->syswrite("$$\n") or exit 0;
203
    }
204
  );
205

            
206
  # Clean worker environment
207
  $SIG{$_} = 'DEFAULT' for qw(INT TERM CHLD TTIN TTOU);
208
  $SIG{QUIT} = sub { $loop->max_connections(0) };
209
  delete @$self{qw(poll reader)};
210

            
211
  $self->app->log->debug("Worker $$ started.");
212
  $loop->start;
213
  exit 0;
214
}
215

            
216
sub _term {
217
  my ($self, $graceful) = @_;
218
  $self->emit(finish => $graceful)->{finished} = 1;
219
  $self->{graceful} = 1 if $graceful;
220
}
221

            
222
1;
223

            
224
=encoding utf8
225

            
226
=head1 NAME
227

            
228
Mojo::Server::Prefork - Preforking non-blocking I/O HTTP and WebSocket server
229

            
230
=head1 SYNOPSIS
231

            
232
  use Mojo::Server::Prefork;
233

            
234
  my $prefork = Mojo::Server::Prefork->new(listen => ['http://*:8080']);
235
  $prefork->unsubscribe('request');
236
  $prefork->on(request => sub {
237
    my ($prefork, $tx) = @_;
238

            
239
    # Request
240
    my $method = $tx->req->method;
241
    my $path   = $tx->req->url->path;
242

            
243
    # Response
244
    $tx->res->code(200);
245
    $tx->res->headers->content_type('text/plain');
246
    $tx->res->body("$method request for $path!");
247

            
248
    # Resume transaction
249
    $tx->resume;
250
  });
251
  $prefork->run;
252

            
253
=head1 DESCRIPTION
254

            
255
L<Mojo::Server::Prefork> is a full featured, UNIX optimized, preforking
256
non-blocking I/O HTTP and WebSocket server, built around the very well tested
257
and reliable L<Mojo::Server::Daemon>, with IPv6, TLS, Comet (long polling),
258
keep-alive, connection pooling, timeout, cookie, multipart and multiple event
259
loop support. Note that the server uses signals for process management, so you
260
should avoid modifying signal handlers in your applications.
261

            
262
For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS
263
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
264
L<IO::Socket::SSL> (1.75+) will be used automatically by L<Mojo::IOLoop> if
265
they are installed. Individual features can also be disabled with the
266
MOJO_NO_IPV6 and MOJO_NO_TLS environment variables.
267

            
268
See L<Mojolicious::Guides::Cookbook> for more.
269

            
270
=head1 MANAGER SIGNALS
271

            
272
The L<Mojo::Server::Prefork> manager process can be controlled at runtime with
273
the following signals.
274

            
275
=head2 INT, TERM
276

            
277
Shutdown server immediately.
278

            
279
=head2 QUIT
280

            
281
Shutdown server gracefully.
282

            
283
=head2 TTIN
284

            
285
Increase worker pool by one.
286

            
287
=head2 TTOU
288

            
289
Decrease worker pool by one.
290

            
291
=head1 WORKER SIGNALS
292

            
293
L<Mojo::Server::Prefork> worker processes can be controlled at runtime with
294
the following signals.
295

            
296
=head2 INT, TERM
297

            
298
Stop worker immediately.
299

            
300
=head2 QUIT
301

            
302
Stop worker gracefully.
303

            
304
=head1 EVENTS
305

            
306
L<Mojo::Server::Prefork> inherits all events from L<Mojo::Server::Daemon> and
307
can emit the following new ones.
308

            
309
=head2 finish
310

            
311
  $prefork->on(finish => sub {
312
    my ($prefork, $graceful) = @_;
313
    ...
314
  });
315

            
316
Emitted when the server shuts down.
317

            
318
  $prefork->on(finish => sub {
319
    my ($prefork, $graceful) = @_;
320
    say $graceful ? 'Graceful server shutdown' : 'Server shutdown';
321
  });
322

            
323
=head2 heartbeat
324

            
325
  $prefork->on(heartbeat => sub {
326
    my ($prefork, $pid) = @_;
327
    ...
328
  });
329

            
330
Emitted when a heartbeat message has been received from a worker.
331

            
332
  $prefork->on(heartbeat => sub {
333
    my ($prefork, $pid) = @_;
334
    say "Worker $pid has a heartbeat";
335
  });
336

            
337
=head2 reap
338

            
339
  $prefork->on(reap => sub {
340
    my ($prefork, $pid) = @_;
341
    ...
342
  });
343

            
344
Emitted when a child process dies.
345

            
346
  $prefork->on(reap => sub {
347
    my ($prefork, $pid) = @_;
348
    say "Worker $pid stopped";
349
  });
350

            
351
=head2 spawn
352

            
353
  $prefork->on(spawn => sub {
354
    my ($prefork, $pid) = @_;
355
    ...
356
  });
357

            
358
Emitted when a worker process is spawned.
359

            
360
  $prefork->on(spawn => sub {
361
    my ($prefork, $pid) = @_;
362
    say "Worker $pid started";
363
  });
364

            
365
=head2 wait
366

            
367
  $prefork->on(wait => sub {
368
    my $prefork = shift;
369
    ...
370
  });
371

            
372
Emitted when the manager starts waiting for new heartbeat messages.
373

            
374
  $prefork->on(wait => sub {
375
    my $prefork = shift;
376
    my $workers = $prefork->workers;
377
    say "Waiting for heartbeat messages from $workers workers";
378
  });
379

            
380
=head1 ATTRIBUTES
381

            
382
L<Mojo::Server::Prefork> inherits all attributes from L<Mojo::Server::Daemon>
383
and implements the following new ones.
384

            
385
=head2 accept_interval
386

            
387
  my $interval = $prefork->accept_interval;
388
  $prefork     = $prefork->accept_interval(0.5);
389

            
390
Interval in seconds for trying to reacquire the accept mutex, defaults to
391
C<0.025>. Note that changing this value can affect performance and idle CPU
392
usage.
393

            
394
=head2 accepts
395

            
396
  my $accepts = $prefork->accepts;
397
  $prefork    = $prefork->accepts(100);
398

            
399
Maximum number of connections a worker is allowed to accept before stopping
400
gracefully, defaults to C<1000>. Setting the value to C<0> will allow workers
401
to accept new connections indefinitely. Note that up to half of this value can
402
be subtracted randomly to improve load balancing, and that worker processes
403
will stop sending heartbeat messages once the limit has been reached.
404

            
405
=head2 graceful_timeout
406

            
407
  my $timeout = $prefork->graceful_timeout;
408
  $prefork    = $prefork->graceful_timeout(15);
409

            
410
Maximum amount of time in seconds stopping a worker gracefully may take before
411
being forced, defaults to C<20>.
412

            
413
=head2 heartbeat_interval
414

            
415
  my $interval = $prefork->heartbeat_intrval;
416
  $prefork     = $prefork->heartbeat_interval(3);
417

            
418
Heartbeat interval in seconds, defaults to C<5>.
419

            
420
=head2 heartbeat_timeout
421

            
422
  my $timeout = $prefork->heartbeat_timeout;
423
  $prefork    = $prefork->heartbeat_timeout(2);
424

            
425
Maximum amount of time in seconds before a worker without a heartbeat will be
426
stopped gracefully, defaults to C<20>.
427

            
428
=head2 lock_file
429

            
430
  my $file = $prefork->lock_file;
431
  $prefork = $prefork->lock_file('/tmp/prefork.lock');
432

            
433
Full path of accept mutex lock file prefix, to which the process id will be
434
appended, defaults to a random temporary path.
435

            
436
=head2 lock_timeout
437

            
438
  my $timeout = $prefork->lock_timeout;
439
  $prefork    = $prefork->lock_timeout(0.5);
440

            
441
Maximum amount of time in seconds a worker may block when waiting for the
442
accept mutex, defaults to C<1>. Note that changing this value can affect
443
performance and idle CPU usage.
444

            
445
=head2 multi_accept
446

            
447
  my $multi = $prefork->multi_accept;
448
  $prefork  = $prefork->multi_accept(100);
449

            
450
Number of connections to accept at once, defaults to C<50>.
451

            
452
=head2 pid_file
453

            
454
  my $file = $prefork->pid_file;
455
  $prefork = $prefork->pid_file('/tmp/prefork.pid');
456

            
457
Full path of process id file, defaults to a random temporary path.
458

            
459
=head2 workers
460

            
461
  my $workers = $prefork->workers;
462
  $prefork    = $prefork->workers(10);
463

            
464
Number of worker processes, defaults to C<4>. A good rule of thumb is two
465
worker processes per CPU core.
466

            
467
=head1 METHODS
468

            
469
L<Mojo::Server::Prefork> inherits all methods from L<Mojo::Server::Daemon> and
470
implements the following new ones.
471

            
472
=head2 check_pid
473

            
474
  my $pid = $prefork->check_pid;
475

            
476
Get process id for running server from L</"pid_file"> or delete it if server
477
is not running.
478

            
479
  say 'Server is not running' unless $prefork->check_pid;
480

            
481
=head2 run
482

            
483
  $prefork->run;
484

            
485
Run server.
486

            
487
=head1 SEE ALSO
488

            
489
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
490

            
491
=cut