Bug 2135: Create ShardInformation on startup 33/12033/5
authortpantelis <tpanteli@brocade.com>
Wed, 8 Oct 2014 11:21:43 +0000 (07:21 -0400)
committertpantelis <tpanteli@brocade.com>
Thu, 9 Oct 2014 01:55:51 +0000 (21:55 -0400)
Modified the ShardManager to create the ShardInformation map on startup,
but without creating the shard actors yet, instead of when the SchemaContext is initialized.

When the SchemaContext is fully initialized then create the shard actors.

On FindLocalShard and FindPrimaryShard messages, only return a valid
response if the shard actor is created and it's initialized.

Change-Id: I361f2af9e53878f62c5890350afd7a5f2877b95c
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java

index e68628dbf5c1fc992afb30ae986fff8ed8f6eef1..157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -42,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -94,7 +94,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
-    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+    protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
             DatastoreContext datastoreContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
@@ -105,7 +105,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        //createLocalShards(null);
+        createLocalShards();
     }
 
     public static Props props(final String type,
@@ -123,8 +123,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
-            findPrimary(
-                FindPrimary.fromSerializable(message));
+            findPrimary(FindPrimary.fromSerializable(message));
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -160,7 +159,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markShardAsInitialized(shardId.getShardName());
     }
 
-    @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+    private void markShardAsInitialized(String shardName) {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -168,8 +167,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    @Override protected void handleRecover(Object message) throws Exception {
-
+    @Override
+    protected void handleRecover(Object message) throws Exception {
         if(message instanceof SchemaContextModules){
             SchemaContextModules msg = (SchemaContextModules) message;
             knownModules.clear();
@@ -186,23 +185,28 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findLocalShard(FindLocalShard message) {
-        ShardInformation shardInformation = localShards.get(message.getShardName());
+        final ShardInformation shardInformation = localShards.get(message.getShardName());
 
         if(shardInformation == null){
             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
             return;
         }
 
-        sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+        sendResponse(shardInformation, new Supplier<Object>() {
+            @Override
+            public Object get() {
+                return new LocalShardFound(shardInformation.getActor());
+            }
+        });
     }
 
-    private void sendResponse(ShardInformation shardInformation, Object message) {
-        if (!shardInformation.isShardInitialized()) {
+    private void sendResponse(ShardInformation shardInformation,  Supplier<Object> messageSupplier) {
+        if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
             getSender().tell(new ActorNotInitialized(), getSelf());
             return;
         }
 
-        getSender().tell(message, getSelf());
+        getSender().tell(messageSupplier.get(), getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -246,12 +250,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
-                @Override public void apply(SchemaContextModules param) throws Exception {
+                @Override
+                public void apply(SchemaContextModules param) throws Exception {
                     LOG.info("Sending new SchemaContext to Shards");
-                    if (localShards.size() == 0) {
-                        createLocalShards(schemaContext);
-                    } else {
-                        for (ShardInformation info : localShards.values()) {
+                    for (ShardInformation info : localShards.values()) {
+                        if(info.getActor() == null) {
+                            info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
+                                    info.getPeerAddresses(), datastoreContext, schemaContext),
+                                    info.getShardId().toString()));
+                        } else {
                             info.getActor().tell(message, getSelf());
                         }
                     }
@@ -265,14 +272,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
-        final ActorRef sender = getSender();
         String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
-        ShardInformation info = localShards.get(shardName);
+        final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            ActorPath shardPath = info.getActorPath();
-            sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+            sendResponse(info, new Supplier<Object>() {
+                @Override
+                public Object get() {
+                    return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+                }
+            });
+
             return;
         }
 
@@ -331,7 +342,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * runs
      *
      */
-    private void createLocalShards(SchemaContext schemaContext) {
+    private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -340,11 +351,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
-            ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
-                    shardId.toString());
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
@@ -398,63 +406,78 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
-    @Override public String persistenceId() {
+    @Override
+    public String persistenceId() {
         return "shard-manager-" + type;
     }
 
-    @VisibleForTesting public Collection<String> getKnownModules() {
+    @VisibleForTesting
+    Collection<String> getKnownModules() {
         return knownModules;
     }
 
     private class ShardInformation {
+        private final ShardIdentifier shardId;
         private final String shardName;
-        private final ActorRef actor;
-        private final ActorPath actorPath;
+        private ActorRef actor;
+        private ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
-        private boolean shardInitialized = false; //flag that determines if the actor is ready for business
+        private boolean shardInitialized = false; // flag that determines if the actor is ready for business
 
-        private ShardInformation(String shardName, ActorRef actor,
-            Map<ShardIdentifier, String> peerAddresses) {
+        private ShardInformation(String shardName, ShardIdentifier shardId,
+                Map<ShardIdentifier, String> peerAddresses) {
             this.shardName = shardName;
-            this.actor = actor;
-            this.actorPath = actor.path();
+            this.shardId = shardId;
             this.peerAddresses = peerAddresses;
         }
 
-        public String getShardName() {
+        String getShardName() {
             return shardName;
         }
 
-        public ActorRef getActor(){
+        ActorRef getActor(){
             return actor;
         }
 
-        public ActorPath getActorPath() {
+        ActorPath getActorPath() {
             return actorPath;
         }
 
-        public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+        void setActor(ActorRef actor) {
+            this.actor = actor;
+            this.actorPath = actor.path();
+        }
+
+        ShardIdentifier getShardId() {
+            return shardId;
+        }
+
+        Map<ShardIdentifier, String> getPeerAddresses() {
+            return peerAddresses;
+        }
+
+        void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        "Sending PeerAddressResolved for peer {} with address {} to {}",
-                        peerId, peerAddress, actor.path());
-                }
-                actor
-                    .tell(new PeerAddressResolved(peerId, peerAddress),
-                        getSelf());
 
+                if(actor != null) {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                                peerId, peerAddress, actor.path());
+                    }
+
+                    actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+                }
             }
         }
 
-        public boolean isShardInitialized() {
+        boolean isShardInitialized() {
             return shardInitialized;
         }
 
-        public void setShardInitialized(boolean shardInitialized) {
+        void setShardInitialized(boolean shardInitialized) {
             this.shardInitialized = shardInitialized;
         }
     }
@@ -482,6 +505,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
+        private static final long serialVersionUID = 1L;
+
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){
index dcfa35c0dc2f5ad1bbbc985af557fd92e631b99a..5022d97997dfad32ef29ae16865a7e7dc3c2b6e1 100644 (file)
@@ -1,29 +1,20 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
+import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.japi.Creator;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -32,316 +23,257 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Future;
-
 import java.net.URI;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class ShardManagerTest {
-    private static ActorSystem system;
-    Configuration mockConfig = new MockConfiguration();
-    private static ActorRef defaultShardMockActor;
-
-    @BeforeClass
-    public static void setUpClass() {
-        Map<String, String> myJournal = new HashMap<>();
-        myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
-        myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
-        Config config = ConfigFactory.load()
-                .withValue("akka.persistence.journal.plugin",
-                        ConfigValueFactory.fromAnyRef("my-journal"))
-                .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
-
-        MyJournal.clear();
+public class ShardManagerTest extends AbstractActorTest {
+    private static int ID_COUNTER = 1;
 
-        system = ActorSystem.create("test", config);
+    private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+    private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
 
-        String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
-        defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
+    private static ActorRef mockShardActor;
 
+    @Before
+    public void setUp() {
+        InMemoryJournal.clear();
 
+        if(mockShardActor == null) {
+            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
+            mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+        }
     }
 
-    @AfterClass
-    public static void tearDown() {
-        JavaTestKit.shutdownActorSystem(system);
-        system = null;
+    @After
+    public void tearDown() {
+        InMemoryJournal.clear();
     }
 
-    @Before
-    public void setUpTest(){
-        MyJournal.clear();
+    private Props newShardMgrProps() {
+        return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
+                DatastoreContext.newBuilder().build());
     }
 
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-        new JavaTestKit(system) {
-            {
-                final Props props = ShardManager
-                        .props("config", new MockClusterWrapper(),
-                                new MockConfiguration(), DatastoreContext.newBuilder().build());
-
-                final ActorRef subject = getSystem().actorOf(props);
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
 
-                expectMsgEquals(duration("2 seconds"),
-                        new PrimaryNotFound("inventory").toSerializable());
-            }};
+            expectMsgEquals(duration("5 seconds"),
+                    new PrimaryNotFound("non-existent").toSerializable());
+        }};
     }
 
     @Test
     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+
+            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+        }};
+    }
 
-            final ActorRef subject = getSystem().actorOf(props);
+    @Test
+    public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            subject.tell(new ActorInitialized(), defaultShardMockActor);
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
-            expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-        }
-        };
+            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+        }};
     }
 
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            final ActorRef subject = getSystem().actorOf(props);
+            shardManager.tell(new FindLocalShard("non-existent"), getRef());
 
-            subject.tell(new FindLocalShard("inventory"), getRef());
+            LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
 
-            final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
-                @Override
-                protected String match(Object in) {
-                    if (in instanceof LocalShardNotFound) {
-                        return ((LocalShardNotFound) in).getShardName();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get(); // this extracts the received message
-
-            assertEquals("inventory", out);
+            assertEquals("getShardName", "non-existent", notFound.getShardName());
         }};
     }
 
     @Test
     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-        final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", mockClusterWrapper,
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
-            final ActorRef subject = getSystem().actorOf(props);
+            LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+            assertTrue("Found path contains " + found.getPath().path().toString(),
+                    found.getPath().path().toString().contains("member-1-shard-default-config"));
+        }};
+    }
 
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            subject.tell(new ActorInitialized(), defaultShardMockActor);
+    @Test
+    public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-            subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            //shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
-                @Override
-                protected ActorRef match(Object in) {
-                    if (in instanceof LocalShardFound) {
-                        return ((LocalShardFound) in).getPath();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get(); // this extracts the received message
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
-            assertTrue(out.path().toString(),
-                    out.path().toString().contains("member-1-shard-default-config"));
+            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
         }};
     }
 
     @Test
     public void testOnReceiveMemberUp() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-
-            final ActorRef subject = getSystem().actorOf(props);
+            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
-
-            final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected String match(Object in) {
-                    if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
-                        PrimaryFound f = PrimaryFound.fromSerializable(in);
-                        return f.getPrimaryPath();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get(); // this extracts the received message
-
-            assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+            PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
+                    PrimaryFound.SERIALIZABLE_CLASS));
+            String path = found.getPrimaryPath();
+            assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
         }};
     }
 
     @Test
     public void testOnReceiveMemberDown() throws Exception {
 
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-
-            final ActorRef subject = getSystem().actorOf(props);
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
-            MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-            expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
-            MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+            MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
 
-            subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-            expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+            expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
         }};
     }
 
     @Test
-    public void testOnRecoveryJournalIsEmptied(){
-        MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
+    public void testOnRecoveryJournalIsCleaned() {
+        InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
                 ImmutableSet.of("foo")));
-
-        assertEquals(1, MyJournal.get().size());
-
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-
-            final ActorRef subject = getSystem().actorOf(props);
-
-            // Send message to check that ShardManager is ready
-            subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
-
-            expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
-
-            assertEquals(0, MyJournal.get().size());
+        InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+                ImmutableSet.of("bar")));
+        InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+        new JavaTestKit(getSystem()) {{
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+            InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+            // Journal entries up to the last one should've been deleted
+            Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+            synchronized (journal) {
+                assertEquals("Journal size", 1, journal.size());
+                assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+            }
         }};
     }
 
     @Test
     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-            final TestActorRef<ShardManager> subject =
-                    TestActorRef.create(system, props);
+        final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
+        InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+                persistedModules));
+        new JavaTestKit(getSystem()) {{
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
 
-            subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
+            shardManager.underlyingActor().waitForRecoveryComplete();
 
-            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+            Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
 
-            assertTrue(knownModules.contains("foo"));
+            assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
         }};
     }
 
     @Test
     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
             throws Exception {
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-            final TestActorRef<ShardManager> subject =
-                    TestActorRef.create(system, props);
-
-            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+        new JavaTestKit(getSystem()) {{
+            final TestActorRef<ShardManager> shardManager =
+                    TestActorRef.create(getSystem(), newShardMgrProps());
 
-            assertEquals(0, knownModules.size());
-
-            SchemaContext schemaContext = mock(SchemaContext.class);
-            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+            assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
 
             ModuleIdentifier foo = mock(ModuleIdentifier.class);
             when(foo.getNamespace()).thenReturn(new URI("foo"));
 
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
             moduleIdentifierSet.add(foo);
 
+            SchemaContext schemaContext = mock(SchemaContext.class);
             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
 
-            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
-            assertTrue(knownModules.contains("foo"));
+            shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
 
-            assertEquals(1, knownModules.size());
+            assertEquals("getKnownModules", Sets.newHashSet("foo"),
+                    Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
 
             ModuleIdentifier bar = mock(ModuleIdentifier.class);
             when(bar.getNamespace()).thenReturn(new URI("bar"));
 
             moduleIdentifierSet.add(bar);
 
-            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
-            assertTrue(knownModules.contains("bar"));
-
-            assertEquals(2, knownModules.size());
+            shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
 
+            assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
+                    Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
         }};
-
     }
 
-
     @Test
     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
             throws Exception {
-        new JavaTestKit(system) {{
-            final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), DatastoreContext.newBuilder().build());
-            final TestActorRef<ShardManager> subject =
-                    TestActorRef.create(system, props);
-
-            Collection<String> knownModules = subject.underlyingActor().getKnownModules();
-
-            assertEquals(0, knownModules.size());
+        new JavaTestKit(getSystem()) {{
+            final TestActorRef<ShardManager> shardManager =
+                    TestActorRef.create(getSystem(), newShardMgrProps());
 
             SchemaContext schemaContext = mock(SchemaContext.class);
             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
@@ -353,103 +285,65 @@ public class ShardManagerTest {
 
             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
 
-            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+            shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
 
-            assertTrue(knownModules.contains("foo"));
-
-            assertEquals(1, knownModules.size());
+            assertEquals("getKnownModules", Sets.newHashSet("foo"),
+                    Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
 
             //Create a completely different SchemaContext with only the bar module in it
-            schemaContext = mock(SchemaContext.class);
-            moduleIdentifierSet = new HashSet<>();
+            //schemaContext = mock(SchemaContext.class);
+            moduleIdentifierSet.clear();
             ModuleIdentifier bar = mock(ModuleIdentifier.class);
             when(bar.getNamespace()).thenReturn(new URI("bar"));
 
             moduleIdentifierSet.add(bar);
 
-            subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
-            assertFalse(knownModules.contains("bar"));
+            shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
 
-            assertEquals(1, knownModules.size());
+            assertEquals("getKnownModules", Sets.newHashSet("foo"),
+                    Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
 
         }};
-
-    }
-
-
-    private void sleep(long period){
-        Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
     }
 
-    public static class MyJournal extends AsyncWriteJournal {
-
-        private static Map<Long, Object> journal = Maps.newTreeMap();
-
-        public static void addToJournal(Long sequenceNr, Object value){
-            journal.put(sequenceNr, value);
-        }
 
-        public static Map<Long, Object> get(){
-            return journal;
-        }
+    private static class TestShardManager extends ShardManager {
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
-        public static void clear(){
-            journal.clear();
+        TestShardManager(String shardMrgIDSuffix) {
+            super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
+                    DatastoreContext.newBuilder().build());
         }
 
-        @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
-                                                            final Procedure<PersistentRepr> replayCallback) {
-            if(journal.size() == 0){
-                return Futures.successful(null);
-            }
-            return Futures.future(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    for (Map.Entry<Long, Object> entry : journal.entrySet()) {
-                        PersistentRepr persistentMessage =
-                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
-                                        false, null, null);
-                        replayCallback.apply(persistentMessage);
-                    }
-                    return null;
+        @Override
+        public void handleRecover(Object message) throws Exception {
+            try {
+                super.handleRecover(message);
+            } finally {
+                if(message instanceof RecoveryCompleted) {
+                    recoveryComplete.countDown();
                 }
-            }, context().dispatcher());
+            }
         }
 
-        @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
-            return Futures.successful(-1L);
+        void waitForRecoveryComplete() {
+            assertEquals("Recovery complete", true,
+                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
         }
+    }
 
-        @Override public Future<Void> doAsyncWriteMessages(
-                final Iterable<PersistentRepr> persistentReprs) {
-            return Futures.future(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    for (PersistentRepr repr : persistentReprs){
-                        if(repr.payload() instanceof ShardManager.SchemaContextModules) {
-                            journal.put(repr.sequenceNr(), repr.payload());
-                        }
-                    }
-                    return null;
-                }
-            }, context().dispatcher());
-        }
+    @SuppressWarnings("serial")
+    static class TestShardManagerCreator implements Creator<TestShardManager> {
+        String shardMrgIDSuffix;
 
-        @Override public Future<Void> doAsyncWriteConfirmations(
-                Iterable<PersistentConfirmation> persistentConfirmations) {
-            return Futures.successful(null);
+        TestShardManagerCreator(String shardMrgIDSuffix) {
+            this.shardMrgIDSuffix = shardMrgIDSuffix;
         }
 
-        @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
-                                                            boolean b) {
-            clear();
-            return Futures.successful(null);
+        @Override
+        public TestShardManager create() throws Exception {
+            return new TestShardManager(shardMrgIDSuffix);
         }
 
-        @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
-            clear();
-            return Futures.successful(null);
-        }
     }
 }
index c9a0eaf0337929408ec4a16d7c6479a4cf329f69..34867530821d70d4ce7085859a3f79e59a6d9a03 100644 (file)
@@ -7,10 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
 import scala.concurrent.Future;
 import akka.dispatch.Futures;
 import akka.japi.Procedure;
@@ -22,7 +28,9 @@ import akka.persistence.journal.japi.AsyncWriteJournal;
 
 public class InMemoryJournal extends AsyncWriteJournal {
 
-    private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+    private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+    private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
 
     public static void addEntry(String persistenceId, long sequenceNr, Object data) {
         Map<Long, Object> journal = journals.get(persistenceId);
@@ -31,13 +39,29 @@ public class InMemoryJournal extends AsyncWriteJournal {
             journals.put(persistenceId, journal);
         }
 
-        journal.put(sequenceNr, data);
+        synchronized (journal) {
+            journal.put(sequenceNr, data);
+        }
     }
 
     public static void clear() {
         journals.clear();
     }
 
+    public static Map<Long, Object> get(String persistenceId) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        return journal != null ? journal : Collections.<Long, Object>emptyMap();
+    }
+
+    public static void waitForDeleteMessagesComplete(String persistenceId) {
+        assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
+                deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+    }
+
+    public static void addDeleteMessagesCompleteLatch(String persistenceId) {
+        deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+    }
+
     @Override
     public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
             long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
@@ -49,10 +73,13 @@ public class InMemoryJournal extends AsyncWriteJournal {
                     return null;
                 }
 
-                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
-                    PersistentRepr persistentMessage =
-                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
-                    replayCallback.apply(persistentMessage);
+                synchronized (journal) {
+                    for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                        PersistentRepr persistentMessage =
+                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                        false, null, null);
+                        replayCallback.apply(persistentMessage);
+                    }
                 }
 
                 return null;
@@ -62,12 +89,28 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
-        return Futures.successful(new Long(0));
+        return Futures.successful(-1L);
     }
 
     @Override
-    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
-        return Futures.successful(null);
+    public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                for (PersistentRepr repr : messages) {
+                    Map<Long, Object> journal = journals.get(repr.persistenceId());
+                    if(journal == null) {
+                        journal = Maps.newLinkedHashMap();
+                        journals.put(repr.persistenceId(), journal);
+                    }
+
+                    synchronized (journal) {
+                        journal.put(repr.sequenceNr(), repr.payload());
+                    }
+                }
+                return null;
+            }
+        }, context().dispatcher());
     }
 
     @Override
@@ -82,6 +125,24 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal != null) {
+            synchronized (journal) {
+                Iterator<Long> iter = journal.keySet().iterator();
+                while(iter.hasNext()) {
+                    Long n = iter.next();
+                    if(n <= toSequenceNr) {
+                        iter.remove();
+                    }
+                }
+            }
+        }
+
+        CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
+        if(latch != null) {
+            latch.countDown();
+        }
+
         return Futures.successful(null);
     }
 }