X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=426a2e0934f173560647a13569b22d1e06f632b2;hp=157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3;hb=e3998d55e33da9f6ecb69da75ecc71a047b6362b;hpb=0eba94d9411ea40945ddc8c732640c0cc004599f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 157f1cb377..426a2e0934 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -15,8 +15,6 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.japi.Function; import akka.japi.Procedure; @@ -25,6 +23,18 @@ import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -40,17 +50,12 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; /** * The ShardManager has the following jobs, @@ -63,8 +68,7 @@ import java.util.Set; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private final Logger LOG = LoggerFactory.getLogger(getClass()); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -84,23 +88,28 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; + private final String shardDispatcherPath; + private ShardManagerInfoMBean mBean; private final DatastoreContext datastoreContext; - private final Collection knownModules = new HashSet<>(128); + private Collection knownModules = Collections.emptySet(); + + private final DataPersistenceProvider dataPersistenceProvider; /** - * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be - * configuration or operational */ - protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + protected ShardManager(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { - this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; + this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); + this.type = datastoreContext.getDataStoreType(); + this.shardDispatcherPath = + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -108,16 +117,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - public static Props props(final String type, + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + } + + public static Props props( final ClusterWrapper cluster, final Configuration configuration, final DatastoreContext datastoreContext) { - Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); - return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); } @Override @@ -163,24 +175,32 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Initializing shard [{}]", shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { - shardInformation.setShardInitialized(true); + shardInformation.setActorInitialized(); } } @Override protected void handleRecover(Object message) throws Exception { - if(message instanceof SchemaContextModules){ - SchemaContextModules msg = (SchemaContextModules) message; - knownModules.clear(); - knownModules.addAll(msg.getModules()); - } else if(message instanceof RecoveryFailure){ - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); - } else if(message instanceof RecoveryCompleted){ - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); + if(dataPersistenceProvider.isRecoveryApplicable()) { + if (message instanceof SchemaContextModules) { + SchemaContextModules msg = (SchemaContextModules) message; + knownModules = ImmutableSet.copyOf(msg.getModules()); + } else if (message instanceof RecoveryFailure) { + RecoveryFailure failure = (RecoveryFailure) message; + LOG.error("Recovery failed", failure.cause()); + } else if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal except the last one + deleteMessages(lastSequenceNr() - 1); + } + } else { + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal + deleteMessages(lastSequenceNr()); + } } } @@ -192,7 +212,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -200,9 +220,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, Supplier messageSupplier) { - if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, + final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized()) { + if(waitUntilInitialized) { + final ActorRef sender = getSender(); + final ActorRef self = self(); + shardInformation.addRunnableOnInitialized(new Runnable() { + @Override + public void run() { + sender.tell(messageSupplier.get(), self); + } + }); + } else { + getSender().tell(new ActorNotInitialized(), getSelf()); + } + return; } @@ -243,21 +276,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(newModules.containsAll(knownModules)) { - LOG.info("New SchemaContext has a super set of current knownModules - persisting info"); + LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); - knownModules.clear(); - knownModules.addAll(newModules); + knownModules = ImmutableSet.copyOf(newModules); - persist(new SchemaContextModules(newModules), new Procedure() { + dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { @Override public void apply(SchemaContextModules param) throws Exception { - LOG.info("Sending new SchemaContext to Shards"); + LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { - if(info.getActor() == null) { + if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), - info.getShardId().toString())); + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); } @@ -266,7 +298,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } else { - LOG.info("Rejecting schema context update because it is not a super set of previously known modules"); + LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", + newModules, knownModules); } } @@ -277,7 +310,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, new Supplier() { + sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { @Override public Object get() { return new PrimaryFound(info.getActorPath().toString()).toSerializable(); @@ -393,12 +426,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { - StringBuilder sb = new StringBuilder(); - for(StackTraceElement element : t.getStackTrace()) { - sb.append("\n\tat ") - .append(element.toString()); - } - LOG.warning("Supervisor Strategy of resume applied {}",sb.toString()); + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume(); } } @@ -416,13 +444,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return knownModules; } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; - private boolean shardInitialized = false; // flag that determines if the actor is ready for business + + // flag that determines if the actor is ready for business + private boolean actorInitialized = false; + + private final List runnablesOnInitialized = Lists.newArrayList(); private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -474,25 +511,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardInitialized() { - return shardInitialized; + return getActor() != null && actorInitialized; } - void setShardInitialized(boolean shardInitialized) { - this.shardInitialized = shardInitialized; + void setActorInitialized() { + this.actorInitialized = true; + + for(Runnable runnable: runnablesOnInitialized) { + runnable.run(); + } + + runnablesOnInitialized.clear(); + } + + void addRunnableOnInitialized(Runnable runnable) { + runnablesOnInitialized.add(runnable); } } private static class ShardManagerCreator implements Creator { private static final long serialVersionUID = 1L; - final String type; final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; - ShardManagerCreator(String type, ClusterWrapper cluster, + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { - this.type = type; this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; @@ -500,12 +545,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public ShardManager create() throws Exception { - return new ShardManager(type, cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext); } } static class SchemaContextModules implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -8884620101025936590L; private final Set modules;