X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=136c6813eaba9d7d116b4ba0b4609bbd4848fb13;hb=e638203390314beb735c5d54f7936122e8d01d42;hp=426a2e0934f173560647a13569b22d1e06f632b2;hpb=e3998d55e33da9f6ecb69da75ecc71a047b6362b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 426a2e0934..136c6813ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -22,6 +22,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; @@ -51,6 +53,10 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -90,18 +96,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfoMBean mBean; + private ShardManagerInfo mBean; - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; private Collection knownModules = Collections.emptySet(); private final DataPersistenceProvider dataPersistenceProvider; + private final CountDownLatch waitTillReadyCountdownLatch; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -110,6 +118,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.type = datastoreContext.getDataStoreType(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -124,17 +133,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext) { + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); + } + + @Override + public void postStop() { + LOG.info("Stopping ShardManager"); + + mBean.unregisterMBean(); } @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); @@ -148,12 +166,82 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RoleChangeNotification) { + onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else{ unknownMessage(message); } } + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), + status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { + LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setRole(roleChanged.getNewRole()); + + if (isReady()) { + LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, + waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + + mBean.setSyncStatus(isInSync()); + } + } + + + private ShardInformation findShardInformation(String memberId) { + for(ShardInformation info : localShards.values()){ + if(info.getShardId().toString().equals(memberId)){ + return info; + } + } + + return null; + } + + private boolean isReady() { + boolean isReady = true; + for (ShardInformation info : localShards.values()) { + if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + isReady = false; + break; + } + } + return isReady; + } + + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -258,6 +346,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + info.getActor().tell(datastoreContext, getSelf()); + } + } + } + /** * Notifies all the local shards of a change in the schema context * @@ -288,11 +385,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) + info.getPeerAddresses(), datastoreContext, schemaContext) .withDispatcher(shardDispatcherPath), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); } + info.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -449,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return dataPersistenceProvider; } + @VisibleForTesting + ShardManagerInfoMBean getMBean(){ + return mBean; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; @@ -459,7 +562,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // flag that determines if the actor is ready for business private boolean actorInitialized = false; + private boolean followerSyncStatus = false; + private final List runnablesOnInitialized = Lists.newArrayList(); + private String role ; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -527,6 +633,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void addRunnableOnInitialized(Runnable runnable) { runnablesOnInitialized.add(runnable); } + + public void setRole(String newRole) { + this.role = newRole; + } + + public String getRole(){ + return this.role; + } + + public void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + public boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + } private static class ShardManagerCreator implements Creator { @@ -535,17 +664,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; + private final CountDownLatch waitTillReadyCountdownLatch; ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { + Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); } }