Showing 5 changed files with 195 additions and 5 deletions
+3
Changes
... ...
@@ -1,3 +1,6 @@
1
+0.2108
2
+  - added async database access support using AnyEvent
3
+    and added "async" option to execute method
1 4
 0.2107
2 5
   - removed EXPERIMENTAL status from the following methods and functinalities.
3 6
       DBIx::Custom::Result::value method
+93 -4
lib/DBIx/Custom.pm
... ...
@@ -344,6 +344,27 @@ sub execute {
344 344
   my $params;
345 345
   $params = shift if @_ % 2;
346 346
   my %opt = @_;
347
+  
348
+  # Async query
349
+  if ($opt{async} && !$self->{_new_connection}) {
350
+    my $dsn = $self->dsn;
351
+    croak qq/Data source must be specified when "async" option is used/
352
+      unless defined $dsn;
353
+    
354
+    my $user = $self->user;
355
+    my $password = $self->password;
356
+    my $option = $self->_option;
357
+    
358
+    my $new_dbi = bless {%$self}, ref $self;
359
+    $new_dbi->connector(undef);
360
+    $new_dbi->{dbh} = DBI->connect($dsn, $user, $password,
361
+      {%{$new_dbi->default_option}, %$option});
362
+    
363
+    $new_dbi->{_new_connection} = 1;
364
+    return $new_dbi->execute($sql, defined $params ? ($params) : (), %opt);
365
+  }
366
+  
367
+  # Options
347 368
   warn "sqlfilter option is DEPRECATED" if $opt{sqlfilter};
348 369
   $params ||= $opt{param} || {};
349 370
   my $tables = $opt{table} || [];
... ...
@@ -355,6 +376,7 @@ sub execute {
355 376
   my @cleanup;
356 377
   my $saved_param;
357 378
   $opt{statement} ||= '';
379
+  $opt{statement} = 'select' if $opt{select};
358 380
   if (($opt{statement} || '') ne 'insert' && ref $params eq 'ARRAY') {
359 381
     my $params2 = $params->[1];
360 382
     $params = $params->[0];
... ...
@@ -573,7 +595,7 @@ sub execute {
573 595
   }
574 596
   
575 597
   # Result
576
-  $self->result_class->new(
598
+  my $result = $self->result_class->new(
577 599
     sth => $sth,
578 600
     dbi => $self,
579 601
     default_filter => $self->{default_in_filter},
... ...
@@ -584,6 +606,22 @@ sub execute {
584 606
       from2 => $self->type_rule->{from2}
585 607
     },
586 608
   );
609
+  
610
+  if (my $cb = $opt{async}) {
611
+    require AnyEvent;
612
+    my $watcher;
613
+    weaken $self;
614
+    $watcher = AnyEvent->io(
615
+      fh => $self->{dbh}->mysql_fd,
616
+      poll => 'r',
617
+      cb   => sub {
618
+        $cb->($self, $result);
619
+        $watcher = undef;
620
+        $result =undef;
621
+      },
622
+    );
623
+  }
624
+  else { $result }
587 625
 }
588 626
 
589 627
 sub get_table_info {
... ...
@@ -2390,6 +2428,56 @@ This is used to create update clause.
2390 2428
 
2391 2429
   "update book set " . $dbi->assign_clause({title => 'a', age => 2});
2392 2430
 
2431
+=head2 C<async> EXPERIMENTAL
2432
+
2433
+  async => sub {
2434
+    my ($dbi, $result) = @_;
2435
+    ...
2436
+  };
2437
+
2438
+Database async access. L<AnyEvent> is required.
2439
+
2440
+This is C<mysql> async access example.
2441
+
2442
+  use AnyEvent;
2443
+
2444
+  my $cond = AnyEvent->condvar;
2445
+
2446
+  my $timer = AnyEvent->timer(
2447
+    interval => 1,
2448
+    cb => sub { 1 }
2449
+  );
2450
+
2451
+  my $count = 0;
2452
+
2453
+  $dbi->execute('SELECT SLEEP(1), 3', undef,
2454
+    prepare_attr => {async => 1}, statement => 'select',
2455
+    async => sub {
2456
+      my ($dbi, $result) = @_;
2457
+      my $row = $result->fetch_one;
2458
+      is($row->[1], 3, 'before');
2459
+      $cond->send if ++$count == 2;
2460
+    }
2461
+  );
2462
+
2463
+  $dbi->select('key1', table => 'table1', prepare_attr => {async => 1},
2464
+    async => sub {
2465
+      my ($dbi, $result) = @_;
2466
+      my $row = $result->fetch_one;
2467
+      is($row->[0], 1, 'after1');
2468
+      $dbi->select('key1', table => 'table1', prepare_attr => {async => 1},
2469
+        async => sub {
2470
+          my ($dbi, $result) = @_;
2471
+          my $row = $result->fetch_one;
2472
+          is($row->[0], 1, 'after2');
2473
+          $cond->send if ++$count == 2;
2474
+        }
2475
+      )
2476
+    }
2477
+  );
2478
+
2479
+  $cond->recv;
2480
+
2393 2481
 =head2 C<column>
2394 2482
 
2395 2483
   my $column = $dbi->column(book => ['author', 'title']);
... ...
@@ -2705,11 +2793,12 @@ because generally creating query object is slow.
2705 2793
 
2706 2794
 Priamry key. This is used for C<id> option.
2707 2795
 
2708
-=item C<statement> EXPERIMETAL
2796
+=item C<select> EXPERIMETAL
2709 2797
 
2710
-  statement => 'select'
2798
+  select => 1
2711 2799
 
2712
-If you set statement to C<select>, return value is always L<DBIx::Custom::Result> object.
2800
+If you set C<select> to 1, this statement become select statement
2801
+and return value is always L<DBIx::Custom::Result> object.
2713 2802
 
2714 2803
 =item C<table>
2715 2804
   
t/_run/mysql-async-opt.run
No changes.
+96
t/mysql-async-opt.t
... ...
@@ -0,0 +1,96 @@
1
+use Test::More;
2
+use strict;
3
+use warnings;
4
+use utf8;
5
+
6
+use FindBin;
7
+use DBIx::Custom;
8
+
9
+my $dbi;
10
+my $dsn;
11
+my $args;
12
+my $user = 'dbix_custom';
13
+my $password = 'dbix_custom';
14
+my $database = 'dbix_custom';
15
+
16
+$dsn = "dbi:mysql:database=$database";
17
+$args = {dsn => $dsn, user => $user, password => $password,};
18
+
19
+plan skip_all => 'mysql private test' unless -f "$FindBin::Bin/run/mysql-async-opt.run"
20
+  && eval { $dbi = DBIx::Custom->connect($args); 1 };
21
+plan 'no_plan';
22
+
23
+$SIG{__WARN__} = sub { warn $_[0] unless $_[0] =~ /DEPRECATED/};
24
+
25
+# Function for test name
26
+sub test { print "# $_[0]\n" }
27
+
28
+# Varialbes for tests
29
+my $dbname;
30
+my $row;
31
+my $rows;
32
+my $result;
33
+my $result2;
34
+my $model;
35
+my $dbi1;
36
+my $dbi2;
37
+my $dbi3;
38
+my @dbis;
39
+my @results;
40
+
41
+test 'connect';
42
+eval {
43
+  $dbi = DBIx::Custom->connect(
44
+    dsn => "dbi:mysql:database=$database;",
45
+    user => $user,
46
+    password => $password
47
+  );
48
+};
49
+ok(!$@);
50
+
51
+eval { $dbi->do('drop table table1') };
52
+$dbi->do('create table table1 (key1 varchar(255), key2 varchar(255)) engine=InnoDB');
53
+$dbi->insert({key1 => 1, key2 => 2}, table => 'table1');
54
+
55
+test 'async test';
56
+
57
+require AnyEvent;
58
+
59
+my $cond = AnyEvent->condvar;
60
+
61
+my $timer = AnyEvent->timer(
62
+  interval => 1,
63
+  cb => sub {
64
+    1;
65
+  }
66
+);
67
+
68
+my $count = 0;
69
+
70
+$dbi->execute('SELECT SLEEP(1), 3', undef,
71
+  prepare_attr => {async => 1}, select => 1,
72
+  async => sub {
73
+    my ($dbi, $result) = @_;
74
+    my $row = $result->fetch_one;
75
+    is($row->[1], 3, 'before');
76
+    $cond->send if ++$count == 2;
77
+  }
78
+);
79
+
80
+$dbi->select('key1', table => 'table1', prepare_attr => {async => 1},
81
+  async => sub {
82
+    my ($dbi, $result) = @_;
83
+    my $row = $result->fetch_one;
84
+    is($row->[0], 1, 'after1');
85
+    $dbi->select('key1', table => 'table1', prepare_attr => {async => 1},
86
+      async => sub {
87
+        my ($dbi, $result) = @_;
88
+        my $row = $result->fetch_one;
89
+        is($row->[0], 1, 'after2');
90
+        $cond->send if ++$count == 2;
91
+      }
92
+    )
93
+  }
94
+);
95
+
96
+$cond->recv;
+3 -1
t/mysql-async.t
... ...
@@ -75,7 +75,8 @@ my $timer = AnyEvent->timer(
75 75
 
76 76
 my $count = 0;
77 77
 
78
-my $mysql_watcher = AnyEvent->io(
78
+my $mysql_watcher;
79
+$mysql_watcher = AnyEvent->io(
79 80
   fh   => $dbi->dbh->mysql_fd,
80 81
   poll => 'r',
81 82
   cb   => sub {
... ...
@@ -83,6 +84,7 @@ my $mysql_watcher = AnyEvent->io(
83 84
     is($row->[1], 3, 'before');
84 85
     $cond->send if ++$count == 2;
85 86
     undef $result;
87
+    undef $mysql_watcher;
86 88
   }
87 89
 );
88 90