X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=136c6813eaba9d7d116b4ba0b4609bbd4848fb13;hb=06e889c9c78457590b6a0b62d89a6b9f44242a9f;hp=d836a347c514b434db26a32cbb172d1671e99253;hpb=6561b5cd3b3d2d8bcb26a6c4f3b8efa52b63523d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d836a347c5..136c6813ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -55,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -150,7 +152,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); @@ -166,16 +168,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ignoreMessage(message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else if(message instanceof RoleChangeNotification){ + } else if(message instanceof RoleChangeNotification) { onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else{ unknownMessage(message); } } - private void onRoleChangeNotification(RoleChangeNotification message) { - RoleChangeNotification roleChanged = message; + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), + status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); @@ -189,6 +206,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { waitTillReadyCountdownLatch.countDown(); } + + mBean.setSyncStatus(isInSync()); } } @@ -214,6 +233,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return isReady; } + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -519,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return dataPersistenceProvider; } + @VisibleForTesting + ShardManagerInfoMBean getMBean(){ + return mBean; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; @@ -529,6 +562,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // flag that determines if the actor is ready for business private boolean actorInitialized = false; + private boolean followerSyncStatus = false; + private final List runnablesOnInitialized = Lists.newArrayList(); private String role ; @@ -607,6 +642,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return this.role; } + public void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + public boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + } private static class ShardManagerCreator implements Creator {