Bug 4564: Implement clustering backup-datastore RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManagerGetSnapshotReplyActor.java
index f77b3cbd7b5a418d4013bc3a18cc8bfbdbe9f032..d0b2dab91b953828763f5d4ea5bc8e15e301de3c 100644 (file)
@@ -14,7 +14,10 @@ import akka.actor.ReceiveTimeout;
 import akka.actor.Status.Failure;
 import akka.actor.UntypedActor;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
@@ -33,14 +36,15 @@ import scala.concurrent.duration.Duration;
 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
 
-    private int repliesReceived;
+    private final Set<String> remainingShardNames;
     private final Params params;
     private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
 
     private ShardManagerGetSnapshotReplyActor(Params params) {
         this.params = params;
+        remainingShardNames = new HashSet<>(params.shardNames);
 
-        LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+        LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
 
         getContext().setReceiveTimeout(params.receiveTimeout);
     }
@@ -55,12 +59,12 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
             params.replyToActor.tell(message, getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         } else if (message instanceof ReceiveTimeout) {
-            LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
-                    params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
-
-            params.replyToActor.tell(new Failure(new TimeoutException(String.format(
-                    "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
-                        params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+            String msg = String.format(
+                    "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
+                        params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
+                        remainingShardNames);
+            LOG.warn("{}: {}", params.id, msg);
+            params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         }
     }
@@ -71,7 +75,8 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
         shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
 
-        if(++repliesReceived == params.totalShardCount) {
+        remainingShardNames.remove(shardId.getShardName());
+        if(remainingShardNames.isEmpty()) {
             LOG.debug("{}: All shard snapshots received", params.id);
 
             DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
@@ -81,23 +86,23 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
         }
     }
 
-    public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+    public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
             ActorRef replyToActor, String id, Duration receiveTimeout) {
-        return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+        return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
                 shardManagerSnapshot, replyToActor, id, receiveTimeout));
     }
 
     private static final class Params {
-        final int totalShardCount;
+        final Collection<String> shardNames;
         final String datastoreType;
         final byte[] shardManagerSnapshot;
         final ActorRef replyToActor;
         final String id;
         final Duration receiveTimeout;
 
-        Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+        Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
                 String id, Duration receiveTimeout) {
-            this.totalShardCount = totalShardCount;
+            this.shardNames = shardNames;
             this.datastoreType = datastoreType;
             this.shardManagerSnapshot = shardManagerSnapshot;
             this.replyToActor = replyToActor;