Newer Older
664 lines | 15.735kb
add files
Yuki Kimoto authored on 2014-03-26
1
package Mojo::Transaction::WebSocket;
2
use Mojo::Base 'Mojo::Transaction';
3

            
4
use Config;
5
use Mojo::JSON;
6
use Mojo::Transaction::HTTP;
7
use Mojo::Util qw(b64_encode decode encode sha1_bytes xor_encode);
8

            
9
# Frame diagnostics are controlled by the MOJO_WEBSOCKET_DEBUG environment
# variable
use constant DEBUG => $ENV{MOJO_WEBSOCKET_DEBUG} || 0;

# True for a Perl that can pack and unpack 64bit integers (quads)
use constant MODERN =>
  ($Config{longsize} >= 8 || ($Config{use64bitint} // '') eq 'define');

# Unique value from RFC 6455, appended to the key during the handshake
use constant GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

# Opcodes
use constant CONTINUATION => 0x0;
use constant TEXT         => 0x1;
use constant BINARY       => 0x2;
use constant CLOSE        => 0x8;
use constant PING         => 0x9;
use constant PONG         => 0xA;
27

            
28
# Attributes ("masked" has no default value, the size limit can be set with
# the MOJO_MAX_WEBSOCKET_SIZE environment variable)
has 'masked';
has handshake          => sub { return Mojo::Transaction::HTTP->new };
has max_websocket_size => sub {
  return $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144;
};
31

            
32
# Construct transaction and route every "frame" event to the default message
# parser
sub new {
  my $class = shift;

  my $self = $class->SUPER::new(@_);
  $self->on(
    frame => sub {
      my $ws = shift;
      $ws->_message(@_);
    }
  );

  return $self;
}
37

            
38
# Build a single WebSocket frame from the FIN/RSV flags, the opcode and the
# payload bytes, the payload gets masked if the "masked" attribute is true
sub build_frame {
  my ($self, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
  warn "-- Building frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Head (the opcode always has to be part of the first byte, with or without
  # FIN bit, and the byte has to be packed instead of stringified)
  my $head = $op | ($fin ? 0b10000000 : 0);
  $head |= 0b01000000 if $rsv1;
  $head |= 0b00100000 if $rsv2;
  $head |= 0b00010000 if $rsv3;
  my $frame = pack 'C', $head;

  # The second byte carries the mask bit and the length (or a length marker)
  my $len      = length $payload;
  my $masked   = $self->masked;
  my $mask_bit = $masked ? 0b10000000 : 0;

  # Small payload
  if ($len < 126) {
    warn "-- Small payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'C', $len | $mask_bit;
  }

  # Extended payload (16bit)
  elsif ($len < 65536) {
    warn "-- Extended 16bit payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'Cn', 126 | $mask_bit, $len;
  }

  # Extended payload (64bit with 32bit fallback)
  else {
    warn "-- Extended 64bit payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'C', 127 | $mask_bit;
    $frame .= MODERN ? pack('Q>', $len) : pack('NN', 0, $len & 0xffffffff);
  }

  # Mask payload (the 4 byte key is sent in front of the encoded payload)
  if ($masked) {
    my $mask = pack 'N', int(rand 9 x 7);
    $payload = $mask . xor_encode($payload, $mask x 128);
  }

  return $frame . $payload;
}
83

            
84
# Check if the "Sec-WebSocket-Accept" response header matches the challenge
# derived from the "Sec-WebSocket-Key" request header
sub client_challenge {
  my $self = shift;

  # A missing header can never match and should not trigger a warning
  my $accept = $self->res->headers->sec_websocket_accept // '';
  return _challenge($self->req->headers->sec_websocket_key) eq $accept;
}
89

            
90
# Fill in the handshake request headers that have not been set yet
sub client_handshake {
  my $self = shift;

  my $headers = $self->req->headers;
  $headers->upgrade('websocket')      unless $headers->upgrade;
  $headers->connection('Upgrade')     unless $headers->connection;
  $headers->sec_websocket_version(13) unless $headers->sec_websocket_version;

  # Generate 16 byte WebSocket challenge from random bytes (a formatted
  # 16 digit number has very little entropy and does not fit into an integer
  # on a Perl without 64bit support)
  unless ($headers->sec_websocket_key) {
    my $nonce = join '', map { pack 'C', int(rand 256) } 1 .. 16;
    $headers->sec_websocket_key(b64_encode($nonce, ''));
  }
}
102

            
103
# Reading and writing work the same way for clients and servers
sub client_read {
  my $self = shift;
  return $self->server_read(@_);
}

sub client_write {
  my $self = shift;
  return $self->server_write(@_);
}
105

            
106
# Connection of the handshake transaction
sub connection {
  my $self = shift;
  return $self->handshake->connection;
}
107

            
108
# Queue a "Close" frame with optional status code and reason
sub finish {
  my ($self, @status) = @_;

  # Code and reason are remembered in "close"
  $self->{close} = \@status;
  my ($code, $reason) = @status;

  # The payload is the 16bit status code followed by the UTF-8 encoded reason
  my $payload = '';
  $payload = pack 'n', $code if $code;
  $payload .= encode('UTF-8', $reason) if defined $reason;

  # 1005 is stored as status if no code has been provided
  $status[0] //= 1005;

  my $ws = $self->send([1, 0, 0, 0, CLOSE, $payload]);
  $ws->{finished} = 1;

  return $self;
}
119

            
120
# Always true for WebSocket transactions
sub is_websocket { return 1 }
121

            
122
# Connection details are provided by the handshake transaction
sub kept_alive {
  my $self = shift;
  return $self->handshake->kept_alive;
}

sub local_address {
  my $self = shift;
  return $self->handshake->local_address;
}

sub local_port {
  my $self = shift;
  return $self->handshake->local_port;
}
125

            
126
# Parse a single frame from the buffer reference and remove it from the
# buffer, returns undef (and leaves the buffer untouched) if no complete frame
# is available or the frame has been rejected
sub parse_frame {
  my ($self, $buffer) = @_;

  # Head
  return undef unless length(my $clone = $$buffer) >= 2;
  my ($first, $second) = unpack 'CC', $clone;

  # FIN
  my $fin = $first & 0b10000000 ? 1 : 0;

  # RSV1-3
  my $rsv1 = $first & 0b01000000 ? 1 : 0;
  my $rsv2 = $first & 0b00100000 ? 1 : 0;
  my $rsv3 = $first & 0b00010000 ? 1 : 0;

  # Opcode
  my $op = $first & 0b00001111;
  warn "-- Parsing frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Small payload
  my $len  = $second & 0b01111111;
  my $hlen = 2;
  if ($len < 126) { warn "-- Small payload ($len)\n" if DEBUG }

  # Extended payload (16bit), the extended header is complete with 4 bytes
  elsif ($len == 126) {
    return undef unless length($clone) >= 4;
    $hlen = 4;
    $len = unpack 'n', substr($clone, 2, 2);
    warn "-- Extended 16bit payload ($len)\n" if DEBUG;
  }

  # Extended payload (64bit with 32bit fallback), the extended header is
  # complete with 10 bytes
  elsif ($len == 127) {
    return undef unless length($clone) >= 10;
    $hlen = 10;
    my $ext = substr $clone, 2, 8;
    if (MODERN) { $len = unpack 'Q>', $ext }
    else {

      # Lengths beyond 32bit cannot be represented without quads, ignoring
      # the upper half would desynchronize the stream, so the frame is
      # rejected as too big instead
      my ($high, $low) = unpack 'NN', $ext;
      $self->finish(1009) and return undef if $high;
      $len = $low;
    }
    warn "-- Extended 64bit payload ($len)\n" if DEBUG;
  }

  # Check message size
  $self->finish(1009) and return undef if $len > $self->max_websocket_size;

  # Check if whole packet has arrived (masked frames carry a 4 byte key in
  # front of the payload)
  my $masked = $second & 0b10000000;
  $len += 4 if $masked;
  return undef if length($clone) < $hlen + $len;
  substr $clone, 0, $hlen, '';

  # Payload
  my $payload = $len ? substr($clone, 0, $len, '') : '';

  # Unmask payload
  if ($masked) {
    my $mask = substr $payload, 0, 4, '';
    $payload = xor_encode($payload, $mask x 128);
  }
  warn "$payload\n" if DEBUG;
  $$buffer = $clone;

  return [$fin, $rsv1, $rsv2, $rsv3, $op, $payload];
}
187

            
188
# Remote connection details and messages are provided by the handshake
# transaction
sub remote_address {
  my $self = shift;
  return $self->handshake->remote_address;
}

sub remote_port {
  my $self = shift;
  return $self->handshake->remote_port;
}

sub req {
  my $self = shift;
  return $self->handshake->req;
}

sub res {
  my $self = shift;
  return $self->handshake->res;
}
192

            
193
# Resume the handshake transaction, returns the invocant for chaining
sub resume {
  my $self = shift;

  my $handshake = $self->handshake;
  $handshake->resume;

  return $self;
}
198

            
199
# Queue a message or frame for writing, accepts a hash reference with a
# "json", "text" or "binary" value, an array reference with a raw frame or a
# string of characters, the optional callback is invoked on the next "drain"
# event
sub send {
  my ($self, $frame, $cb) = @_;

  if (ref $frame eq 'HASH') {
    my $msg = $frame;

    # JSON (checked with "exists" so false values like 0, "" and undef get
    # encoded too, the hash of the caller is not modified)
    if (exists $msg->{json}) {
      $frame = [1, 0, 0, 0, TEXT, Mojo::JSON->new->encode($msg->{json})];
    }

    # Raw text
    elsif (exists $msg->{text}) { $frame = [1, 0, 0, 0, TEXT, $msg->{text}] }

    # Binary
    else { $frame = [1, 0, 0, 0, BINARY, $msg->{binary}] }
  }

  # Text
  $frame = [1, 0, 0, 0, TEXT, encode('UTF-8', $frame)]
    if ref $frame ne 'ARRAY';

  $self->once(drain => $cb) if $cb;
  $self->{write} .= $self->build_frame(@$frame);
  $self->{state} = 'write';

  return $self->emit('resume');
}
224

            
225
# Mark transaction as finished and emit the "finish" event
sub server_close {
  my $self = shift;

  $self->{state} = 'finished';

  # Status stored in "close" is passed along, 1006 is used without one
  my @status = $self->{close} ? @{$self->{close}} : (1006);
  return $self->emit(finish => @status);
}
230

            
231
# Prepare the "101" handshake response
sub server_handshake {
  my $self = shift;

  my $res_headers = $self->res->code(101)->headers;
  $res_headers->upgrade('websocket');
  $res_headers->connection('Upgrade');

  # Answer with the first protocol from the "Sec-WebSocket-Protocol" request
  # header
  my $req_headers = $self->req->headers;
  my $protocols   = $req_headers->sec_websocket_protocol // '';
  $res_headers->sec_websocket_protocol($1) if $protocols =~ /^\s*([^,]+)/;

  # Challenge response
  my $accept = _challenge($req_headers->sec_websocket_key);
  $res_headers->sec_websocket_accept($accept);
}
242

            
243
# Append chunk to the read buffer and emit a "frame" event for every complete
# frame, followed by a "resume" event
sub server_read {
  my ($self, $chunk) = @_;

  $self->{read} .= $chunk // '';

  # Keep parsing until no complete frame is left
  while (1) {
    my $frame = $self->parse_frame(\$self->{read}) or last;
    $self->emit(frame => $frame);
  }

  return $self->emit('resume');
}
253

            
254
# Return and clear the bytes waiting to be written
sub server_write {
  my $self = shift;

  # Nothing to write, switch state and emit "drain" (subscribers get a chance
  # to queue more data before the buffer is returned)
  if (!length($self->{write} // '')) {
    $self->{state} = $self->{finished} ? 'finished' : 'read';
    $self->emit('drain');
  }

  my $bytes = delete $self->{write};
  return $bytes // '';
}
264

            
265
# Base64 encoded SHA1 digest of key (or empty string) and GUID, without line
# ending
sub _challenge {
  my ($key) = @_;
  $key ||= '';
  return b64_encode(sha1_bytes($key . GUID), '');
}
266

            
267
# Default "frame" event handler, answers pings, handles close frames and
# assembles frames into messages
sub _message {
  my ($self, $frame) = @_;

  # Assume continuation
  my $op = $frame->[4] || CONTINUATION;

  # Ping/Pong
  if ($op == PING) { return $self->send([1, 0, 0, 0, PONG, $frame->[5]]) }
  if ($op == PONG) { return }

  # Close (the status code is removed from the payload of the frame, what
  # remains is the reason)
  if ($op == CLOSE) {
    return $self->finish unless length $frame->[5] >= 2;
    my $code = unpack 'n', substr($frame->[5], 0, 2, '');
    return $self->finish($code, decode('UTF-8', $frame->[5]));
  }

  # Append chunk and check message size (the opcode of the first frame
  # decides the type of the whole message)
  $self->{op} = $op unless exists $self->{op};
  $self->{message} .= $frame->[5];
  my $limit = $self->max_websocket_size;
  return $self->finish(1009) if length $self->{message} > $limit;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # Whole message
  my $msg = delete $self->{message};
  if ($self->has_subscribers('json')) {
    $self->emit(json => Mojo::JSON->new->decode($msg));
  }
  $op = delete $self->{op};
  my $is_text = $op == TEXT;
  $self->emit(($is_text ? 'text' : 'binary') => $msg);
  $self->emit(message => $is_text ? decode('UTF-8', $msg) : $msg)
    if $self->has_subscribers('message');
}
302

            
303
1;
304

            
305
=encoding utf8
306

            
307
=head1 NAME
308

            
309
Mojo::Transaction::WebSocket - WebSocket transaction
310

            
311
=head1 SYNOPSIS
312

            
313
  use Mojo::Transaction::WebSocket;
314

            
315
  # Send and receive WebSocket messages
316
  my $ws = Mojo::Transaction::WebSocket->new;
317
  $ws->send('Hello World!');
318
  $ws->on(message => sub {
319
    my ($ws, $msg) = @_;
320
    say "Message: $msg";
321
  });
322
  $ws->on(finish => sub {
323
    my ($ws, $code, $reason) = @_;
324
    say "WebSocket closed with status $code.";
325
  });
326

            
327
=head1 DESCRIPTION
328

            
329
L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions as
330
described in RFC 6455. Note that 64bit frames require a Perl with support for
331
quads or they are limited to 32bit.
332

            
333
=head1 EVENTS
334

            
335
L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction>
336
and can emit the following new ones.
337

            
338
=head2 binary
339

            
340
  $ws->on(binary => sub {
341
    my ($ws, $bytes) = @_;
342
    ...
343
  });
344

            
345
Emitted when a complete WebSocket binary message has been received.
346

            
347
  $ws->on(binary => sub {
348
    my ($ws, $bytes) = @_;
349
    say "Binary: $bytes";
350
  });
351

            
352
=head2 drain
353

            
354
  $ws->on(drain => sub {
355
    my $ws = shift;
356
    ...
357
  });
358

            
359
Emitted once all data has been sent.
360

            
361
  $ws->on(drain => sub {
362
    my $ws = shift;
363
    $ws->send(time);
364
  });
365

            
366
=head2 finish
367

            
368
  $ws->on(finish => sub {
369
    my ($ws, $code, $reason) = @_;
370
    ...
371
  });
372

            
373
Emitted when transaction is finished.
374

            
375
=head2 frame
376

            
377
  $ws->on(frame => sub {
378
    my ($ws, $frame) = @_;
379
    ...
380
  });
381

            
382
Emitted when a WebSocket frame has been received.
383

            
384
  $ws->unsubscribe('frame');
385
  $ws->on(frame => sub {
386
    my ($ws, $frame) = @_;
387
    say "FIN: $frame->[0]";
388
    say "RSV1: $frame->[1]";
389
    say "RSV2: $frame->[2]";
390
    say "RSV3: $frame->[3]";
391
    say "Opcode: $frame->[4]";
392
    say "Payload: $frame->[5]";
393
  });
394

            
395
=head2 json
396

            
397
  $ws->on(json => sub {
398
    my ($ws, $json) = @_;
399
    ...
400
  });
401

            
402
Emitted when a complete WebSocket message has been received. All text and
binary messages will be automatically JSON decoded. Note that this event only
gets emitted when it has at least one subscriber.
405

            
406
  $ws->on(json => sub {
407
    my ($ws, $hash) = @_;
408
    say "Message: $hash->{msg}";
409
  });
410

            
411
=head2 message
412

            
413
  $ws->on(message => sub {
414
    my ($ws, $msg) = @_;
415
    ...
416
  });
417

            
418
Emitted when a complete WebSocket message has been received. Text messages
will be automatically decoded. Note that this event only gets emitted when it
has at least one subscriber.
421

            
422
  $ws->on(message => sub {
423
    my ($ws, $msg) = @_;
424
    say "Message: $msg";
425
  });
426

            
427
=head2 text
428

            
429
  $ws->on(text => sub {
430
    my ($ws, $bytes) = @_;
431
    ...
432
  });
433

            
434
Emitted when a complete WebSocket text message has been received.
435

            
436
  $ws->on(text => sub {
437
    my ($ws, $bytes) = @_;
438
    say "Text: $bytes";
439
  });
440

            
441
=head1 ATTRIBUTES
442

            
443
L<Mojo::Transaction::WebSocket> inherits all attributes from
444
L<Mojo::Transaction> and implements the following new ones.
445

            
446
=head2 handshake
447

            
448
  my $handshake = $ws->handshake;
449
  $ws           = $ws->handshake(Mojo::Transaction::HTTP->new);
450

            
451
The original handshake transaction, defaults to a L<Mojo::Transaction::HTTP>
452
object.
453

            
454
=head2 masked
455

            
456
  my $bool = $ws->masked;
457
  $ws      = $ws->masked($bool);
458

            
459
Mask outgoing frames with XOR cipher and a random 32bit key.
460

            
461
=head2 max_websocket_size
462

            
463
  my $size = $ws->max_websocket_size;
464
  $ws      = $ws->max_websocket_size(1024);
465

            
466
Maximum WebSocket message size in bytes, defaults to the value of the
467
MOJO_MAX_WEBSOCKET_SIZE environment variable or C<262144>.
468

            
469
=head1 METHODS
470

            
471
L<Mojo::Transaction::WebSocket> inherits all methods from
472
L<Mojo::Transaction> and implements the following new ones.
473

            
474
=head2 new
475

            
476
  my $ws = Mojo::Transaction::WebSocket->new;
477

            
478
Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to
479
L</"frame"> event with default message parser, which also handles C<PING> and
480
C<CLOSE> frames automatically.
481

            
482
=head2 build_frame
483

            
484
  my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);
485

            
486
Build WebSocket frame.
487

            
488
  # Binary frame with FIN bit and payload
489
  say $ws->build_frame(1, 0, 0, 0, 2, 'Hello World!');
490

            
491
  # Text frame with payload but without FIN bit
492
  say $ws->build_frame(0, 0, 0, 0, 1, 'Hello ');
493

            
494
  # Continuation frame with FIN bit and payload
495
  say $ws->build_frame(1, 0, 0, 0, 0, 'World!');
496

            
497
  # Close frame with FIN bit and without payload
498
  say $ws->build_frame(1, 0, 0, 0, 8, '');
499

            
500
  # Ping frame with FIN bit and payload
501
  say $ws->build_frame(1, 0, 0, 0, 9, 'Test 123');
502

            
503
  # Pong frame with FIN bit and payload
504
  say $ws->build_frame(1, 0, 0, 0, 10, 'Test 123');
505

            
506
=head2 client_challenge
507

            
508
  my $bool = $ws->client_challenge;
509

            
510
Check WebSocket handshake challenge client-side, used to implement user
511
agents.
512

            
513
=head2 client_handshake
514

            
515
  $ws->client_handshake;
516

            
517
Perform WebSocket handshake client-side, used to implement user agents.
518

            
519
=head2 client_read
520

            
521
  $ws->client_read($data);
522

            
523
Read data client-side, used to implement user agents.
524

            
525
=head2 client_write
526

            
527
  my $bytes = $ws->client_write;
528

            
529
Write data client-side, used to implement user agents.
530

            
531
=head2 connection
532

            
533
  my $connection = $ws->connection;
534

            
535
Connection identifier or socket.
536

            
537
=head2 finish
538

            
539
  $ws = $ws->finish;
540
  $ws = $ws->finish(1000);
541
  $ws = $ws->finish(1003 => 'Cannot accept data!');
542

            
543
Close WebSocket connection gracefully.
544

            
545
=head2 is_websocket
546

            
547
  my $true = $ws->is_websocket;
548

            
549
True.
550

            
551
=head2 kept_alive
552

            
553
  my $kept_alive = $ws->kept_alive;
554

            
555
Connection has been kept alive.
556

            
557
=head2 local_address
558

            
559
  my $address = $ws->local_address;
560

            
561
Local interface address.
562

            
563
=head2 local_port
564

            
565
  my $port = $ws->local_port;
566

            
567
Local interface port.
568

            
569
=head2 parse_frame
570

            
571
  my $frame = $ws->parse_frame(\$bytes);
572

            
573
Parse WebSocket frame.
574

            
575
  # Parse single frame and remove it from buffer
576
  my $frame = $ws->parse_frame(\$buffer);
577
  say "FIN: $frame->[0]";
578
  say "RSV1: $frame->[1]";
579
  say "RSV2: $frame->[2]";
580
  say "RSV3: $frame->[3]";
581
  say "Opcode: $frame->[4]";
582
  say "Payload: $frame->[5]";
583

            
584
=head2 remote_address
585

            
586
  my $address = $ws->remote_address;
587

            
588
Remote interface address.
589

            
590
=head2 remote_port
591

            
592
  my $port = $ws->remote_port;
593

            
594
Remote interface port.
595

            
596
=head2 req
597

            
598
  my $req = $ws->req;
599

            
600
Handshake request, usually a L<Mojo::Message::Request> object.
601

            
602
=head2 res
603

            
604
  my $res = $ws->res;
605

            
606
Handshake response, usually a L<Mojo::Message::Response> object.
607

            
608
=head2 resume
609

            
610
  $ws = $ws->resume;
611

            
612
Resume L</"handshake"> transaction.
613

            
614
=head2 send
615

            
616
  $ws = $ws->send({binary => $bytes});
617
  $ws = $ws->send({text   => $bytes});
618
  $ws = $ws->send({json   => {test => [1, 2, 3]}});
619
  $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $bytes]);
620
  $ws = $ws->send($chars);
621
  $ws = $ws->send($chars => sub {...});
622

            
623
Send message or frame non-blocking via WebSocket, the optional drain callback
624
will be invoked once all data has been written.
625

            
626
  # Send "Ping" frame
627
  $ws->send([1, 0, 0, 0, 9, 'Hello World!']);
628

            
629
=head2 server_close
630

            
631
  $ws->server_close;
632

            
633
Transaction closed server-side, used to implement web servers.
634

            
635
=head2 server_handshake
636

            
637
  $ws->server_handshake;
638

            
639
Perform WebSocket handshake server-side, used to implement web servers.
640

            
641
=head2 server_read
642

            
643
  $ws->server_read($data);
644

            
645
Read data server-side, used to implement web servers.
646

            
647
=head2 server_write
648

            
649
  my $bytes = $ws->server_write;
650

            
651
Write data server-side, used to implement web servers.
652

            
653
=head1 DEBUGGING
654

            
655
You can set the MOJO_WEBSOCKET_DEBUG environment variable to get some advanced
656
diagnostics information printed to C<STDERR>.
657

            
658
  MOJO_WEBSOCKET_DEBUG=1
659

            
660
=head1 SEE ALSO
661

            
662
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
663

            
664
=cut