X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69;hb=refs%2Fchanges%2F26%2F26426%2F2;hp=6de370e1afc1d39601d571d718093e950790d197;hpb=6d9b1057a07bd6efc29be852721070e1fec5a496;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 6de370e1af..5f59672ed9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -53,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -60,11 +62,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * The ShardManager has the following jobs, @@ -181,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); } else if(message instanceof ShardLeaderStateChanged) { - onLeaderStateChanged((ShardLeaderStateChanged)message); + onLeaderStateChanged((ShardLeaderStateChanged) message); + } else if(message instanceof SwitchShardBehavior){ + onSwitchShardBehavior((SwitchShardBehavior) message); } else { unknownMessage(message); } @@ -203,6 +209,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); } @@ -364,8 +371,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration(); + if(shardInformation.isShardInitialized()) { + // If the shard is already initialized then we'll wait enough time for the shard to + // elect a leader, ie 2 times the election timeout. + timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); + } + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( - datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + timeout, getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), getContext().dispatcher(), getSelf()); @@ -388,9 +403,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { - return new NoShardLeaderException(String.format( - "Could not find a leader for shard %s. This typically happens when the system is coming up or " + - "recovering and a leader is being elected. Try again later.", shardId)); + return new NoShardLeaderException(null, shardId.toString()); } private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { @@ -469,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSwitchShardBehavior(SwitchShardBehavior message) { + ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + + ShardInformation shardInformation = localShards.get(identifier.getShardName()); + + if(shardInformation != null && shardInformation.getActor() != null) { + shardInformation.getActor().tell( + new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf()); + } else { + LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", + message.getShardName(), message.getNewState()); + } + } + /** * Notifies all the local shards of a change in the schema context * @@ -516,7 +543,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String primaryPath = info.getSerializedLeaderActor(); Object found = canReturnLocalShardState && info.isLeader() ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : - new RemotePrimaryShardFound(primaryPath); + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -595,8 +622,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } - mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + + mBean.setShardManager(this); } /** @@ -666,6 +695,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; private String leaderId; + private short leaderVersion; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -731,7 +761,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); + return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && + (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { @@ -820,13 +851,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return changed; } - public String getLeaderId() { + String getLeaderId() { return leaderId; } - public void setLeaderAvailable(boolean leaderAvailable) { + void setLeaderAvailable(boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; } + + short getLeaderVersion() { + return leaderVersion; + } + + void setLeaderVersion(short leaderVersion) { + this.leaderVersion = leaderVersion; + } } private static class ShardManagerCreator implements Creator {