Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions lib/Redis/Fast.pm
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ use Scalar::Util qw(weaken);
use Redis::Fast::Sentinel;

sub _new_on_connect_cb {
my ($self, $on_conn, $password, $name) = @_;
my ($self, $on_conn, $password, $name, $wait_until_loaded) = @_;
weaken $self;
return sub {
# If we are in PubSub mode we shouldn't perform any command besides
# (p)(un)subscribe
if (! $self->is_subscriber) {
$self->__wait_until_loaded if $wait_until_loaded;
defined $name
and try {
my $n = $name;
Expand Down Expand Up @@ -77,7 +78,7 @@ sub new {
my $on_conn = $args{on_connect};
my $password = $args{password};
my $name = $args{name};
$self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name));
$self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name, $args{wait_until_loaded}));
$self->__set_data({
subscribers => {},
sentinels_cnx_timeout => $args{sentinels_cnx_timeout},
Expand Down Expand Up @@ -373,6 +374,14 @@ sub __is_valid_command {
if $self->is_subscriber;
}


sub __wait_until_loaded {
my ($self) = @_;
local $@;
require Time::HiRes;
while ( !eval{ $self->exists('checking_persistent_state'); 1 } && $@ =~ m/LOADING/ ) { Time::HiRes::usleep(100_000); }
}

1; # End of Redis.pm

__END__
Expand Down
69 changes: 69 additions & 0 deletions t/53-wait-until-loaded.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!perl

use warnings;
use strict;
use Test::More;
use Test::Fatal;
use Redis::Fast;
use lib 't/tlib';
use Test::SpawnRedisServer;
use File::Temp;
use POSIX;
use Time::HiRes;

sub create_dummy_dump {
my ($c, $srv, $buf);
eval {
($c, $srv) = redis();
my $r = Redis::Fast->new(server => $srv);
$r->set("foo$_" => 'x' x 10240) for 1..1024;
$r->set(foo => 'bar');
$r->save();
my $size = -s "dump.rdb";
open my $fh, "<", "dump.rdb" or die "fail to open dump.rdb";
binmode($fh);
read($fh, $buf, $size);
close $fh;
};
$c->() if $c;
return $buf;
}

ok my $dump = create_dummy_dump(), "create big dump.rdb";

mkfifo("dump.rdb", 0600) or die 'creating named pipe failed';

my $pid = fork;
if(!defined $pid) {
die "fork failed";
} elsif(!$pid) {
# child process
open my $fh, ">", "dump.rdb" or die "fail to create dump.rdb";
binmode($fh);
select $fh;
$| = 1;

# write dump.rdb very slowly...(it takes about 10 seconds to complete)
my $loading_process_events_interval_bytes = 1024 * 1024 * 2;
my $sleep_time = 10 / (length($dump) / $loading_process_events_interval_bytes);
while(my $b = substr $dump, 0, $loading_process_events_interval_bytes, '') {
Time::HiRes::sleep($sleep_time);
print $fh $b;
}

close $fh;
exit;
}

my ($c, $srv) = redis(skip_unlink => 1);
END { $c->() if $c }

my $r = Redis::Fast->new(
server => $srv,
wait_until_loaded => 1,
);

is $r->get('foo'), 'bar';

## All done
done_testing();
3 changes: 2 additions & 1 deletion t/tlib/Test/SpawnRedisServer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sub redis {
my $addr = "127.0.0.1:$local_port";

unlink("redis-server-$addr.log");
unlink('dump.rdb');
unlink('dump.rdb') unless $params{skip_unlink};

$fh->print("
timeout $params{timeout}
Expand All @@ -41,6 +41,7 @@ sub redis {
bind 127.0.0.1
loglevel debug
logfile redis-server-$addr.log
rdbcompression no
");
$fh->flush;

Expand Down