add files
|
1 |
package Mojo::Server::Prefork; |
2 |
use Mojo::Base 'Mojo::Server::Daemon'; |
|
3 | ||
4 |
use Fcntl ':flock'; |
|
5 |
use File::Spec::Functions qw(catfile tmpdir); |
|
6 |
use IO::Poll 'POLLIN'; |
|
7 |
use List::Util 'shuffle'; |
|
8 |
use Mojo::Util 'steady_time'; |
|
9 |
use POSIX 'WNOHANG'; |
|
10 |
use Scalar::Util 'weaken'; |
|
11 |
use Time::HiRes (); |
|
12 | ||
13 |
# Attribute defaults, one per line (see ATTRIBUTES in the POD below)
has accept_interval    => 0.025;
has accepts            => 1000;
has graceful_timeout   => 20;
has heartbeat_interval => 5;
has heartbeat_timeout  => 20;
has lock_timeout       => 1;
has multi_accept       => 50;
has workers            => 4;

# File attributes are built lazily inside the temporary directory
has lock_file => sub { catfile(tmpdir(), 'prefork.lock') };
has pid_file  => sub { catfile(tmpdir(), 'prefork.pid') };
|
22 | ||
23 |
# Remove the lock file and the PID file when the manager object goes away.
# Objects that never had "finished" set (the worker case) leave both files
# untouched.
sub DESTROY {
  my $self = shift;

  # Worker
  return if !$self->{finished};

  # Manager: lock file first, then PID file, each only if it is writable
  my $lock_file = $self->{lock_file};
  unlink $lock_file if $lock_file && -w $lock_file;

  my $pid_file = $self->pid_file;
  unlink $pid_file if $pid_file && -w $pid_file;
}
|
33 | ||
34 |
# Return the process id stored in "pid_file" if that process can be signalled
# with signal 0, otherwise remove the (stale or malformed) PID file and
# return undef. Also returns undef if the PID file cannot be opened.
sub check_pid {
  my $file = shift->pid_file;
  return undef unless open my $handle, '<', $file;
  my $line = <$handle>;
  close $handle;

  # An empty file yields undef here, and anything that is not a plain
  # positive integer must never be handed to "kill"
  my ($pid) = defined $line ? $line =~ /\A\s*([0-9]+)\s*\z/ : ();

  # Running
  return $pid if $pid && kill(0, $pid);

  # Not running
  unlink $file if -w $file;
  return undef;
}
|
47 | ||
48 |
# Run the manager process: configure the event loop, create the heartbeat
# pipe, install the manager signal handlers, start the server and then keep
# calling "_manage" until "running" is cleared.
sub run {
  my $self = shift;

  # No Windows support
  say 'Preforking not available for Windows.' and exit 0 if $^O eq 'MSWin32';

  # Prepare lock file (suffixed with the manager process id) and event loop
  $self->{lock_file} = $self->lock_file . ".$$";
  my $loop = $self->ioloop->max_accepts($self->accepts);
  $loop->$_($self->$_) for qw(accept_interval multi_accept);

  # Pipe for worker communication
  pipe($self->{reader}, $self->{writer}) or die "Can't create pipe: $!";
  $self->{poll} = IO::Poll->new;
  $self->{poll}->mask($self->{reader}, POLLIN);

  # Clean manager environment
  local $SIG{INT} = local $SIG{TERM} = sub { $self->_term };
  local $SIG{CHLD} = sub {

    # Reap every child that has exited and drop it from the pool
    while ((my $pid = waitpid -1, WNOHANG) > 0) {
      $self->app->log->debug("Worker $pid stopped.")
        if delete $self->emit(reap => $pid)->{pool}{$pid};
    }
  };
  local $SIG{QUIT} = sub { $self->_term(1) };
  local $SIG{TTIN} = sub { $self->workers($self->workers + 1) };
  local $SIG{TTOU} = sub {
    $self->workers($self->workers - 1) if $self->workers > 0;
    return unless $self->workers;

    # Stop one random worker that is not already stopping. "shuffle" is
    # called in list context on purpose, and nothing is touched when there
    # is no candidate, so no bogus pool entry gets autovivified.
    my $pool = $self->{pool} || {};
    my ($pid) = shuffle grep { !$pool->{$_}{graceful} } keys %$pool;
    $pool->{$pid}{graceful} = steady_time if defined $pid;
  };

  # Preload application before starting workers
  $self->start->app->log->info("Manager $$ started.");
  $self->{running} = 1;
  $self->_manage while $self->{running};
}
|
85 | ||
86 |
# Wait up to one second for heartbeat messages on the reader end of the pipe
# and record the arrival time for every known worker that reported in.
sub _heartbeat {
  my $self = shift;

  # Poll for heartbeats
  my $poll = $self->{poll};
  $poll->poll(1);
  $poll->handles(POLLIN) or return;
  my $bytes = $self->{reader}->sysread(my $chunk, 4194304);
  return unless $bytes;

  # Update heartbeats (one "<pid>\n" message per worker beat)
  my $time = steady_time;
  while ($chunk =~ /(\d+)\n/g) {
    my $pid = $1;

    # Ignore process ids that are not in the pool
    next unless $self->{pool}{$pid};
    $self->emit(heartbeat => $pid)->{pool}{$pid}{time} = $time;
  }
}
|
100 | ||
101 |
# One iteration of the manager loop: top up the worker pool (or finish the
# shutdown), collect heartbeats, and then decide for every worker whether it
# has to be stopped gracefully or killed.
sub _manage {
  my $self = shift;

  # Shutdown is complete once the last worker is gone
  if ($self->{finished}) {
    return delete $self->{running} unless keys %{$self->{pool}};
  }

  # Spawn more workers and check PID file
  else {
    $self->_spawn while keys %{$self->{pool}} < $self->workers;
    $self->_pid_file;
  }

  # Manage workers
  $self->emit('wait')->_heartbeat;
  my $log = $self->app->log;
  for my $pid (keys %{$self->{pool}}) {

    # The entry may have been removed since the key list was taken
    my $worker = $self->{pool}{$pid} or next;

    my $interval = $self->heartbeat_interval;
    my $timeout  = $self->heartbeat_timeout;
    my $now      = steady_time;

    # No heartbeat (graceful stop)
    my $stale = !$worker->{graceful}
      && $worker->{time} + $interval + $timeout <= $now;
    if ($stale) {
      $log->info("Worker $pid has no heartbeat, restarting.");
      $worker->{graceful} = $now;
    }

    # Graceful stop with timeout
    if ($self->{graceful} && !$worker->{graceful}) {
      $worker->{graceful} = $now;
    }
    if (my $since = $worker->{graceful}) {
      $log->debug("Trying to stop worker $pid gracefully.");
      kill 'QUIT', $pid;
      $worker->{force} = 1 if $since + $self->graceful_timeout <= $now;
    }

    # Normal stop
    my $immediate = $self->{finished} && !$self->{graceful};
    if ($immediate || $worker->{force}) {
      $log->debug("Stopping worker $pid.");
      kill 'KILL', $pid;
    }
  }
}
|
143 | ||
144 |
# Create the PID file with the current process id unless it already exists.
# Dies if the file cannot be created or if writing the process id fails.
sub _pid_file {
  my $self = shift;

  # Check if PID file already exists
  return if -e (my $file = $self->pid_file);

  # Create PID file
  $self->app->log->info(qq{Creating process id file "$file".});
  die qq{Can't create process id file "$file": $!}
    unless open my $handle, '>', $file;
  chmod 0644, $handle;

  # Buffered write errors only surface on close, so check both steps instead
  # of silently leaving an empty PID file behind
  print {$handle} $$
    or die qq{Can't write process id file "$file": $!};
  close $handle
    or die qq{Can't write process id file "$file": $!};
}
|
157 | ||
158 |
# Fork one worker. In the manager the new process id is registered in the
# pool and the call returns. In the worker the accept mutex and the heartbeat
# timer are set up, the event loop is run, and the process exits afterwards.
sub _spawn {
  my $self = shift;

  # Manager
  my $pid = fork;
  die "Can't fork: $!" unless defined $pid;
  if ($pid) {
    return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time};
  }

  # Prepare lock file
  my $file = $self->{lock_file};
  open my $handle, '>', $file or die qq{Can't open lock file "$file": $!};

  # Change user/group
  my $loop = $self->setuidgid->ioloop;

  # Accept mutex: the second callback argument selects a blocking attempt
  my $acquire = sub {
    my $blocking = $_[1];
    my $lock;

    # Non blocking
    if (!$blocking) { $lock = flock($handle, LOCK_EX | LOCK_NB) }

    # Blocking ("ualarm" can't be imported on Windows)
    else {
      eval {
        local $SIG{ALRM} = sub { die "alarm\n" };
        my $previous = Time::HiRes::ualarm($self->lock_timeout * 1000000);
        $lock = flock($handle, LOCK_EX);
        Time::HiRes::ualarm($previous);
      };

      # A timeout counts as "no lock", anything else is rethrown
      if (my $error = $@) {
        die $error unless $error eq "alarm\n";
        $lock = 0;
      }
    }

    return $lock;
  };
  my $release = sub { flock $handle, LOCK_UN };
  $loop->lock($acquire);
  $loop->unlock($release);

  # Heartbeat messages (stop sending during graceful stop)
  weaken $self;
  my $beat = sub {
    return unless shift->max_connections;
    $self->{writer}->syswrite("$$\n") or exit 0;
  };
  $loop->recurring($self->heartbeat_interval => $beat);

  # Clean worker environment
  $SIG{$_} = 'DEFAULT' for qw(INT TERM CHLD TTIN TTOU);
  $SIG{QUIT} = sub { $loop->max_connections(0) };
  delete @$self{qw(poll reader)};

  $self->app->log->debug("Worker $$ started.");
  $loop->start;
  exit 0;
}
|
215 | ||
216 |
# Begin server shutdown: emit "finish" with the graceful flag, mark the
# server as finished, and remember whether the shutdown is graceful.
sub _term {
  my ($self, $graceful) = @_;

  my $server = $self->emit(finish => $graceful);
  $server->{finished} = 1;

  $graceful and $self->{graceful} = 1;
}
|
221 | ||
222 |
1; |
|
223 | ||
224 |
=encoding utf8 |
|
225 | ||
226 |
=head1 NAME |
|
227 | ||
228 |
Mojo::Server::Prefork - Preforking non-blocking I/O HTTP and WebSocket server |
|
229 | ||
230 |
=head1 SYNOPSIS |
|
231 | ||
232 |
use Mojo::Server::Prefork; |
|
233 | ||
234 |
my $prefork = Mojo::Server::Prefork->new(listen => ['http://*:8080']); |
|
235 |
$prefork->unsubscribe('request'); |
|
236 |
$prefork->on(request => sub { |
|
237 |
my ($prefork, $tx) = @_; |
|
238 | ||
239 |
# Request |
|
240 |
my $method = $tx->req->method; |
|
241 |
my $path = $tx->req->url->path; |
|
242 | ||
243 |
# Response |
|
244 |
$tx->res->code(200); |
|
245 |
$tx->res->headers->content_type('text/plain'); |
|
246 |
$tx->res->body("$method request for $path!"); |
|
247 | ||
248 |
# Resume transaction |
|
249 |
$tx->resume; |
|
250 |
}); |
|
251 |
$prefork->run; |
|
252 | ||
253 |
=head1 DESCRIPTION |
|
254 | ||
255 |
L<Mojo::Server::Prefork> is a full featured, UNIX optimized, preforking |
|
256 |
non-blocking I/O HTTP and WebSocket server, built around the very well tested |
|
257 |
and reliable L<Mojo::Server::Daemon>, with IPv6, TLS, Comet (long polling), |
|
258 |
keep-alive, connection pooling, timeout, cookie, multipart and multiple event |
|
259 |
loop support. Note that the server uses signals for process management, so you |
|
260 |
should avoid modifying signal handlers in your applications. |
|
261 | ||
262 |
For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS |
|
263 |
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and |
|
264 |
L<IO::Socket::SSL> (1.75+) will be used automatically by L<Mojo::IOLoop> if |
|
265 |
they are installed. Individual features can also be disabled with the |
|
266 |
MOJO_NO_IPV6 and MOJO_NO_TLS environment variables. |
|
267 | ||
268 |
See L<Mojolicious::Guides::Cookbook> for more. |
|
269 | ||
270 |
=head1 MANAGER SIGNALS |
|
271 | ||
272 |
The L<Mojo::Server::Prefork> manager process can be controlled at runtime with |
|
273 |
the following signals. |
|
274 | ||
275 |
=head2 INT, TERM |
|
276 | ||
277 |
Shutdown server immediately. |
|
278 | ||
279 |
=head2 QUIT |
|
280 | ||
281 |
Shutdown server gracefully. |
|
282 | ||
283 |
=head2 TTIN |
|
284 | ||
285 |
Increase worker pool by one. |
|
286 | ||
287 |
=head2 TTOU |
|
288 | ||
289 |
Decrease worker pool by one. |
|
290 | ||
291 |
=head1 WORKER SIGNALS |
|
292 | ||
293 |
L<Mojo::Server::Prefork> worker processes can be controlled at runtime with |
|
294 |
the following signals. |
|
295 | ||
296 |
=head2 INT, TERM |
|
297 | ||
298 |
Stop worker immediately. |
|
299 | ||
300 |
=head2 QUIT |
|
301 | ||
302 |
Stop worker gracefully. |
|
303 | ||
304 |
=head1 EVENTS |
|
305 | ||
306 |
L<Mojo::Server::Prefork> inherits all events from L<Mojo::Server::Daemon> and |
|
307 |
can emit the following new ones. |
|
308 | ||
309 |
=head2 finish |
|
310 | ||
311 |
$prefork->on(finish => sub { |
|
312 |
my ($prefork, $graceful) = @_; |
|
313 |
... |
|
314 |
}); |
|
315 | ||
316 |
Emitted when the server shuts down. |
|
317 | ||
318 |
$prefork->on(finish => sub { |
|
319 |
my ($prefork, $graceful) = @_; |
|
320 |
say $graceful ? 'Graceful server shutdown' : 'Server shutdown'; |
|
321 |
}); |
|
322 | ||
323 |
=head2 heartbeat |
|
324 | ||
325 |
$prefork->on(heartbeat => sub { |
|
326 |
my ($prefork, $pid) = @_; |
|
327 |
... |
|
328 |
}); |
|
329 | ||
330 |
Emitted when a heartbeat message has been received from a worker. |
|
331 | ||
332 |
$prefork->on(heartbeat => sub { |
|
333 |
my ($prefork, $pid) = @_; |
|
334 |
say "Worker $pid has a heartbeat"; |
|
335 |
}); |
|
336 | ||
337 |
=head2 reap |
|
338 | ||
339 |
$prefork->on(reap => sub { |
|
340 |
my ($prefork, $pid) = @_; |
|
341 |
... |
|
342 |
}); |
|
343 | ||
344 |
Emitted when a child process dies. |
|
345 | ||
346 |
$prefork->on(reap => sub { |
|
347 |
my ($prefork, $pid) = @_; |
|
348 |
say "Worker $pid stopped"; |
|
349 |
}); |
|
350 | ||
351 |
=head2 spawn |
|
352 | ||
353 |
$prefork->on(spawn => sub { |
|
354 |
my ($prefork, $pid) = @_; |
|
355 |
... |
|
356 |
}); |
|
357 | ||
358 |
Emitted when a worker process is spawned. |
|
359 | ||
360 |
$prefork->on(spawn => sub { |
|
361 |
my ($prefork, $pid) = @_; |
|
362 |
say "Worker $pid started"; |
|
363 |
}); |
|
364 | ||
365 |
=head2 wait |
|
366 | ||
367 |
$prefork->on(wait => sub { |
|
368 |
my $prefork = shift; |
|
369 |
... |
|
370 |
}); |
|
371 | ||
372 |
Emitted when the manager starts waiting for new heartbeat messages. |
|
373 | ||
374 |
$prefork->on(wait => sub { |
|
375 |
my $prefork = shift; |
|
376 |
my $workers = $prefork->workers; |
|
377 |
say "Waiting for heartbeat messages from $workers workers"; |
|
378 |
}); |
|
379 | ||
380 |
=head1 ATTRIBUTES |
|
381 | ||
382 |
L<Mojo::Server::Prefork> inherits all attributes from L<Mojo::Server::Daemon> |
|
383 |
and implements the following new ones. |
|
384 | ||
385 |
=head2 accept_interval |
|
386 | ||
387 |
my $interval = $prefork->accept_interval; |
|
388 |
$prefork = $prefork->accept_interval(0.5); |
|
389 | ||
390 |
Interval in seconds for trying to reacquire the accept mutex, defaults to |
|
391 |
C<0.025>. Note that changing this value can affect performance and idle CPU |
|
392 |
usage. |
|
393 | ||
394 |
=head2 accepts |
|
395 | ||
396 |
my $accepts = $prefork->accepts; |
|
397 |
$prefork = $prefork->accepts(100); |
|
398 | ||
399 |
Maximum number of connections a worker is allowed to accept before stopping |
|
400 |
gracefully, defaults to C<1000>. Setting the value to C<0> will allow workers |
|
401 |
to accept new connections indefinitely. Note that up to half of this value can |
|
402 |
be subtracted randomly to improve load balancing, and that worker processes |
|
403 |
will stop sending heartbeat messages once the limit has been reached. |
|
404 | ||
405 |
=head2 graceful_timeout |
|
406 | ||
407 |
my $timeout = $prefork->graceful_timeout; |
|
408 |
$prefork = $prefork->graceful_timeout(15); |
|
409 | ||
410 |
Maximum amount of time in seconds stopping a worker gracefully may take before |
|
411 |
being forced, defaults to C<20>. |
|
412 | ||
413 |
=head2 heartbeat_interval |
|
414 | ||
415 |
my $interval = $prefork->heartbeat_interval; |
|
416 |
$prefork = $prefork->heartbeat_interval(3); |
|
417 | ||
418 |
Heartbeat interval in seconds, defaults to C<5>. |
|
419 | ||
420 |
=head2 heartbeat_timeout |
|
421 | ||
422 |
my $timeout = $prefork->heartbeat_timeout; |
|
423 |
$prefork = $prefork->heartbeat_timeout(2); |
|
424 | ||
425 |
Maximum amount of time in seconds before a worker without a heartbeat will be |
|
426 |
stopped gracefully, defaults to C<20>. |
|
427 | ||
428 |
=head2 lock_file |
|
429 | ||
430 |
my $file = $prefork->lock_file; |
|
431 |
$prefork = $prefork->lock_file('/tmp/prefork.lock'); |
|
432 | ||
433 |
Full path of accept mutex lock file prefix, to which the process id will be |
|
434 |
appended, defaults to a file named C<prefork.lock> in the temporary directory. |
|
435 | ||
436 |
=head2 lock_timeout |
|
437 | ||
438 |
my $timeout = $prefork->lock_timeout; |
|
439 |
$prefork = $prefork->lock_timeout(0.5); |
|
440 | ||
441 |
Maximum amount of time in seconds a worker may block when waiting for the |
|
442 |
accept mutex, defaults to C<1>. Note that changing this value can affect |
|
443 |
performance and idle CPU usage. |
|
444 | ||
445 |
=head2 multi_accept |
|
446 | ||
447 |
my $multi = $prefork->multi_accept; |
|
448 |
$prefork = $prefork->multi_accept(100); |
|
449 | ||
450 |
Number of connections to accept at once, defaults to C<50>. |
|
451 | ||
452 |
=head2 pid_file |
|
453 | ||
454 |
my $file = $prefork->pid_file; |
|
455 |
$prefork = $prefork->pid_file('/tmp/prefork.pid'); |
|
456 | ||
457 |
Full path of process id file, defaults to a file named C<prefork.pid> in the temporary directory. |
|
458 | ||
459 |
=head2 workers |
|
460 | ||
461 |
my $workers = $prefork->workers; |
|
462 |
$prefork = $prefork->workers(10); |
|
463 | ||
464 |
Number of worker processes, defaults to C<4>. A good rule of thumb is two |
|
465 |
worker processes per CPU core. |
|
466 | ||
467 |
=head1 METHODS |
|
468 | ||
469 |
L<Mojo::Server::Prefork> inherits all methods from L<Mojo::Server::Daemon> and |
|
470 |
implements the following new ones. |
|
471 | ||
472 |
=head2 check_pid |
|
473 | ||
474 |
my $pid = $prefork->check_pid; |
|
475 | ||
476 |
Get process id for running server from L</"pid_file"> or delete it if server |
|
477 |
is not running. |
|
478 | ||
479 |
say 'Server is not running' unless $prefork->check_pid; |
|
480 | ||
481 |
=head2 run |
|
482 | ||
483 |
$prefork->run; |
|
484 | ||
485 |
Run server. |
|
486 | ||
487 |
=head1 SEE ALSO |
|
488 | ||
489 |
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>. |
|
490 | ||
491 |
=cut |