Newer Older
490 lines | 11.808kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::Server::Daemon;
2
use Mojo::Base 'Mojo::Server';
3

            
4
use Carp 'croak';
5
use Mojo::IOLoop;
6
use Mojo::URL;
7
use POSIX;
8
use Scalar::Util 'weaken';
9

            
10
use constant DEBUG => $ENV{MOJO_DAEMON_DEBUG} || 0;
11

            
12
has acceptors => sub { [] };
13
has [qw(backlog group silent user)];
14
has inactivity_timeout => sub { $ENV{MOJO_INACTIVITY_TIMEOUT} // 15 };
15
has ioloop => sub { Mojo::IOLoop->singleton };
16
has listen => sub { [split /,/, $ENV{MOJO_LISTEN} || 'http://*:3000'] };
17
has max_clients  => 1000;
18
has max_requests => 25;
19

            
20
sub DESTROY {
21
  my $self = shift;
22
  return unless my $loop = $self->ioloop;
23
  $self->_remove($_) for keys %{$self->{connections} || {}};
24
  $loop->remove($_) for @{$self->acceptors};
25
}
26

            
27
sub run {
28
  my $self = shift;
29
  local $SIG{INT} = local $SIG{TERM} = sub { $self->ioloop->stop };
30
  $self->start->setuidgid->ioloop->start;
31
}
32

            
33
sub setuidgid {
34
  my $self = shift;
35

            
36
  # Group
37
  if (my $group = $self->group) {
38
    croak qq{Group "$group" does not exist}
39
      unless defined(my $gid = (getgrnam($group))[2]);
40
    POSIX::setgid($gid) or croak qq{Can't switch to group "$group": $!};
41
  }
42

            
43
  # User
44
  if (my $user = $self->user) {
45
    croak qq{User "$user" does not exist}
46
      unless defined(my $uid = (getpwnam($self->user))[2]);
47
    POSIX::setuid($uid) or croak qq{Can't switch to user "$user": $!};
48
  }
49

            
50
  return $self;
51
}
52

            
53
sub start {
54
  my $self = shift;
55

            
56
  # Resume accepting connections
57
  my $loop = $self->ioloop;
58
  if (my $servers = $self->{servers}) {
59
    push @{$self->acceptors}, $loop->acceptor(delete $servers->{$_})
60
      for keys %$servers;
61
  }
62

            
63
  # Start listening
64
  else { $self->_listen($_) for @{$self->listen} }
65
  $loop->max_connections($self->max_clients);
66

            
67
  return $self;
68
}
69

            
70
sub stop {
71
  my $self = shift;
72

            
73
  # Suspend accepting connections but keep listen sockets open
74
  my $loop = $self->ioloop;
75
  while (my $id = shift @{$self->acceptors}) {
76
    my $server = $self->{servers}{$id} = $loop->acceptor($id);
77
    $loop->remove($id);
78
    $server->stop;
79
  }
80

            
81
  return $self;
82
}
83

            
84
sub _build_tx {
85
  my ($self, $id, $c) = @_;
86

            
87
  my $tx = $self->build_tx->connection($id);
88
  $tx->res->headers->server('Mojolicious (Perl)');
89
  my $handle = $self->ioloop->stream($id)->handle;
90
  $tx->local_address($handle->sockhost)->local_port($handle->sockport);
91
  $tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
92
  $tx->req->url->base->scheme('https') if $c->{tls};
93

            
94
  # Handle upgrades and requests
95
  weaken $self;
96
  $tx->on(
97
    upgrade => sub {
98
      my ($tx, $ws) = @_;
99
      $ws->server_handshake;
100
      $self->{connections}{$id}{ws} = $ws;
101
    }
102
  );
103
  $tx->on(
104
    request => sub {
105
      my $tx = shift;
106
      $self->emit(request => $self->{connections}{$id}{ws} || $tx);
107
      $tx->on(resume => sub { $self->_write($id) });
108
    }
109
  );
110

            
111
  # Kept alive if we have more than one request on the connection
112
  return ++$c->{requests} > 1 ? $tx->kept_alive(1) : $tx;
113
}
114

            
115
sub _close {
116
  my ($self, $id) = @_;
117

            
118
  # Finish gracefully
119
  if (my $tx = $self->{connections}{$id}{tx}) { $tx->server_close }
120

            
121
  delete $self->{connections}{$id};
122
}
123

            
124
sub _finish {
125
  my ($self, $id, $tx) = @_;
126

            
127
  # Always remove connection for WebSockets
128
  return $self->_remove($id) if $tx->is_websocket;
129

            
130
  # Finish transaction
131
  $tx->server_close;
132

            
133
  # Upgrade connection to WebSocket
134
  my $c = $self->{connections}{$id};
135
  if (my $ws = $c->{tx} = delete $c->{ws}) {
136

            
137
    # Successful upgrade
138
    if ($ws->res->code eq '101') {
139
      weaken $self;
140
      $ws->on(resume => sub { $self->_write($id) });
141
    }
142

            
143
    # Failed upgrade
144
    else {
145
      delete $c->{tx};
146
      $ws->server_close;
147
    }
148
  }
149

            
150
  # Close connection if necessary
151
  my $req = $tx->req;
152
  return $self->_remove($id) if $req->error || !$tx->keep_alive;
153

            
154
  # Build new transaction for leftovers
155
  return unless length(my $leftovers = $req->content->leftovers);
156
  $tx = $c->{tx} = $self->_build_tx($id, $c);
157
  $tx->server_read($leftovers);
158
}
159

            
160
sub _listen {
161
  my ($self, $listen) = @_;
162

            
163
  my $url     = Mojo::URL->new($listen);
164
  my $query   = $url->query;
165
  my $options = {
166
    address => $url->host,
167
    backlog => $self->backlog,
168
    port    => $url->port,
169
    reuse   => scalar $query->param('reuse'),
170
  };
171
  $options->{"tls_$_"} = scalar $query->param($_) for qw(ca cert ciphers key);
172
  my $verify = $query->param('verify');
173
  $options->{tls_verify} = hex $verify if defined $verify;
174
  delete $options->{address} if $options->{address} eq '*';
175
  my $tls = $options->{tls} = $url->protocol eq 'https';
176

            
177
  weaken $self;
178
  push @{$self->acceptors}, $self->ioloop->server(
179
    $options => sub {
180
      my ($loop, $stream, $id) = @_;
181

            
182
      my $c = $self->{connections}{$id} = {tls => $tls};
183
      warn "-- Accept (@{[$stream->handle->peerhost]})\n" if DEBUG;
184
      $stream->timeout($self->inactivity_timeout);
185

            
186
      $stream->on(close => sub { $self->_close($id) });
187
      $stream->on(
188
        error => sub {
189
          return unless $self;
190
          $self->app->log->error(pop);
191
          $self->_close($id);
192
        }
193
      );
194
      $stream->on(read => sub { $self->_read($id => pop) });
195
      $stream->on(timeout =>
196
          sub { $self->app->log->debug('Inactivity timeout.') if $c->{tx} });
197
    }
198
  );
199

            
200
  return if $self->silent;
201
  $self->app->log->info(qq{Listening at "$url".});
202
  $query->params([]);
203
  $url->host('127.0.0.1') if $url->host eq '*';
204
  say "Server available at $url.";
205
}
206

            
207
sub _read {
208
  my ($self, $id, $chunk) = @_;
209

            
210
  # Make sure we have a transaction and parse chunk
211
  return unless my $c = $self->{connections}{$id};
212
  my $tx = $c->{tx} ||= $self->_build_tx($id, $c);
213
  warn "-- Server <<< Client (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
214
  $tx->server_read($chunk);
215

            
216
  # Last keep-alive request or corrupted connection
217
  $tx->res->headers->connection('close')
218
    if (($c->{requests} || 0) >= $self->max_requests) || $tx->req->error;
219

            
220
  # Finish or start writing
221
  if ($tx->is_finished) { $self->_finish($id, $tx) }
222
  elsif ($tx->is_writing) { $self->_write($id) }
223
}
224

            
225
sub _remove {
226
  my ($self, $id) = @_;
227
  $self->ioloop->remove($id);
228
  $self->_close($id);
229
}
230

            
231
sub _write {
232
  my ($self, $id) = @_;
233

            
234
  # Not writing
235
  return unless my $c  = $self->{connections}{$id};
236
  return unless my $tx = $c->{tx};
237
  return unless $tx->is_writing;
238

            
239
  # Get chunk and write
240
  return if $c->{writing}++;
241
  my $chunk = $tx->server_write;
242
  delete $c->{writing};
243
  warn "-- Server >>> Client (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
244
  my $stream = $self->ioloop->stream($id)->write($chunk);
245

            
246
  # Finish or continue writing
247
  weaken $self;
248
  my $cb = sub { $self->_write($id) };
249
  if ($tx->is_finished) {
250
    if ($tx->has_subscribers('finish')) {
251
      $cb = sub { $self->_finish($id, $tx) }
252
    }
253
    else {
254
      $self->_finish($id, $tx);
255
      return unless $c->{tx};
256
    }
257
  }
258
  $stream->write('' => $cb);
259
}
260

            
261
1;
262

            
263
=encoding utf8
264

            
265
=head1 NAME
266

            
267
Mojo::Server::Daemon - Non-blocking I/O HTTP and WebSocket server
268

            
269
=head1 SYNOPSIS
270

            
271
  use Mojo::Server::Daemon;
272

            
273
  my $daemon = Mojo::Server::Daemon->new(listen => ['http://*:8080']);
274
  $daemon->unsubscribe('request');
275
  $daemon->on(request => sub {
276
    my ($daemon, $tx) = @_;
277

            
278
    # Request
279
    my $method = $tx->req->method;
280
    my $path   = $tx->req->url->path;
281

            
282
    # Response
283
    $tx->res->code(200);
284
    $tx->res->headers->content_type('text/plain');
285
    $tx->res->body("$method request for $path!");
286

            
287
    # Resume transaction
288
    $tx->resume;
289
  });
290
  $daemon->run;
291

            
292
=head1 DESCRIPTION
293

            
294
L<Mojo::Server::Daemon> is a full featured, highly portable non-blocking I/O
295
HTTP and WebSocket server, with IPv6, TLS, Comet (long polling), keep-alive,
296
connection pooling, timeout, cookie, multipart and multiple event loop
297
support.
298

            
299
For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS
300
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
301
L<IO::Socket::SSL> (1.75+) will be used automatically by L<Mojo::IOLoop> if
302
they are installed. Individual features can also be disabled with the
303
MOJO_NO_IPV6 and MOJO_NO_TLS environment variables.
304

            
305
See L<Mojolicious::Guides::Cookbook> for more.
306

            
307
=head1 EVENTS
308

            
309
L<Mojo::Server::Daemon> inherits all events from L<Mojo::Server>.
310

            
311
=head1 ATTRIBUTES
312

            
313
L<Mojo::Server::Daemon> inherits all attributes from L<Mojo::Server> and
314
implements the following new ones.
315

            
316
=head2 acceptors
317

            
318
  my $acceptors = $daemon->acceptors;
319
  $daemon       = $daemon->acceptors([]);
320

            
321
Active acceptors.
322

            
323
=head2 backlog
324

            
325
  my $backlog = $daemon->backlog;
326
  $daemon     = $daemon->backlog(128);
327

            
328
Listen backlog size, defaults to C<SOMAXCONN>.
329

            
330
=head2 group
331

            
332
  my $group = $daemon->group;
333
  $daemon   = $daemon->group('users');
334

            
335
Group for server process.
336

            
337
=head2 inactivity_timeout
338

            
339
  my $timeout = $daemon->inactivity_timeout;
340
  $daemon     = $daemon->inactivity_timeout(5);
341

            
342
Maximum amount of time in seconds a connection can be inactive before getting
343
closed, defaults to the value of the MOJO_INACTIVITY_TIMEOUT environment
344
variable or C<15>. Setting the value to C<0> will allow connections to be
345
inactive indefinitely.
346

            
347
=head2 ioloop
348

            
349
  my $loop = $daemon->ioloop;
350
  $daemon  = $daemon->ioloop(Mojo::IOLoop->new);
351

            
352
Event loop object to use for I/O operations, defaults to the global
353
L<Mojo::IOLoop> singleton.
354

            
355
=head2 listen
356

            
357
  my $listen = $daemon->listen;
358
  $daemon    = $daemon->listen(['https://localhost:3000']);
359

            
360
List of one or more locations to listen on, defaults to the value of the
361
MOJO_LISTEN environment variable or C<http://*:3000>.
362

            
363
  # Allow multiple servers to use the same port (SO_REUSEPORT)
364
  $daemon->listen(['http://*:8080?reuse=1']);
365

            
366
  # Listen on IPv6 interface
367
  $daemon->listen(['http://[::1]:4000']);
368

            
369
  # Listen on two ports with HTTP and HTTPS at the same time
370
  $daemon->listen([qw(http://*:3000 https://*:4000)]);
371

            
372
  # Use a custom certificate and key
373
  $daemon->listen(['https://*:3000?cert=/x/server.crt&key=/y/server.key']);
374

            
375
  # Or even a custom certificate authority
376
  $daemon->listen(
377
    ['https://*:3000?cert=/x/server.crt&key=/y/server.key&ca=/z/ca.crt']);
378

            
379
These parameters are currently available:
380

            
381
=over 2
382

            
383
=item ca
384

            
385
  ca=/etc/tls/ca.crt
386

            
387
Path to TLS certificate authority file.
388

            
389
=item cert
390

            
391
  cert=/etc/tls/server.crt
392

            
393
Path to the TLS cert file, defaults to a built-in test certificate.
394

            
395
=item ciphers
396

            
397
  ciphers=AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH
398

            
399
Cipher specification string.
400

            
401
=item key
402

            
403
  key=/etc/tls/server.key
404

            
405
Path to the TLS key file, defaults to a built-in test key.
406

            
407
=item reuse
408

            
409
  reuse=1
410

            
411
Allow multiple servers to use the same port with the C<SO_REUSEPORT> socket
412
option.
413

            
414
=item verify
415

            
416
  verify=0x00
417

            
418
TLS verification mode, defaults to C<0x03>.
419

            
420
=back
421

            
422
=head2 max_clients
423

            
424
  my $max = $daemon->max_clients;
425
  $daemon = $daemon->max_clients(1000);
426

            
427
Maximum number of parallel client connections, defaults to C<1000>.
428

            
429
=head2 max_requests
430

            
431
  my $max = $daemon->max_requests;
432
  $daemon = $daemon->max_requests(100);
433

            
434
Maximum number of keep-alive requests per connection, defaults to C<25>.
435

            
436
=head2 silent
437

            
438
  my $bool = $daemon->silent;
439
  $daemon  = $daemon->silent($bool);
440

            
441
Disable console messages.
442

            
443
=head2 user
444

            
445
  my $user = $daemon->user;
446
  $daemon  = $daemon->user('web');
447

            
448
User for the server process.
449

            
450
=head1 METHODS
451

            
452
L<Mojo::Server::Daemon> inherits all methods from L<Mojo::Server> and
453
implements the following new ones.
454

            
455
=head2 run
456

            
457
  $daemon->run;
458

            
459
Run server.
460

            
461
=head2 setuidgid
462

            
463
  $daemon = $daemon->setuidgid;
464

            
465
Set user and group for process.
466

            
467
=head2 start
468

            
469
  $daemon = $daemon->start;
470

            
471
Start accepting connections.
472

            
473
=head2 stop
474

            
475
  $daemon = $daemon->stop;
476

            
477
Stop accepting connections.
478

            
479
=head1 DEBUGGING
480

            
481
You can set the MOJO_DAEMON_DEBUG environment variable to get some advanced
482
diagnostics information printed to C<STDERR>.
483

            
484
  MOJO_DAEMON_DEBUG=1
485

            
486
=head1 SEE ALSO
487

            
488
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
489

            
490
=cut