Bug 4564: Implement GetSnapshot message in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 4f3d4aa7f931b1af11e1198de001d02831ac6d63..0804a50e9b1f2b6d5587742e83b8efa2218ddd30 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
@@ -211,12 +212,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onAddShardReplica((AddShardReplica)message);
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof GetSnapshot) {
+            onGetSnapshot();
         } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onGetSnapshot() {
+        LOG.debug("{}: onGetSnapshot", persistenceId());
+
+        List<String> notInitialized = null;
+        for(ShardInformation shardInfo: localShards.values()) {
+            if(!shardInfo.isShardInitialized()) {
+                if(notInitialized == null) {
+                    notInitialized = new ArrayList<>();
+                }
+
+                notInitialized.add(shardInfo.getShardName());
+            }
+        }
+
+        if(notInitialized != null) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+                    "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+            return;
+        }
+
+        byte[] shardManagerSnapshot = null;
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
+                type, shardManagerSnapshot , getSender(), persistenceId(),
+                datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+        for(ShardInformation shardInfo: localShards.values()) {
+            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+        }
+    }
+
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
@@ -448,8 +481,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                 if(shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
@@ -458,6 +489,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                 }
 
+                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+                        shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),