Merge "Handle FollowerInitialSyncStatus message in Shard/ShardManager"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index c441afb49787fb7d5ae946c7fc0e0e91ec7137ad..136c6813eaba9d7d116b4ba0b4609bbd4848fb13 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -55,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -166,16 +168,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ignoreMessage(message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
-        } else if(message instanceof RoleChangeNotification){
+        } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
+        } else if(message instanceof FollowerInitialSyncUpStatus){
+            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else{
             unknownMessage(message);
         }
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification message) {
-        RoleChangeNotification roleChanged = message;
+    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
+                status.isInitialSyncDone());
+
+        ShardInformation shardInformation = findShardInformation(status.getName());
+
+        if(shardInformation != null) {
+            shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+            mBean.setSyncStatus(isInSync());
+        }
+
+    }
+
+    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
         LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
@@ -189,6 +206,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 waitTillReadyCountdownLatch.countDown();
             }
+
+            mBean.setSyncStatus(isInSync());
         }
     }
 
@@ -214,6 +233,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
+    private boolean isInSync(){
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isInSync()){
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -519,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return dataPersistenceProvider;
     }
 
+    @VisibleForTesting
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
+    }
+
     private class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
@@ -529,6 +562,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
+        private boolean followerSyncStatus = false;
+
         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
         private String role ;
 
@@ -607,6 +642,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this.role;
         }
 
+        public void setFollowerSyncStatus(boolean syncStatus){
+            this.followerSyncStatus = syncStatus;
+        }
+
+        public boolean isInSync(){
+            if(RaftState.Follower.name().equals(this.role)){
+                return followerSyncStatus;
+            } else if(RaftState.Leader.name().equals(this.role)){
+                return true;
+            }
+
+            return false;
+        }
+
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {