From: tpantelis Date: Wed, 8 Oct 2014 11:21:43 +0000 (-0400) Subject: Bug 2135: Create ShardInformation on startup X-Git-Tag: release/helium-sr1~41 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d386367faeea25cd19b178ce1bd6fa76066e90ed Bug 2135: Create ShardInformation on startup Modified the ShardManager to create the ShardInformation map on startup, but without creating the shard actors yet, instead of when the SchemaContext is initialized. When the SchemaContext is fully initialized then create the shard actors. On FindLocalShard and FindPrimaryShard messages, only return a valid response if the shard actor is created and it's initialized. Change-Id: I361f2af9e53878f62c5890350afd7a5f2877b95c Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e68628dbf5..157f1cb377 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -42,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -94,7 +94,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational */ - private ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { this.type = Preconditions.checkNotNull(type, "type should not be null"); @@ -105,7 +105,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - //createLocalShards(null); + createLocalShards(); } public static Props props(final String type, @@ -123,8 +123,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { - findPrimary( - FindPrimary.fromSerializable(message)); + findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -160,7 +159,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { markShardAsInitialized(shardId.getShardName()); } - @VisibleForTesting protected void markShardAsInitialized(String shardName) { + private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { @@ -168,8 +167,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @Override protected void handleRecover(Object message) throws Exception { - + @Override + protected void handleRecover(Object message) throws Exception { if(message instanceof SchemaContextModules){ SchemaContextModules msg = (SchemaContextModules) message; knownModules.clear(); @@ -186,23 +185,28 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findLocalShard(FindLocalShard message) { - ShardInformation shardInformation = localShards.get(message.getShardName()); + final ShardInformation shardInformation = localShards.get(message.getShardName()); if(shardInformation == null){ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; } - sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor())); + sendResponse(shardInformation, new Supplier() { + @Override + public Object get() { + return new LocalShardFound(shardInformation.getActor()); + } + }); } - private void sendResponse(ShardInformation shardInformation, Object message) { - if (!shardInformation.isShardInitialized()) { + private void sendResponse(ShardInformation shardInformation, Supplier messageSupplier) { + if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) { getSender().tell(new ActorNotInitialized(), getSelf()); return; } - getSender().tell(message, getSelf()); + getSender().tell(messageSupplier.get(), getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -246,12 +250,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { persist(new SchemaContextModules(newModules), new Procedure() { - @Override public void apply(SchemaContextModules param) throws Exception { + @Override + public void apply(SchemaContextModules param) throws Exception { LOG.info("Sending new SchemaContext to Shards"); - if (localShards.size() == 0) { - createLocalShards(schemaContext); - } else { - for (ShardInformation info : localShards.values()) { + for (ShardInformation info : localShards.values()) { + if(info.getActor() == null) { + info.setActor(getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext), + info.getShardId().toString())); + } else { info.getActor().tell(message, getSelf()); } } @@ -265,14 +272,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findPrimary(FindPrimary message) { - final ActorRef sender = getSender(); String shardName = message.getShardName(); // First see if the there is a local replica for the shard - ShardInformation info = localShards.get(shardName); + final ShardInformation info = localShards.get(shardName); if (info != null) { - ActorPath shardPath = info.getActorPath(); - sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable()); + sendResponse(info, new Supplier() { + @Override + public Object get() { + return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + } + }); + return; } @@ -331,7 +342,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * runs * */ - private void createLocalShards(SchemaContext schemaContext) { + private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = this.configuration.getMemberShardNames(memberName); @@ -340,11 +351,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext), - shardId.toString()); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, @@ -398,63 +406,78 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - @Override public String persistenceId() { + @Override + public String persistenceId() { return "shard-manager-" + type; } - @VisibleForTesting public Collection getKnownModules() { + @VisibleForTesting + Collection getKnownModules() { return knownModules; } private class ShardInformation { + private final ShardIdentifier shardId; private final String shardName; - private final ActorRef actor; - private final ActorPath actorPath; + private ActorRef actor; + private ActorPath actorPath; private final Map peerAddresses; - private boolean shardInitialized = false; //flag that determines if the actor is ready for business + private boolean shardInitialized = false; // flag that determines if the actor is ready for business - private ShardInformation(String shardName, ActorRef actor, - Map peerAddresses) { + private ShardInformation(String shardName, ShardIdentifier shardId, + Map peerAddresses) { this.shardName = shardName; - this.actor = actor; - this.actorPath = actor.path(); + this.shardId = shardId; this.peerAddresses = peerAddresses; } - public String getShardName() { + String getShardName() { return shardName; } - public ActorRef getActor(){ + ActorRef getActor(){ return actor; } - public ActorPath getActorPath() { + ActorPath getActorPath() { return actorPath; } - public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void setActor(ActorRef actor) { + this.actor = actor; + this.actorPath = actor.path(); + } + + ShardIdentifier getShardId() { + return shardId; + } + + Map getPeerAddresses() { + return peerAddresses; + } + + void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - if(LOG.isDebugEnabled()) { - LOG.debug( - "Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } - actor - .tell(new PeerAddressResolved(peerId, peerAddress), - getSelf()); + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); + } + + actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + } } } - public boolean isShardInitialized() { + boolean isShardInitialized() { return shardInitialized; } - public void setShardInitialized(boolean shardInitialized) { + void setShardInitialized(boolean shardInitialized) { this.shardInitialized = shardInitialized; } } @@ -482,6 +505,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } static class SchemaContextModules implements Serializable { + private static final long serialVersionUID = 1L; + private final Set modules; SchemaContextModules(Set modules){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index dcfa35c0dc..5022d97997 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,29 +1,20 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; -import akka.dispatch.Futures; -import akka.japi.Procedure; -import akka.persistence.PersistentConfirmation; -import akka.persistence.PersistentId; -import akka.persistence.PersistentImpl; -import akka.persistence.PersistentRepr; -import akka.persistence.journal.japi.AsyncWriteJournal; +import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.japi.Creator; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -32,316 +23,257 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.Future; - import java.net.URI; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import static junit.framework.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ShardManagerTest { - private static ActorSystem system; - Configuration mockConfig = new MockConfiguration(); - private static ActorRef defaultShardMockActor; - - @BeforeClass - public static void setUpClass() { - Map myJournal = new HashMap<>(); - myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal"); - myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher"); - Config config = ConfigFactory.load() - .withValue("akka.persistence.journal.plugin", - ConfigValueFactory.fromAnyRef("my-journal")) - .withValue("my-journal", ConfigValueFactory.fromMap(myJournal)); - - MyJournal.clear(); +public class ShardManagerTest extends AbstractActorTest { + private static int ID_COUNTER = 1; - system = ActorSystem.create("test", config); + private final String shardMrgIDSuffix = "config" + ID_COUNTER++; + private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name); + private static ActorRef mockShardActor; + @Before + public void setUp() { + InMemoryJournal.clear(); + if(mockShardActor == null) { + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); + mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + } } - @AfterClass - public static void tearDown() { - JavaTestKit.shutdownActorSystem(system); - system = null; + @After + public void tearDown() { + InMemoryJournal.clear(); } - @Before - public void setUpTest(){ - MyJournal.clear(); + private Props newShardMgrProps() { + return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build()); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - new JavaTestKit(system) { - { - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - - final ActorRef subject = getSystem().actorOf(props); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new FindPrimary("inventory").toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef()); - expectMsgEquals(duration("2 seconds"), - new PrimaryNotFound("inventory").toSerializable()); - }}; + expectMsgEquals(duration("5 seconds"), + new PrimaryNotFound("non-existent").toSerializable()); + }}; } @Test public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + }}; + } - final ActorRef subject = getSystem().actorOf(props); + @Test + public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new ActorInitialized(), defaultShardMockActor); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); - expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - } - }; + expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + }}; } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - final ActorRef subject = getSystem().actorOf(props); + shardManager.tell(new FindLocalShard("non-existent"), getRef()); - subject.tell(new FindLocalShard("inventory"), getRef()); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - final String out = new ExpectMsg(duration("3 seconds"), "find local") { - @Override - protected String match(Object in) { - if (in instanceof LocalShardNotFound) { - return ((LocalShardNotFound) in).getShardName(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("inventory", out); + assertEquals("getShardName", "non-existent", notFound.getShardName()); }}; } @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper(); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", mockClusterWrapper, - new MockConfiguration(), DatastoreContext.newBuilder().build()); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); - final ActorRef subject = getSystem().actorOf(props); + LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); + }}; + } - subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new ActorInitialized(), defaultShardMockActor); + @Test + public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + //shardManager.tell(new ActorInitialized(), mockShardActor); - final ActorRef out = new ExpectMsg(duration("3 seconds"), "find local") { - @Override - protected ActorRef match(Object in) { - if (in instanceof LocalShardFound) { - return ((LocalShardFound) in).getPath(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); - assertTrue(out.path().toString(), - out.path().toString().contains("member-1-shard-default-config")); + expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); }}; } @Test public void testOnReceiveMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - - final ActorRef subject = getSystem().actorOf(props); + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString()); + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("3 seconds"), "primary found") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound f = PrimaryFound.fromSerializable(in); - return f.getPrimaryPath(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertTrue(out, out.contains("member-2-shard-astronauts-config")); + PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), + PrimaryFound.SERIALIZABLE_CLASS)); + String path = found.getPrimaryPath(); + assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); }}; } @Test public void testOnReceiveMemberDown() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - - final ActorRef subject = getSystem().actorOf(props); + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString()); + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString()); + MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); }}; } @Test - public void testOnRecoveryJournalIsEmptied(){ - MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules( + public void testOnRecoveryJournalIsCleaned() { + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( ImmutableSet.of("foo"))); - - assertEquals(1, MyJournal.get().size()); - - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - - final ActorRef subject = getSystem().actorOf(props); - - // Send message to check that ShardManager is ready - subject.tell(new FindPrimary("unknown").toSerializable(), getRef()); - - expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); - - assertEquals(0, MyJournal.get().size()); + InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + ImmutableSet.of("bar"))); + InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); + + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + + shardManager.underlyingActor().waitForRecoveryComplete(); + InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(shardMgrID); + synchronized (journal) { + assertEquals("Journal size", 1, journal.size()); + assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); + } }}; } @Test public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - final TestActorRef subject = - TestActorRef.create(system, props); + final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + persistedModules)); + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); - subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo"))); + shardManager.underlyingActor().waitForRecoveryComplete(); - Collection knownModules = subject.underlyingActor().getKnownModules(); + Collection knownModules = shardManager.underlyingActor().getKnownModules(); - assertTrue(knownModules.contains("foo")); + assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); }}; } @Test public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - final TestActorRef subject = - TestActorRef.create(system, props); - - Collection knownModules = subject.underlyingActor().getKnownModules(); + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); - assertEquals(0, knownModules.size()); - - SchemaContext schemaContext = mock(SchemaContext.class); - Set moduleIdentifierSet = new HashSet<>(); + assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); ModuleIdentifier foo = mock(ModuleIdentifier.class); when(foo.getNamespace()).thenReturn(new URI("foo")); + Set moduleIdentifierSet = new HashSet<>(); moduleIdentifierSet.add(foo); + SchemaContext schemaContext = mock(SchemaContext.class); when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertTrue(knownModules.contains("foo")); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - assertEquals(1, knownModules.size()); + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); ModuleIdentifier bar = mock(ModuleIdentifier.class); when(bar.getNamespace()).thenReturn(new URI("bar")); moduleIdentifierSet.add(bar); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertTrue(knownModules.contains("bar")); - - assertEquals(2, knownModules.size()); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); }}; - } - @Test public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), DatastoreContext.newBuilder().build()); - final TestActorRef subject = - TestActorRef.create(system, props); - - Collection knownModules = subject.underlyingActor().getKnownModules(); - - assertEquals(0, knownModules.size()); + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); SchemaContext schemaContext = mock(SchemaContext.class); Set moduleIdentifierSet = new HashSet<>(); @@ -353,103 +285,65 @@ public class ShardManagerTest { when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - assertTrue(knownModules.contains("foo")); - - assertEquals(1, knownModules.size()); + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); //Create a completely different SchemaContext with only the bar module in it - schemaContext = mock(SchemaContext.class); - moduleIdentifierSet = new HashSet<>(); + //schemaContext = mock(SchemaContext.class); + moduleIdentifierSet.clear(); ModuleIdentifier bar = mock(ModuleIdentifier.class); when(bar.getNamespace()).thenReturn(new URI("bar")); moduleIdentifierSet.add(bar); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertFalse(knownModules.contains("bar")); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - assertEquals(1, knownModules.size()); + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); }}; - - } - - - private void sleep(long period){ - Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS); } - public static class MyJournal extends AsyncWriteJournal { - - private static Map journal = Maps.newTreeMap(); - - public static void addToJournal(Long sequenceNr, Object value){ - journal.put(sequenceNr, value); - } - public static Map get(){ - return journal; - } + private static class TestShardManager extends ShardManager { + private final CountDownLatch recoveryComplete = new CountDownLatch(1); - public static void clear(){ - journal.clear(); + TestShardManager(String shardMrgIDSuffix) { + super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().build()); } - @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, - final Procedure replayCallback) { - if(journal.size() == 0){ - return Futures.successful(null); - } - return Futures.future(new Callable() { - @Override - public Void call() throws Exception { - for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, - false, null, null); - replayCallback.apply(persistentMessage); - } - return null; + @Override + public void handleRecover(Object message) throws Exception { + try { + super.handleRecover(message); + } finally { + if(message instanceof RecoveryCompleted) { + recoveryComplete.countDown(); } - }, context().dispatcher()); + } } - @Override public Future doAsyncReadHighestSequenceNr(String s, long l) { - return Futures.successful(-1L); + void waitForRecoveryComplete() { + assertEquals("Recovery complete", true, + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } + } - @Override public Future doAsyncWriteMessages( - final Iterable persistentReprs) { - return Futures.future(new Callable() { - @Override - public Void call() throws Exception { - for (PersistentRepr repr : persistentReprs){ - if(repr.payload() instanceof ShardManager.SchemaContextModules) { - journal.put(repr.sequenceNr(), repr.payload()); - } - } - return null; - } - }, context().dispatcher()); - } + @SuppressWarnings("serial") + static class TestShardManagerCreator implements Creator { + String shardMrgIDSuffix; - @Override public Future doAsyncWriteConfirmations( - Iterable persistentConfirmations) { - return Futures.successful(null); + TestShardManagerCreator(String shardMrgIDSuffix) { + this.shardMrgIDSuffix = shardMrgIDSuffix; } - @Override public Future doAsyncDeleteMessages(Iterable persistentIds, - boolean b) { - clear(); - return Futures.successful(null); + @Override + public TestShardManager create() throws Exception { + return new TestShardManager(shardMrgIDSuffix); } - @Override public Future doAsyncDeleteMessagesTo(String s, long l, boolean b) { - clear(); - return Futures.successful(null); - } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java index c9a0eaf033..3486753082 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java @@ -7,10 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Uninterruptibles; import scala.concurrent.Future; import akka.dispatch.Futures; import akka.japi.Procedure; @@ -22,7 +28,9 @@ import akka.persistence.journal.japi.AsyncWriteJournal; public class InMemoryJournal extends AsyncWriteJournal { - private static Map> journals = new ConcurrentHashMap<>(); + private static final Map> journals = new ConcurrentHashMap<>(); + + private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); public static void addEntry(String persistenceId, long sequenceNr, Object data) { Map journal = journals.get(persistenceId); @@ -31,13 +39,29 @@ public class InMemoryJournal extends AsyncWriteJournal { journals.put(persistenceId, journal); } - journal.put(sequenceNr, data); + synchronized (journal) { + journal.put(sequenceNr, data); + } } public static void clear() { journals.clear(); } + public static Map get(String persistenceId) { + Map journal = journals.get(persistenceId); + return journal != null ? journal : Collections.emptyMap(); + } + + public static void waitForDeleteMessagesComplete(String persistenceId) { + assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly( + deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)); + } + + public static void addDeleteMessagesCompleteLatch(String persistenceId) { + deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1)); + } + @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, final Procedure replayCallback) { @@ -49,10 +73,13 @@ public class InMemoryJournal extends AsyncWriteJournal { return null; } - for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null); - replayCallback.apply(persistentMessage); + synchronized (journal) { + for (Map.Entry entry : journal.entrySet()) { + PersistentRepr persistentMessage = + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, + false, null, null); + replayCallback.apply(persistentMessage); + } } return null; @@ -62,12 +89,28 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { - return Futures.successful(new Long(0)); + return Futures.successful(-1L); } @Override - public Future doAsyncWriteMessages(Iterable messages) { - return Futures.successful(null); + public Future doAsyncWriteMessages(final Iterable messages) { + return Futures.future(new Callable() { + @Override + public Void call() throws Exception { + for (PersistentRepr repr : messages) { + Map journal = journals.get(repr.persistenceId()); + if(journal == null) { + journal = Maps.newLinkedHashMap(); + journals.put(repr.persistenceId(), journal); + } + + synchronized (journal) { + journal.put(repr.sequenceNr(), repr.payload()); + } + } + return null; + } + }, context().dispatcher()); } @Override @@ -82,6 +125,24 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + Map journal = journals.get(persistenceId); + if(journal != null) { + synchronized (journal) { + Iterator iter = journal.keySet().iterator(); + while(iter.hasNext()) { + Long n = iter.next(); + if(n <= toSequenceNr) { + iter.remove(); + } + } + } + } + + CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId); + if(latch != null) { + latch.countDown(); + } + return Futures.successful(null); } }