Merge "Handle FollowerInitialSyncStatus message in Shard/ShardManager"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 22e2dbd47d4d7148c8e41cfe05ce8532df527466..136c6813eaba9d7d116b4ba0b4609bbd4848fb13 100644 (file)
@@ -15,8 +15,6 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
 import akka.japi.Procedure;
@@ -24,6 +22,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -36,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -52,8 +52,15 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
 /**
@@ -67,8 +74,7 @@ import scala.concurrent.duration.Duration;
  */
 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -88,26 +94,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
-    private ShardManagerInfoMBean mBean;
+    private final String shardDispatcherPath;
 
-    private final DatastoreContext datastoreContext;
+    private ShardManagerInfo mBean;
+
+    private DatastoreContext datastoreContext;
 
     private Collection<String> knownModules = Collections.emptySet();
 
     private final DataPersistenceProvider dataPersistenceProvider;
 
+    private final CountDownLatch waitTillReadyCountdownLatch;
+
     /**
-     * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
-     *             configuration or operational
      */
-    protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext) {
+    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
 
-        this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+        this.type = datastoreContext.getDataStoreType();
+        this.shardDispatcherPath =
+                new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -119,21 +130,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
     }
 
-    public static Props props(final String type,
+    public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final DatastoreContext datastoreContext) {
+        final DatastoreContext datastoreContext,
+        final CountDownLatch waitTillReadyCountdownLatch) {
 
-        Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
+        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+    }
 
-        return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+    @Override
+    public void postStop() {
+        LOG.info("Stopping ShardManager");
+
+        mBean.unregisterMBean();
     }
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
             findPrimary(FindPrimary.fromSerializable(message));
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
@@ -147,12 +166,82 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
+        } else if(message instanceof DatastoreContext) {
+            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof RoleChangeNotification) {
+            onRoleChangeNotification((RoleChangeNotification) message);
+        } else if(message instanceof FollowerInitialSyncUpStatus){
+            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else{
             unknownMessage(message);
         }
 
     }
 
+    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
+                status.isInitialSyncDone());
+
+        ShardInformation shardInformation = findShardInformation(status.getName());
+
+        if(shardInformation != null) {
+            shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+            mBean.setSyncStatus(isInSync());
+        }
+
+    }
+
+    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+        LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+        ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setRole(roleChanged.getNewRole());
+
+            if (isReady()) {
+                LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
+                        waitTillReadyCountdownLatch.getCount());
+
+                waitTillReadyCountdownLatch.countDown();
+            }
+
+            mBean.setSyncStatus(isInSync());
+        }
+    }
+
+
+    private ShardInformation findShardInformation(String memberId) {
+        for(ShardInformation info : localShards.values()){
+            if(info.getShardId().toString().equals(memberId)){
+                return info;
+            }
+        }
+
+        return null;
+    }
+
+    private boolean isReady() {
+        boolean isReady = true;
+        for (ShardInformation info : localShards.values()) {
+            if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+                isReady = false;
+                break;
+            }
+        }
+        return isReady;
+    }
+
+    private boolean isInSync(){
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isInSync()){
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -186,7 +275,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 knownModules = ImmutableSet.copyOf(msg.getModules());
             } else if (message instanceof RecoveryFailure) {
                 RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error(failure.cause(), "Recovery failed");
+                LOG.error("Recovery failed", failure.cause());
             } else if (message instanceof RecoveryCompleted) {
                 LOG.info("Recovery complete : {}", persistenceId());
 
@@ -257,6 +346,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                info.getActor().tell(datastoreContext, getSelf());
+            }
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
@@ -275,7 +373,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         if(newModules.containsAll(knownModules)) {
 
-            LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+            LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
 
             knownModules = ImmutableSet.copyOf(newModules);
 
@@ -283,21 +381,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 @Override
                 public void apply(SchemaContextModules param) throws Exception {
-                    LOG.info("Sending new SchemaContext to Shards");
+                    LOG.debug("Sending new SchemaContext to Shards");
                     for (ShardInformation info : localShards.values()) {
                         if (info.getActor() == null) {
                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                            info.getPeerAddresses(), datastoreContext, schemaContext),
-                                    info.getShardId().toString()));
+                                    info.getPeerAddresses(), datastoreContext, schemaContext)
+                                            .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
                         } else {
                             info.getActor().tell(message, getSelf());
                         }
+                        info.getActor().tell(new RegisterRoleChangeListener(), self());
                     }
                 }
 
             });
         } else {
-            LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
+            LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
+                    newModules, knownModules);
         }
 
     }
@@ -424,12 +524,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             new Function<Throwable, SupervisorStrategy.Directive>() {
                 @Override
                 public SupervisorStrategy.Directive apply(Throwable t) {
-                    StringBuilder sb = new StringBuilder();
-                    for(StackTraceElement element : t.getStackTrace()) {
-                       sb.append("\n\tat ")
-                         .append(element.toString());
-                    }
-                    LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
                     return SupervisorStrategy.resume();
                 }
             }
@@ -452,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return dataPersistenceProvider;
     }
 
+    @VisibleForTesting
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
+    }
+
     private class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
@@ -462,7 +562,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
+        private boolean followerSyncStatus = false;
+
         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+        private String role ;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<ShardIdentifier, String> peerAddresses) {
@@ -530,27 +633,50 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void addRunnableOnInitialized(Runnable runnable) {
             runnablesOnInitialized.add(runnable);
         }
+
+        public void setRole(String newRole) {
+            this.role = newRole;
+        }
+
+        public String getRole(){
+            return this.role;
+        }
+
+        public void setFollowerSyncStatus(boolean syncStatus){
+            this.followerSyncStatus = syncStatus;
+        }
+
+        public boolean isInSync(){
+            if(RaftState.Follower.name().equals(this.role)){
+                return followerSyncStatus;
+            } else if(RaftState.Leader.name().equals(this.role)){
+                return true;
+            }
+
+            return false;
+        }
+
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
         private static final long serialVersionUID = 1L;
 
-        final String type;
         final ClusterWrapper cluster;
         final Configuration configuration;
         final DatastoreContext datastoreContext;
+        private final CountDownLatch waitTillReadyCountdownLatch;
 
-        ShardManagerCreator(String type, ClusterWrapper cluster,
-                Configuration configuration, DatastoreContext datastoreContext) {
-            this.type = type;
+        ShardManagerCreator(ClusterWrapper cluster,
+                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(type, cluster, configuration, datastoreContext);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
         }
     }