X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69;hb=8f81c2a44a3295f6f721268e3fd09e4cbcd02287;hp=d33576d495fa66d2306ebb7d7378691269292e21;hpb=6823ef1fd8f0a2f7ea39a2e85276a73e5de0bf72;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d33576d495..5f59672ed9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,9 +18,7 @@ import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; -import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; -import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -28,22 +26,16 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.NonPersistentDataProvider; -import org.opendaylight.controller.cluster.PersistentDataProvider; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -62,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -69,12 +62,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * The ShardManager has the following jobs, @@ -115,10 +109,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContext datastoreContext; - private Collection knownModules = Collections.emptySet(); - - private final DataPersistenceProvider dataPersistenceProvider; - private final CountDownLatch waitTillReadyCountdownLatch; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @@ -132,7 +122,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; - this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = @@ -146,10 +135,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); - } - public static Props props( final ClusterWrapper cluster, final Configuration configuration, @@ -200,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); } else if(message instanceof ShardLeaderStateChanged) { - onLeaderStateChanged((ShardLeaderStateChanged)message); + onLeaderStateChanged((ShardLeaderStateChanged) message); + } else if(message instanceof SwitchShardBehavior){ + onSwitchShardBehavior((SwitchShardBehavior) message); } else { unknownMessage(message); } @@ -222,6 +209,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); } @@ -337,26 +325,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { - if(dataPersistenceProvider.isRecoveryApplicable()) { - if (message instanceof SchemaContextModules) { - SchemaContextModules msg = (SchemaContextModules) message; - knownModules = ImmutableSet.copyOf(msg.getModules()); - } else if (message instanceof RecoveryFailure) { - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error("Recovery failed", failure.cause()); - } else if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); - } - } else { - if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); - // Delete all the messages from the akka journal - deleteMessages(lastSequenceNr()); - } + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); } } @@ -397,8 +371,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration(); + if(shardInformation.isShardInitialized()) { + // If the shard is already initialized then we'll wait enough time for the shard to + // elect a leader, ie 2 times the election timeout. + timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); + } + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( - datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + timeout, getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), getContext().dispatcher(), getSelf()); @@ -421,9 +403,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { - return new NoShardLeaderException(String.format( - "Could not find a leader for shard %s. This typically happens when the system is coming up or " + - "recovering and a leader is being elected. Try again later.", shardId)); + return new NoShardLeaderException(null, shardId.toString()); } private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { @@ -477,6 +457,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(leaderId != null && leaderId.contains(memberName)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); + + primaryShardInfoCache.remove(info.getShardName()); } } } @@ -500,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSwitchShardBehavior(SwitchShardBehavior message) { + ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + + ShardInformation shardInformation = localShards.get(identifier.getShardName()); + + if(shardInformation != null && shardInformation.getActor() != null) { + shardInformation.getActor().tell( + new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf()); + } else { + LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", + message.getShardName(), message.getNewState()); + } + } + /** * Notifies all the local shards of a change in the schema context * @@ -508,40 +504,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void updateSchemaContext(final Object message) { final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); - Set allModuleIdentifiers = schemaContext.getAllModuleIdentifiers(); - Set newModules = new HashSet<>(128); - - for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){ - String s = moduleIdentifier.getNamespace().toString(); - newModules.add(s); - } + LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); - if(newModules.containsAll(knownModules)) { - - LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); - - knownModules = ImmutableSet.copyOf(newModules); - - dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { - - @Override - public void apply(SchemaContextModules param) throws Exception { - LOG.debug("Sending new SchemaContext to Shards"); - for (ShardInformation info : localShards.values()) { - if (info.getActor() == null) { - info.setActor(newShardActor(schemaContext, info)); - } else { - info.getActor().tell(message, getSelf()); - } - } - } - - }); - } else { - LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", - newModules, knownModules); + for (ShardInformation info : localShards.values()) { + if (info.getActor() == null) { + LOG.debug("Creating Shard {}", info.getShardId()); + info.setActor(newShardActor(schemaContext, info)); + } else { + info.getActor().tell(message, getSelf()); + } } - } @VisibleForTesting @@ -571,7 +543,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String primaryPath = info.getSerializedLeaderActor(); Object found = canReturnLocalShardState && info.isLeader() ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : - new RemotePrimaryShardFound(primaryPath); + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -650,8 +622,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } - mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + + mBean.setShardManager(this); } /** @@ -698,16 +672,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return "shard-manager-" + type; } - @VisibleForTesting - Collection getKnownModules() { - return knownModules; - } - - @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - @VisibleForTesting ShardManagerInfoMBean getMBean(){ return mBean; @@ -731,6 +695,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; private String leaderId; + private short leaderVersion; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -796,7 +761,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); + return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && + (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { @@ -885,13 +851,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return changed; } - public String getLeaderId() { + String getLeaderId() { return leaderId; } - public void setLeaderAvailable(boolean leaderAvailable) { + void setLeaderAvailable(boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; } + + short getLeaderVersion() { + return leaderVersion; + } + + void setLeaderVersion(short leaderVersion) { + this.leaderVersion = leaderVersion; + } } private static class ShardManagerCreator implements Creator { @@ -970,6 +944,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + /** + * We no longer persist SchemaContextModules but keep this class around for now for backwards + * compatibility so we don't get de-serialization failures on upgrade from Helium. + */ + @Deprecated static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L;