package Mojo::Server::Daemon;
use Mojo::Base 'Mojo::Server';

use Carp 'croak';
use Mojo::IOLoop;
use Mojo::URL;
use POSIX;
use Scalar::Util 'weaken';

# Set MOJO_DAEMON_DEBUG to a true value to get accept/read/write traces
use constant DEBUG => $ENV{MOJO_DAEMON_DEBUG} || 0;

has acceptors => sub { [] };
has [qw(backlog group silent user)];
has inactivity_timeout => sub { $ENV{MOJO_INACTIVITY_TIMEOUT} // 15 };
has ioloop => sub { Mojo::IOLoop->singleton };
has listen => sub { [split /,/, $ENV{MOJO_LISTEN} || 'http://*:3000'] };
has max_clients  => 1000;
has max_requests => 25;

# Remove every tracked connection and every active acceptor from the event
# loop when the daemon is destroyed.
sub DESTROY {
  my $self = shift;
  return unless my $loop = $self->ioloop;
  $self->_remove($_) for keys %{$self->{connections} || {}};
  $loop->remove($_) for @{$self->acceptors};
}

# Start listening, switch user/group and run the event loop. INT and TERM
# stop the loop for the duration of this call.
sub run {
  my $self = shift;
  local $SIG{INT} = local $SIG{TERM} = sub { $self->ioloop->stop };
  $self->start->setuidgid->ioloop->start;
}

# Switch the process to the configured "group" and "user" (either may be
# unset). Croaks if a name cannot be resolved or the switch fails. Returns
# the daemon itself so calls can be chained.
sub setuidgid {
  my $self = shift;

  # Group
  # NOTE(review): only POSIX::setgid is called here, supplementary groups are
  # not touched; confirm whether they should be dropped as well.
  if (my $group = $self->group) {
    croak qq{Group "$group" does not exist}
      unless defined(my $gid = (getgrnam($group))[2]);
    POSIX::setgid($gid) or croak qq{Can't switch to group "$group": $!};
  }

  # User (reuse the value fetched above instead of calling the accessor again)
  if (my $user = $self->user) {
    croak qq{User "$user" does not exist}
      unless defined(my $uid = (getpwnam($user))[2]);
    POSIX::setuid($uid) or croak qq{Can't switch to user "$user": $!};
  }

  return $self;
}

# Start (or resume) accepting connections and apply "max_clients" to the
# event loop. Returns the daemon itself.
sub start {
  my $self = shift;

  # Resume accepting connections
  my $loop = $self->ioloop;
  if (my $servers = $self->{servers}) {
    push @{$self->acceptors}, $loop->acceptor(delete $servers->{$_})
      for keys %$servers;
  }

  # Start listening
  else { $self->_listen($_) for @{$self->listen} }

  $loop->max_connections($self->max_clients);

  return $self;
}

# Stop accepting connections. The acceptor objects are parked in
# $self->{servers} so that "start" can register them again later. Returns
# the daemon itself.
sub stop {
  my $self = shift;

  # Suspend accepting connections but keep listen sockets open
  my $loop = $self->ioloop;
  while (my $id = shift @{$self->acceptors}) {
    my $server = $self->{servers}{$id} = $loop->acceptor($id);
    $loop->remove($id);
    $server->stop;
  }

  return $self;
}

# Build a transaction for connection $id ($c is the per-connection state
# hash), fill in local/remote address information and wire up the "upgrade"
# and "request" events. Returns the new transaction.
sub _build_tx {
  my ($self, $id, $c) = @_;

  my $tx = $self->build_tx->connection($id);
  $tx->res->headers->server('Mojolicious (Perl)');
  my $handle = $self->ioloop->stream($id)->handle;
  $tx->local_address($handle->sockhost)->local_port($handle->sockport);
  $tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
  $tx->req->url->base->scheme('https') if $c->{tls};

  # Handle upgrades and requests (callbacks must not keep the daemon alive)
  weaken $self;
  $tx->on(
    upgrade => sub {
      my ($tx, $ws) = @_;
      $ws->server_handshake;
      $self->{connections}{$id}{ws} = $ws;
    }
  );
  $tx->on(
    request => sub {
      my $tx = shift;

      # A pending WebSocket transaction takes precedence over the HTTP one
      $self->emit(request => $self->{connections}{$id}{ws} || $tx);
      $tx->on(resume => sub { $self->_write($id) });
    }
  );

  # Kept alive if we have more than one request on the connection
  return ++$c->{requests} > 1 ? $tx->kept_alive(1) : $tx;
}

# Forget connection $id, closing its current transaction first if there is
# one.
sub _close {
  my ($self, $id) = @_;

  # Finish gracefully
  if (my $tx = $self->{connections}{$id}{tx}) { $tx->server_close }

  delete $self->{connections}{$id};
}

# Called once transaction $tx on connection $id is finished. Decides whether
# to upgrade to WebSocket, drop the connection, or start a new transaction
# from leftover request data.
sub _finish {
  my ($self, $id, $tx) = @_;

  # Always remove connection for WebSockets
  return $self->_remove($id) if $tx->is_websocket;

  # Finish transaction
  $tx->server_close;

  # Upgrade connection to WebSocket
  my $c = $self->{connections}{$id};
  if (my $ws = $c->{tx} = delete $c->{ws}) {

    # Successful upgrade (a missing status code counts as a failed one and
    # must not trigger an "uninitialized" warning)
    if (($ws->res->code // '') eq '101') {
      weaken $self;
      $ws->on(resume => sub { $self->_write($id) });
    }

    # Failed upgrade
    else {
      delete $c->{tx};
      $ws->server_close;
    }
  }

  # Close connection if necessary
  my $req = $tx->req;
  return $self->_remove($id) if $req->error || !$tx->keep_alive;

  # Build new transaction for leftovers
  return unless length(my $leftovers = $req->content->leftovers);
  $tx = $c->{tx} = $self->_build_tx($id, $c);
  $tx->server_read($leftovers);
}

# Open one listen socket described by the location URL $listen (query
# parameters: ca, cert, ciphers, key, reuse, verify), register the acceptor
# and, unless "silent" is set, announce it in the log and on STDOUT.
sub _listen {
  my ($self, $listen) = @_;

  my $url     = Mojo::URL->new($listen);
  my $query   = $url->query;
  my $options = {
    address => $url->host,
    backlog => $self->backlog,
    port    => $url->port,
    reuse   => scalar $query->param('reuse'),
  };
  $options->{"tls_$_"} = scalar $query->param($_) for qw(ca cert ciphers key);
  my $verify = $query->param('verify');
  $options->{tls_verify} = hex $verify if defined $verify;

  # "*" means no specific address is passed on (guard against a missing host)
  delete $options->{address} if ($options->{address} // '') eq '*';
  my $tls = $options->{tls} = $url->protocol eq 'https';

  # The stream callbacks below can outlive the daemon, so they only hold a
  # weak reference and each of them checks it before use.
  weaken $self;
  push @{$self->acceptors}, $self->ioloop->server(
    $options => sub {
      my ($loop, $stream, $id) = @_;

      my $c = $self->{connections}{$id} = {tls => $tls};
      warn "-- Accept (@{[$stream->handle->peerhost]})\n" if DEBUG;
      $stream->timeout($self->inactivity_timeout);

      $stream->on(close => sub { $self && $self->_close($id) });
      $stream->on(
        error => sub {
          return unless $self;
          $self->app->log->error(pop);
          $self->_close($id);
        }
      );
      $stream->on(read => sub { $self && $self->_read($id => pop) });
      $stream->on(
        timeout => sub {
          $self->app->log->debug('Inactivity timeout.') if $self && $c->{tx};
        }
      );
    }
  );

  return if $self->silent;
  $self->app->log->info(qq{Listening at "$url".});
  $query->params([]);
  $url->host('127.0.0.1') if ($url->host // '') eq '*';
  say "Server available at $url.";
}

# Feed $chunk read from connection $id into its current transaction
# (creating one if needed), then finish or start writing as appropriate.
sub _read {
  my ($self, $id, $chunk) = @_;

  # Make sure we have a transaction and parse chunk
  return unless my $c = $self->{connections}{$id};
  my $tx = $c->{tx} ||= $self->_build_tx($id, $c);
  warn "-- Server <<< Client (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
  $tx->server_read($chunk);

  # Last keep-alive request or corrupted connection
  $tx->res->headers->connection('close')
    if (($c->{requests} || 0) >= $self->max_requests) || $tx->req->error;

  # Finish or start writing
  if    ($tx->is_finished) { $self->_finish($id, $tx) }
  elsif ($tx->is_writing)  { $self->_write($id) }
}

# Remove connection $id from the event loop and drop its state.
sub _remove {
  my ($self, $id) = @_;
  $self->ioloop->remove($id);
  $self->_close($id);
}

# Write the next chunk of the current transaction on connection $id and
# schedule what happens once it has been written.
sub _write {
  my ($self, $id) = @_;

  # Not writing
  return unless my $c  = $self->{connections}{$id};
  return unless my $tx = $c->{tx};
  return unless $tx->is_writing;

  # Get chunk and write ("writing" flag prevents re-entering from callbacks
  # triggered by server_write)
  return if $c->{writing}++;
  my $chunk = $tx->server_write;
  delete $c->{writing};
  warn "-- Server >>> Client (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
  my $stream = $self->ioloop->stream($id)->write($chunk);

  # Finish or continue writing
  weaken $self;
  my $cb = sub { $self->_write($id) };
  if ($tx->is_finished) {

    # With "finish" subscribers, finishing is deferred to the write callback
    if ($tx->has_subscribers('finish')) {
      $cb = sub { $self->_finish($id, $tx) };
    }
    else {
      $self->_finish($id, $tx);
      return unless $c->{tx};
    }
  }
  $stream->write('' => $cb);
}

1;

=encoding utf8

=head1 NAME

Mojo::Server::Daemon - Non-blocking I/O HTTP and WebSocket server

=head1 SYNOPSIS

  use Mojo::Server::Daemon;

  my $daemon = Mojo::Server::Daemon->new(listen => ['http://*:8080']);
  $daemon->unsubscribe('request');
  $daemon->on(request => sub {
    my ($daemon, $tx) = @_;

    # Request
    my $method = $tx->req->method;
    my $path   = $tx->req->url->path;

    # Response
    $tx->res->code(200);
    $tx->res->headers->content_type('text/plain');
    $tx->res->body("$method request for $path!");

    # Resume transaction
    $tx->resume;
  });
  $daemon->run;

=head1 DESCRIPTION

L<Mojo::Server::Daemon> is a full featured, highly portable non-blocking I/O
HTTP and WebSocket server, with IPv6, TLS, Comet (long polling), keep-alive,
connection pooling, timeout, cookie, multipart and multiple event loop
support.

For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
L<IO::Socket::SSL> (1.75+) will be used automatically by L<Mojo::IOLoop> if
they are installed. Individual features can also be disabled with the
MOJO_NO_IPV6 and MOJO_NO_TLS environment variables.

See L<Mojolicious::Guides::Cookbook> for more.

=head1 EVENTS

L<Mojo::Server::Daemon> inherits all events from L<Mojo::Server>.

=head1 ATTRIBUTES

L<Mojo::Server::Daemon> inherits all attributes from L<Mojo::Server> and
implements the following new ones.

=head2 acceptors

  my $acceptors = $daemon->acceptors;
  $daemon       = $daemon->acceptors([]);

Active acceptors.

=head2 backlog

  my $backlog = $daemon->backlog;
  $daemon     = $daemon->backlog(128);

Listen backlog size, defaults to C<SOMAXCONN>.

=head2 group

  my $group = $daemon->group;
  $daemon   = $daemon->group('users');

Group for server process.

=head2 inactivity_timeout

  my $timeout = $daemon->inactivity_timeout;
  $daemon     = $daemon->inactivity_timeout(5);

Maximum amount of time in seconds a connection can be inactive before getting
closed, defaults to the value of the MOJO_INACTIVITY_TIMEOUT environment
variable or C<15>. Setting the value to C<0> will allow connections to be
inactive indefinitely.

=head2 ioloop

  my $loop = $daemon->ioloop;
  $daemon  = $daemon->ioloop(Mojo::IOLoop->new);

Event loop object to use for I/O operations, defaults to the global
L<Mojo::IOLoop> singleton.

=head2 listen

  my $listen = $daemon->listen;
  $daemon    = $daemon->listen(['https://localhost:3000']);

List of one or more locations to listen on, defaults to the value of the
MOJO_LISTEN environment variable or C<http://*:3000>.

  # Allow multiple servers to use the same port (SO_REUSEPORT)
  $daemon->listen(['http://*:8080?reuse=1']);

  # Listen on IPv6 interface
  $daemon->listen(['http://[::1]:4000']);

  # Listen on two ports with HTTP and HTTPS at the same time
  $daemon->listen([qw(http://*:3000 https://*:4000)]);

  # Use a custom certificate and key
  $daemon->listen(['https://*:3000?cert=/x/server.crt&key=/y/server.key']);

  # Or even a custom certificate authority
  $daemon->listen(
    ['https://*:3000?cert=/x/server.crt&key=/y/server.key&ca=/z/ca.crt']);

These parameters are currently available:

=over 2

=item ca

  ca=/etc/tls/ca.crt

Path to TLS certificate authority file.

=item cert

  cert=/etc/tls/server.crt

Path to the TLS cert file, defaults to a built-in test certificate.

=item ciphers

  ciphers=AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH

Cipher specification string.

=item key

  key=/etc/tls/server.key

Path to the TLS key file, defaults to a built-in test key.

=item reuse

  reuse=1

Allow multiple servers to use the same port with the C<SO_REUSEPORT> socket
option.

=item verify

  verify=0x00

TLS verification mode, defaults to C<0x03>.

=back

=head2 max_clients

  my $max = $daemon->max_clients;
  $daemon = $daemon->max_clients(1000);

Maximum number of parallel client connections, defaults to C<1000>.

=head2 max_requests

  my $max = $daemon->max_requests;
  $daemon = $daemon->max_requests(100);

Maximum number of keep-alive requests per connection, defaults to C<25>.

=head2 silent

  my $bool = $daemon->silent;
  $daemon  = $daemon->silent($bool);

Disable console messages.

=head2 user

  my $user = $daemon->user;
  $daemon  = $daemon->user('web');

User for the server process.

=head1 METHODS

L<Mojo::Server::Daemon> inherits all methods from L<Mojo::Server> and
implements the following new ones.

=head2 run

  $daemon->run;

Run server.

=head2 setuidgid

  $daemon = $daemon->setuidgid;

Set user and group for process.

=head2 start

  $daemon = $daemon->start;

Start accepting connections.

=head2 stop

  $daemon = $daemon->stop;

Stop accepting connections.

=head1 DEBUGGING

You can set the MOJO_DAEMON_DEBUG environment variable to get some advanced
diagnostics information printed to C<STDERR>.

  MOJO_DAEMON_DEBUG=1

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut