X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=88f818f0faedf76f0349ce1f7294dee37b9d79d1;hb=refs%2Fchanges%2F49%2F12649%2F4;hp=c7c382c82ffa2fc6db82664995d404320674c447;hpb=1bc01a15b1e7811ee59249eab7e815408518e354;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index c7c382c82f..88f818f0fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -44,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -91,6 +93,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Collection knownModules = new HashSet<>(128); + private final DataPersistenceProvider dataPersistenceProvider; + /** * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational @@ -102,6 +106,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; + this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -109,6 +114,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + } + public static Props props(final String type, final ClusterWrapper cluster, final Configuration configuration, @@ -170,18 +179,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { - if(message instanceof SchemaContextModules){ - SchemaContextModules msg = (SchemaContextModules) message; - knownModules.clear(); - knownModules.addAll(msg.getModules()); - } else if(message instanceof RecoveryFailure){ - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); - } else if(message instanceof RecoveryCompleted){ - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); + if(dataPersistenceProvider.isRecoveryApplicable()) { + if (message instanceof SchemaContextModules) { + SchemaContextModules msg = (SchemaContextModules) message; + knownModules.clear(); + knownModules.addAll(msg.getModules()); + } else if (message instanceof RecoveryFailure) { + RecoveryFailure failure = (RecoveryFailure) message; + LOG.error(failure.cause(), "Recovery failed"); + } else if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal except the last one + deleteMessages(lastSequenceNr() - 1); + } + } else { + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal + deleteMessages(lastSequenceNr()); + } } } @@ -262,15 +280,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { knownModules.clear(); knownModules.addAll(newModules); - persist(new SchemaContextModules(newModules), new Procedure() { + dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { @Override public void apply(SchemaContextModules param) throws Exception { LOG.info("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { - if(info.getActor() == null) { + if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), + info.getPeerAddresses(), datastoreContext, schemaContext), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); @@ -430,6 +448,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return knownModules; } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; @@ -534,7 +557,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { static class SchemaContextModules implements Serializable { private static final long serialVersionUID = 1L; - private final Set modules; SchemaContextModules(Set modules){