X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=d33576d495fa66d2306ebb7d7378691269292e21;hb=refs%2Fchanges%2F78%2F20678%2F6;hp=52762b4eb352ff9de295e44969b13c4410b7f2f0;hpb=3c82a8f501a71ec8a40b170fc7ef12f8683c1842;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 52762b4eb3..d33576d495 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; @@ -41,6 +42,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -52,17 +55,21 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -114,10 +121,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final CountDownLatch waitTillReadyCountdownLatch; + private final PrimaryShardInfoFutureCache primaryShardInfoCache; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, + PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -128,6 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -136,20 +147,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); } public static Props props( final ClusterWrapper cluster, final Configuration configuration, final DatastoreContext datastoreContext, - final CountDownLatch waitTillReadyCountdownLatch) { + final CountDownLatch waitTillReadyCountdownLatch, + final PrimaryShardInfoFutureCache primaryShardInfoCache) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); + Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, + waitTillReadyCountdownLatch, primaryShardInfoCache)); } @Override @@ -174,7 +188,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ClusterEvent.MemberRemoved) { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { - ignoreMessage(message); + memberUnreachable((ClusterEvent.UnreachableMember)message); + } else if(message instanceof ClusterEvent.ReachableMember) { + memberReachable((ClusterEvent.ReachableMember) message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); } else if(message instanceof RoleChangeNotification) { @@ -183,27 +199,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); - } else if(message instanceof LeaderStateChanged) { - onLeaderStateChanged((LeaderStateChanged)message); + } else if(message instanceof ShardLeaderStateChanged) { + onLeaderStateChanged((ShardLeaderStateChanged)message); } else { unknownMessage(message); } } - private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + private void checkReady(){ + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + + private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { - shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { + primaryShardInfoCache.remove(shardInformation.getShardName()); } + checkReady(); } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); } @@ -247,14 +270,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); mBean.setSyncStatus(isInSync()); } } @@ -437,6 +453,42 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), getShardActorPath(shardName, memberName), getSelf()); } + + checkReady(); + } + + private void memberReachable(ClusterEvent.ReachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberAvailable(memberName); + } + + private void memberUnreachable(ClusterEvent.UnreachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberUnavailable(memberName); + } + + private void markMemberUnavailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as unavailable.", leaderId); + info.setLeaderAvailable(false); + } + } + } + + private void markMemberAvailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as available.", leaderId); + info.setLeaderAvailable(true); + } + } } private void onDatastoreContext(DatastoreContext context) { @@ -508,6 +560,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: In findPrimary: {}", persistenceId(), message); final String shardName = message.getShardName(); + final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); @@ -515,7 +568,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()); + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -535,7 +591,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), shardName, path); - getContext().actorSelection(path).forward(message, getContext()); + getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); return; } } @@ -663,6 +720,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; + private Optional localShardDataTree; + private boolean leaderAvailable = false; // flag that determines if the actor is ready for business private boolean actorInitialized = false; @@ -701,6 +760,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + Map getPeerAddresses() { return peerAddresses; } @@ -729,7 +796,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { @@ -807,10 +874,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - void setLeaderId(String leaderId) { + boolean setLeaderId(String leaderId) { + boolean changed = !Objects.equal(this.leaderId, leaderId); this.leaderId = leaderId; - + if(leaderId != null) { + this.leaderAvailable = true; + } notifyOnShardInitializedCallbacks(); + + return changed; + } + + public String getLeaderId() { + return leaderId; + } + + public void setLeaderAvailable(boolean leaderAvailable) { + this.leaderAvailable = leaderAvailable; } } @@ -821,18 +901,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Configuration configuration; final DatastoreContext datastoreContext; private final CountDownLatch waitTillReadyCountdownLatch; + private final PrimaryShardInfoFutureCache primaryShardInfoCache; - ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, + CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, + primaryShardInfoCache); } }