X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=5022d97997dfad32ef29ae16865a7e7dc3c2b6e1;hb=5c5b77dab25a38f8b0f9c09dc7aab9420f8dcb48;hp=fa436c16053bc42ad9835e7ecbccd1cb202fc4b8;hpb=c1362c86eb19e92e6c64d10099a45deb499c6db1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index fa436c1605..5022d97997 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,70 +1,349 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSystem; +import akka.actor.ActorRef; import akka.actor.Props; +import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import akka.japi.Creator; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; -import scala.concurrent.duration.Duration; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.net.URI; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class ShardManagerTest { - private static ActorSystem system; +public class ShardManagerTest extends AbstractActorTest { + private static int ID_COUNTER = 1; - @BeforeClass - public static void setUp(){ - system = ActorSystem.create("test"); + private final String shardMrgIDSuffix = "config" + ID_COUNTER++; + private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; + + private static ActorRef mockShardActor; + + @Before + public void setUp() { + InMemoryJournal.clear(); + + if(mockShardActor == null) { + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); + mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + } } - @AfterClass - public static void tearDown(){ - JavaTestKit.shutdownActorSystem(system); - system = null; + @After + public void tearDown() { + InMemoryJournal.clear(); + } + + private Props newShardMgrProps() { + return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build()); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - new JavaTestKit(system) {{ - final Props props = ShardManager.props("config"); - final TestActorRef subject = TestActorRef.create(system, props); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - new Within(duration("1 seconds")) { - protected void run() { + shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef()); - subject.tell(new FindPrimary("inventory"), getRef()); + expectMsgEquals(duration("5 seconds"), + new PrimaryNotFound("non-existent").toSerializable()); + }}; + } - expectMsgEquals(Duration.Zero(), new PrimaryNotFound("inventory")); + @Test + public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - // Will wait for the rest of the 3 seconds - expectNoMsg(); - } - }; + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); }}; } - @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + @Test + public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - new JavaTestKit(system) {{ - final Props props = ShardManager.props("config"); - final TestActorRef subject = TestActorRef.create(system, props); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - // the run() method needs to finish within 3 seconds - new Within(duration("1 seconds")) { - protected void run() { + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); - subject.tell(new FindPrimary(Shard.DEFAULT_NAME), getRef()); + expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + }}; + } - expectMsgClass(PrimaryFound.class); + @Test + public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindLocalShard("non-existent"), getRef()); + + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + assertEquals("getShardName", "non-existent", notFound.getShardName()); + }}; + } + + @Test + public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); + + LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); + }}; + } + + @Test + public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + //shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); + + expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + }}; + } + + @Test + public void testOnReceiveMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + + PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), + PrimaryFound.SERIALIZABLE_CLASS)); + String path = found.getPrimaryPath(); + assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); + }}; + } + + @Test + public void testOnReceiveMemberDown() throws Exception { + + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + + MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); + + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); + }}; + } + + @Test + public void testOnRecoveryJournalIsCleaned() { + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + ImmutableSet.of("foo"))); + InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + ImmutableSet.of("bar"))); + InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); - expectNoMsg(); + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + + shardManager.underlyingActor().waitForRecoveryComplete(); + InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(shardMgrID); + synchronized (journal) { + assertEquals("Journal size", 1, journal.size()); + assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); + } + }}; + } + + @Test + public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { + final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + persistedModules)); + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + Collection knownModules = shardManager.underlyingActor().getKnownModules(); + + assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); + }}; + } + + @Test + public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() + throws Exception { + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); + + assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); + + ModuleIdentifier foo = mock(ModuleIdentifier.class); + when(foo.getNamespace()).thenReturn(new URI("foo")); + + Set moduleIdentifierSet = new HashSet<>(); + moduleIdentifierSet.add(foo); + + SchemaContext schemaContext = mock(SchemaContext.class); + when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + + ModuleIdentifier bar = mock(ModuleIdentifier.class); + when(bar.getNamespace()).thenReturn(new URI("bar")); + + moduleIdentifierSet.add(bar); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + }}; + } + + @Test + public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() + throws Exception { + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); + + SchemaContext schemaContext = mock(SchemaContext.class); + Set moduleIdentifierSet = new HashSet<>(); + + ModuleIdentifier foo = mock(ModuleIdentifier.class); + when(foo.getNamespace()).thenReturn(new URI("foo")); + + moduleIdentifierSet.add(foo); + + when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + + //Create a completely different SchemaContext with only the bar module in it + //schemaContext = mock(SchemaContext.class); + moduleIdentifierSet.clear(); + ModuleIdentifier bar = mock(ModuleIdentifier.class); + when(bar.getNamespace()).thenReturn(new URI("bar")); + + moduleIdentifierSet.add(bar); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + + }}; + } + + + private static class TestShardManager extends ShardManager { + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + + TestShardManager(String shardMrgIDSuffix) { + super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build()); + } + + @Override + public void handleRecover(Object message) throws Exception { + try { + super.handleRecover(message); + } finally { + if(message instanceof RecoveryCompleted) { + recoveryComplete.countDown(); + } + } + } + + void waitForRecoveryComplete() { + assertEquals("Recovery complete", true, + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); + } + } + + @SuppressWarnings("serial") + static class TestShardManagerCreator implements Creator { + String shardMrgIDSuffix; + + TestShardManagerCreator(String shardMrgIDSuffix) { + this.shardMrgIDSuffix = shardMrgIDSuffix; } - }; - }}; - } -} \ No newline at end of file + + @Override + public TestShardManager create() throws Exception { + return new TestShardManager(shardMrgIDSuffix); + } + + } +}