From: Tom Pantelis Date: Sun, 17 May 2015 16:20:05 +0000 (-0400) Subject: Bug 2970: Remove SchemaContextModules perisistence X-Git-Tag: release/beryllium~522 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=6d9b1057a07bd6efc29be852721070e1fec5a496 Bug 2970: Remove SchemaContextModules perisistence Now that the GlobalBundleScanningSchemaServiceImpl has been fixed to load all schema contexts on startup, we no longer need to persist the last known schema context modules in the ShardManager. Change-Id: Ie5d261e3725c680d4365d63fecdb2115772f69ba Signed-off-by: Tom Pantelis (cherry picked from commit 7e5c1fa516878ee5744e95f784e724381335b21e) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 3241134492..6de370e1af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,9 +18,7 @@ import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; -import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; -import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -28,22 +26,15 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.NonPersistentDataProvider; -import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -70,7 +61,6 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,10 +105,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContext datastoreContext; - private Collection knownModules = Collections.emptySet(); - - private final DataPersistenceProvider dataPersistenceProvider; - private final CountDownLatch waitTillReadyCountdownLatch; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @@ -132,7 +118,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; - this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = @@ -146,10 +131,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); - } - public static Props props( final ClusterWrapper cluster, final Configuration configuration, @@ -337,26 +318,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { - if(dataPersistenceProvider.isRecoveryApplicable()) { - if (message instanceof SchemaContextModules) { - SchemaContextModules msg = (SchemaContextModules) message; - knownModules = ImmutableSet.copyOf(msg.getModules()); - } else if (message instanceof RecoveryFailure) { - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error("Recovery failed", failure.cause()); - } else if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); - } - } else { - if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); - // Delete all the messages from the akka journal - deleteMessages(lastSequenceNr()); - } + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); } } @@ -510,40 +477,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void updateSchemaContext(final Object message) { final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); - Set allModuleIdentifiers = schemaContext.getAllModuleIdentifiers(); - Set newModules = new HashSet<>(128); - - for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){ - String s = moduleIdentifier.getNamespace().toString(); - newModules.add(s); - } - - if(newModules.containsAll(knownModules)) { - - LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); - - knownModules = ImmutableSet.copyOf(newModules); + LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); - dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { - - @Override - public void apply(SchemaContextModules param) throws Exception { - LOG.debug("Sending new SchemaContext to Shards"); - for (ShardInformation info : localShards.values()) { - if (info.getActor() == null) { - info.setActor(newShardActor(schemaContext, info)); - } else { - info.getActor().tell(message, getSelf()); - } - } - } - - }); - } else { - LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", - newModules, knownModules); + for (ShardInformation info : localShards.values()) { + if (info.getActor() == null) { + LOG.debug("Creating Shard {}", info.getShardId()); + info.setActor(newShardActor(schemaContext, info)); + } else { + info.getActor().tell(message, getSelf()); + } } - } @VisibleForTesting @@ -700,16 +643,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return "shard-manager-" + type; } - @VisibleForTesting - Collection getKnownModules() { - return knownModules; - } - - @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - @VisibleForTesting ShardManagerInfoMBean getMBean(){ return mBean; @@ -972,6 +905,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + /** + * We no longer persist SchemaContextModules but keep this class around for now for backwards + * compatibility so we don't get de-serialization failures on upgrade from Helium. + */ + @Deprecated static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 33d5a2ed98..ae8db2b5f5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,7 +1,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -9,7 +8,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; @@ -26,16 +24,11 @@ import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.net.URI; import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -43,7 +36,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -71,7 +63,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -674,170 +665,11 @@ public class ShardManagerTest extends AbstractActorTest { // Journal entries up to the last one should've been deleted Map journal = InMemoryJournal.get(shardMgrID); synchronized (journal) { - assertEquals("Journal size", 1, journal.size()); - assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); + assertEquals("Journal size", 0, journal.size()); } }}; } - @Test - public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { - final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( - persistedModules)); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); - - shardManager.underlyingActor().waitForRecoveryComplete(); - - Collection knownModules = shardManager.underlyingActor().getKnownModules(); - - assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); - }}; - } - - @Test - public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps(true)); - - assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); - - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); - - moduleIdentifierSet.add(bar); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - }}; - } - - @Test - public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps(true)); - - SchemaContext schemaContext = mock(SchemaContext.class); - Set moduleIdentifierSet = new HashSet<>(); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - moduleIdentifierSet.add(foo); - - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - //Create a completely different SchemaContext with only the bar module in it - //schemaContext = mock(SchemaContext.class); - moduleIdentifierSet.clear(); - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); - - moduleIdentifierSet.add(bar); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - }}; - } - - @Test - public void testRecoveryApplicable(){ - new JavaTestKit(getSystem()) { - { - final Props persistentProps = newShardMgrProps(true); - final TestActorRef persistentShardManager = - TestActorRef.create(getSystem(), persistentProps); - - DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider(); - - assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); - - final Props nonPersistentProps = newShardMgrProps(false); - final TestActorRef nonPersistentShardManager = - TestActorRef.create(getSystem(), nonPersistentProps); - - DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider(); - - assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable()); - - - }}; - - } - - @Test - public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider() - throws Exception { - final CountDownLatch persistLatch = new CountDownLatch(1); - final Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - @Override - public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), - ready, new PrimaryShardInfoFutureCache()) { - @Override - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - DataPersistenceProviderMonitor dataPersistenceProviderMonitor - = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); - return dataPersistenceProviderMonitor; - } - }; - } - }; - - new JavaTestKit(getSystem()) {{ - - final TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator))); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); - - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("Persisted", true, - Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS)); - - }}; - } - @Test public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) {