X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=136c6813eaba9d7d116b4ba0b4609bbd4848fb13;hb=06e889c9c78457590b6a0b62d89a6b9f44242a9f;hp=e861165c6ba2592d7d5d14d98e5fb34591344d85;hpb=846b5ae74588943cc5bd176e219809ced07104d6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e861165c6b..136c6813ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -15,8 +15,6 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.japi.Function; import akka.japi.Procedure; @@ -24,8 +22,21 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -41,17 +52,16 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; /** * The ShardManager has the following jobs, @@ -64,8 +74,7 @@ import java.util.Set; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private final Logger LOG = LoggerFactory.getLogger(getClass()); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -85,23 +94,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; - private ShardManagerInfoMBean mBean; + private final String shardDispatcherPath; - private final DatastoreContext datastoreContext; + private ShardManagerInfo mBean; - private final Collection knownModules = new HashSet<>(128); + private DatastoreContext datastoreContext; + + private Collection knownModules = Collections.emptySet(); + + private final DataPersistenceProvider dataPersistenceProvider; + + private final CountDownLatch waitTillReadyCountdownLatch; /** - * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be - * configuration or operational */ - protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext) { + protected ShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { - this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; + this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); + this.type = datastoreContext.getDataStoreType(); + this.shardDispatcherPath = + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -109,21 +126,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - public static Props props(final String type, + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + } + + public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext) { + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch) { - Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); + } + + @Override + public void postStop() { + LOG.info("Stopping ShardManager"); + + mBean.unregisterMBean(); } @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); @@ -137,12 +166,82 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RoleChangeNotification) { + onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else{ unknownMessage(message); } } + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), + status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { + LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setRole(roleChanged.getNewRole()); + + if (isReady()) { + LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, + waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + + mBean.setSyncStatus(isInSync()); + } + } + + + private ShardInformation findShardInformation(String memberId) { + for(ShardInformation info : localShards.values()){ + if(info.getShardId().toString().equals(memberId)){ + return info; + } + } + + return null; + } + + private boolean isReady() { + boolean isReady = true; + for (ShardInformation info : localShards.values()) { + if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + isReady = false; + break; + } + } + return isReady; + } + + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -170,18 +269,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { - if(message instanceof SchemaContextModules){ - SchemaContextModules msg = (SchemaContextModules) message; - knownModules.clear(); - knownModules.addAll(msg.getModules()); - } else if(message instanceof RecoveryFailure){ - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); - } else if(message instanceof RecoveryCompleted){ - LOG.info("Recovery complete : {}", persistenceId()); + if(dataPersistenceProvider.isRecoveryApplicable()) { + if (message instanceof SchemaContextModules) { + SchemaContextModules msg = (SchemaContextModules) message; + knownModules = ImmutableSet.copyOf(msg.getModules()); + } else if (message instanceof RecoveryFailure) { + RecoveryFailure failure = (RecoveryFailure) message; + LOG.error("Recovery failed", failure.cause()); + } else if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal except the last one + deleteMessages(lastSequenceNr() - 1); + } + } else { + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); + // Delete all the messages from the akka journal + deleteMessages(lastSequenceNr()); + } } } @@ -239,6 +346,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + info.getActor().tell(datastoreContext, getSelf()); + } + } + } + /** * Notifies all the local shards of a change in the schema context * @@ -257,30 +373,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(newModules.containsAll(knownModules)) { - LOG.info("New SchemaContext has a super set of current knownModules - persisting info"); + LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); - knownModules.clear(); - knownModules.addAll(newModules); + knownModules = ImmutableSet.copyOf(newModules); - persist(new SchemaContextModules(newModules), new Procedure() { + dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { @Override public void apply(SchemaContextModules param) throws Exception { - LOG.info("Sending new SchemaContext to Shards"); + LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { - if(info.getActor() == null) { + if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), - info.getShardId().toString())); + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); } + info.getActor().tell(new RegisterRoleChangeListener(), self()); } } }); } else { - LOG.info("Rejecting schema context update because it is not a super set of previously known modules"); + LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", + newModules, knownModules); } } @@ -407,12 +524,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { - StringBuilder sb = new StringBuilder(); - for(StackTraceElement element : t.getStackTrace()) { - sb.append("\n\tat ") - .append(element.toString()); - } - LOG.warning("Supervisor Strategy of resume applied {}",sb.toString()); + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume(); } } @@ -430,6 +542,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return knownModules; } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + + @VisibleForTesting + ShardManagerInfoMBean getMBean(){ + return mBean; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; @@ -440,7 +562,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // flag that determines if the actor is ready for business private boolean actorInitialized = false; + private boolean followerSyncStatus = false; + private final List runnablesOnInitialized = Lists.newArrayList(); + private String role ; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -508,31 +633,56 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void addRunnableOnInitialized(Runnable runnable) { runnablesOnInitialized.add(runnable); } + + public void setRole(String newRole) { + this.role = newRole; + } + + public String getRole(){ + return this.role; + } + + public void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + public boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + } private static class ShardManagerCreator implements Creator { private static final long serialVersionUID = 1L; - final String type; final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; + private final CountDownLatch waitTillReadyCountdownLatch; - ShardManagerCreator(String type, ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { - this.type = type; + ShardManagerCreator(ClusterWrapper cluster, + Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; } @Override public ShardManager create() throws Exception { - return new ShardManager(type, cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); } } static class SchemaContextModules implements Serializable { + private static final long serialVersionUID = -8884620101025936590L; + private final Set modules; SchemaContextModules(Set modules){