Merge "Bug-2098- sal-compatibility not get up-to-date flow information"
authorMoiz Raja <moraja@cisco.com>
Mon, 27 Oct 2014 23:20:21 +0000 (23:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 27 Oct 2014 23:20:21 +0000 (23:20 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/sample/toaster/provider/OpendaylightToaster.java

index a498826e98977d8b3fba89348e3284f0a4781ef3..9a77e4d568961b72f26dec9716b29a5f5f0b9ccd 100644 (file)
@@ -19,8 +19,12 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DataChangeListener extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
+
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private boolean notificationsEnabled = false;
 
@@ -29,7 +33,8 @@ public class DataChangeListener extends AbstractUntypedActor {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
     }
 
-    @Override public void handleReceive(Object message) throws Exception {
+    @Override
+    public void handleReceive(Object message) throws Exception {
         if(message instanceof DataChanged){
             dataChanged(message);
         } else if(message instanceof EnableNotification){
@@ -39,18 +44,24 @@ public class DataChangeListener extends AbstractUntypedActor {
 
     private void enableNotification(EnableNotification message) {
         notificationsEnabled = message.isEnabled();
+        LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+                listener);
     }
 
     private void dataChanged(Object message) {
 
         // Do nothing if notifications are not enabled
-        if(!notificationsEnabled){
+        if(!notificationsEnabled) {
+            LOG.debug("Notifications not enabled for listener {} - dropping change notification",
+                    listener);
             return;
         }
 
         DataChanged reply = (DataChanged) message;
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            change = reply.getChange();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = reply.getChange();
+
+        LOG.debug("Sending change notification {} to listener {}", change, listener);
+
         this.listener.onDataChanged(change);
 
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
index d831b7c1844006fca5bea575ee20d656c4b616c4..789d51a19f88942e3a35ceb7fc4d69cb20c8abcb 100644 (file)
@@ -76,7 +76,6 @@ import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nonnull;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -113,7 +112,10 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
-    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+    private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();
+
+    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
+                                                                       Lists.newArrayList();
 
     private final DatastoreContext datastoreContext;
 
@@ -576,53 +578,60 @@ public class Shard extends RaftActor {
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
-    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+    @VisibleForTesting
+    void updateSchemaContext(SchemaContext schemaContext) {
         store.onGlobalContextUpdated(schemaContext);
     }
 
-    private void registerChangeListener(
-        RegisterChangeListener registerChangeListener) {
+    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("registerDataChangeListener for {}", registerChangeListener
-                .getPath());
+        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                     NormalizedNode<?, ?>>> registration;
+        if(isLeader()) {
+            registration = doChangeListenerRegistration(registerChangeListener);
+        } else {
+            LOG.debug("Shard is not the leader - delaying registration");
+
+            DelayedListenerRegistration delayedReg =
+                    new DelayedListenerRegistration(registerChangeListener);
+            delayedListenerRegistrations.add(delayedReg);
+            registration = delayedReg;
         }
 
+        ActorRef listenerRegistration = getContext().actorOf(
+                DataChangeListenerRegistration.props(registration));
 
-        ActorSelection dataChangeListenerPath = getContext()
-            .system().actorSelection(
-                registerChangeListener.getDataChangeListenerPath());
+        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+                    listenerRegistration.path());
+
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+    }
+
+    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
+            RegisterChangeListener registerChangeListener) {
 
+        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+                registerChangeListener.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
         // it will not
-        dataChangeListenerPath
-            .tell(new EnableNotification(isLeader()), getSelf());
+        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
         dataChangeListeners.add(dataChangeListenerPath);
 
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
-
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            registration = store.registerChangeListener(registerChangeListener.getPath(),
-                listener, registerChangeListener.getScope());
-        ActorRef listenerRegistration =
-            getContext().actorOf(
-                DataChangeListenerRegistration.props(registration));
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(
-                "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-                , listenerRegistration.path().toString());
-        }
+        LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
-        getSender()
-            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
-                getSelf());
+        return store.registerChangeListener(registerChangeListener.getPath(), listener,
+                registerChangeListener.getScope());
     }
 
     private boolean isMetricsCaptureEnabled(){
@@ -798,17 +807,28 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override protected void onStateChanged() {
+    @Override
+    protected void onStateChanged() {
+        boolean isLeader = isLeader();
         for (ActorSelection dataChangeListener : dataChangeListeners) {
-            dataChangeListener
-                .tell(new EnableNotification(isLeader()), getSelf());
+            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+        }
+
+        if(isLeader) {
+            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+                if(!reg.isClosed()) {
+                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+                }
+            }
+
+            delayedListenerRegistrations.clear();
         }
 
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
 
         // If this actor is no longer the leader close all the transaction chains
-        if(!isLeader()){
+        if(!isLeader){
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                 if(LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -862,4 +882,45 @@ public class Shard extends RaftActor {
     ShardStats getShardMBean() {
         return shardMBean;
     }
+
+    private static class DelayedListenerRegistration implements
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+        private volatile boolean closed;
+
+        private final RegisterChangeListener registerChangeListener;
+
+        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                             NormalizedNode<?, ?>>> delegate;
+
+        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+            this.registerChangeListener = registerChangeListener;
+        }
+
+        void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                            NormalizedNode<?, ?>>> registration) {
+            this.delegate = registration;
+        }
+
+        boolean isClosed() {
+            return closed;
+        }
+
+        RegisterChangeListener getRegisterChangeListener() {
+            return registerChangeListener;
+        }
+
+        @Override
+        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+            return delegate != null ? delegate.getInstance() : null;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+            if(delegate != null) {
+                delegate.close();
+            }
+        }
+    }
 }
index c7c382c82ffa2fc6db82664995d404320674c447..e861165c6ba2592d7d5d14d98e5fb34591344d85 100644 (file)
@@ -533,8 +533,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
-        private static final long serialVersionUID = 1L;
-
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){
index d35c36fb0a2c883649ec7fcccc42ac981f20e256..1cc7ae8ad02f93dd9f80135a47c1b5dfdfc2b58c 100644 (file)
@@ -4,21 +4,18 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -29,8 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
@@ -218,20 +213,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
 
-            final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-                                                                changeList = Lists.newArrayList();
-            final CountDownLatch changeLatch = new CountDownLatch(3);
-            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                    new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-                @Override
-                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
-                                                               NormalizedNode<?, ?>> change) {
-                    changeList.add(change);
-                    changeLatch.countDown();
-                }
-            };
+            MockDataChangeListener listener = new MockDataChangeListener(3);
 
-            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            ListenerRegistration<MockDataChangeListener>
                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
                             DataChangeScope.SUBTREE);
 
@@ -248,17 +232,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             testWriteTransaction(dataStore, listPath,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
 
-            assertEquals("Change notifications complete", true,
-                    Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
-
-            assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
-                    changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
-
-            assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
-                    changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
-
-            assertTrue("Change 3 does not contain " + listPath,
-                    changeList.get(2).getCreatedData().containsKey(listPath));
+            listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
 
             listenerReg.close();
 
@@ -266,9 +240,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
 
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-
-            assertEquals("Received unexpected change after close", 3, changeList.size());
+            listener.expectNoMoreChanges("Received unexpected change after close");
 
             cleanup(dataStore);
         }};
index f183bb319ee04df852134264ec66a475a53c13ff..03a18ea6c38c39600f2c4bd3a49b848ace89a09d 100644 (file)
@@ -1,12 +1,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,7 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
@@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -50,6 +51,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -78,6 +82,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -86,6 +91,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -97,17 +103,14 @@ public class ShardTest extends AbstractActorTest {
 
     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
-    private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config").build();
-
     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
 
-    private static String shardName() {
-        return "shard" + NEXT_SHARD_NUM.getAndIncrement();
-    }
+    private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
 
     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
-            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
+            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
+            shardHeartbeatIntervalInMillis(100).build();
 
     @Before
     public void setUp() {
@@ -124,40 +127,149 @@ public class ShardTest extends AbstractActorTest {
     }
 
     private Props newShardProps() {
-        return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                 dataStoreContext, SCHEMA_CONTEXT);
     }
 
     @Test
-    public void testOnReceiveRegisterListener() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
+    public void testRegisterChangeListener() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps(),  "testRegisterChangeListener");
 
-            subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
+            waitUntilLeader(shard);
 
-            subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+            shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
-            EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
-            assertEquals("isEnabled", false, enable.isEnabled());
+            MockDataChangeListener listener = new MockDataChangeListener(1);
+            ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                    "testRegisterChangeListener-DataChangeListener");
+
+            shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
-            assertTrue(reply.getListenerRegistrationPath().toString().matches(
+            String replyPath = reply.getListenerRegistrationPath().toString();
+            assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents(path);
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+        // This test tests the timing window in which a change listener is registered before the
+        // shard becomes the leader. We verify that the listener is registered and notified of the
+        // existing data when the shard becomes the leader.
+        new ShardTestKit(getSystem()) {{
+            // For this test, we want to send the RegisterChangeListener message after the shard
+            // has recovered from persistence and before it becomes the leader. So we subclass
+            // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+            // we know that the shard has been initialized to a follower and has started the
+            // election process. The following 2 CountDownLatches are used to coordinate the
+            // ElectionTimeout with the sending of the RegisterChangeListener message.
+            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+            Creator<Shard> creator = new Creator<Shard>() {
+                boolean firstElectionTimeout = true;
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                            dataStoreContext, SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) {
+                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
+                                // Got the first ElectionTimeout. We don't forward it to the
+                                // base Shard yet until we've sent the RegisterChangeListener
+                                // message. So we signal the onFirstElectionTimeout latch to tell
+                                // the main thread to send the RegisterChangeListener message and
+                                // start a thread to wait on the onChangeListenerRegistered latch,
+                                // which the main thread signals after it has sent the message.
+                                // After the onChangeListenerRegistered is triggered, we send the
+                                // original ElectionTimeout message to proceed with the election.
+                                firstElectionTimeout = false;
+                                final ActorRef self = getSelf();
+                                new Thread() {
+                                    @Override
+                                    public void run() {
+                                        Uninterruptibles.awaitUninterruptibly(
+                                                onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                        self.tell(message, self);
+                                    }
+                                }.start();
+
+                                onFirstElectionTimeout.countDown();
+                            } else {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            MockDataChangeListener listener = new MockDataChangeListener(1);
+            ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                    "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)),
+                    "testRegisterChangeListenerWhenNotLeaderInitially");
+
+            // Write initial data into the in-memory store.
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            // Wait until the shard receives the first ElectionTimeout message.
+            assertEquals("Got first ElectionTimeout", true,
+                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+            // Now send the RegisterChangeListener and wait for the reply.
+            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+
+            RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            // Sanity check - verify the shard is not the leader yet.
+            shard.tell(new FindLeader(), getRef());
+            FindLeaderReply findLeadeReply =
+                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+            // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
+            // with the election process.
+            onChangeListenerRegistered.countDown();
+
+            // Wait for the shard to become the leader and notify our listener with the existing
+            // data in the store.
+            listener.waitForChangeEvents(path);
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testCreateTransaction(){
         new ShardTestKit(getSystem()) {{
-            ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+            ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
 
-            waitUntilLeader(subject);
+            waitUntilLeader(shard);
 
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            subject.tell(new CreateTransaction("txn-1",
+            shard.tell(new CreateTransaction("txn-1",
                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
@@ -166,18 +278,19 @@ public class ShardTest extends AbstractActorTest {
             String path = reply.getTransactionActorPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-            expectNoMsg();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testCreateTransactionOnChain(){
         new ShardTestKit(getSystem()) {{
-            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+            final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
 
-            waitUntilLeader(subject);
+            waitUntilLeader(shard);
 
-            subject.tell(new CreateTransaction("txn-1",
+            shard.tell(new CreateTransaction("txn-1",
                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
                     getRef());
 
@@ -187,47 +300,69 @@ public class ShardTest extends AbstractActorTest {
             String path = reply.getTransactionActorPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-            expectNoMsg();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testPeerAddressResolved(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+        new ShardTestKit(getSystem()) {{
+            final CountDownLatch recoveryComplete = new CountDownLatch(1);
+            class TestShard extends Shard {
+                TestShard() {
+                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+                            dataStoreContext, SCHEMA_CONTEXT);
+                }
 
-            Props props = Shard.props(identifier,
-                    Collections.<ShardIdentifier, String>singletonMap(identifier, null),
-                    dataStoreContext, SCHEMA_CONTEXT);
-            final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
+                Map<String, String> getPeerAddresses() {
+                    return getRaftActorContext().getPeerAddresses();
+                }
 
-            new Within(duration("3 seconds")) {
                 @Override
-                protected void run() {
+                protected void onRecoveryComplete() {
+                    try {
+                        super.onRecoveryComplete();
+                    } finally {
+                        recoveryComplete.countDown();
+                    }
+                }
+            }
 
-                    subject.tell(
-                        new PeerAddressResolved(identifier, "akka://foobar"),
-                        getRef());
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(new Creator<Shard>() {
+                        @Override
+                        public TestShard create() throws Exception {
+                            return new TestShard();
+                        }
+                    })), "testPeerAddressResolved");
 
-                    expectNoMsg();
-                }
-            };
+            //waitUntilLeader(shard);
+            assertEquals("Recovery complete", true,
+                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+
+            String address = "akka://foobar";
+            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+
+            assertEquals("getPeerAddresses", address,
+                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testApplySnapshot() throws ExecutionException, InterruptedException {
-        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                "testApplySnapshot");
 
         NormalizedNodeToNodeCodec codec =
             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(ref, root);
+        NormalizedNode<?,?> expected = readStore(shard, root);
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
@@ -235,17 +370,19 @@ public class ShardTest extends AbstractActorTest {
                 encode.getNormalizedNode().toByteString().toByteArray(),
                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
 
-        ref.underlyingActor().onReceiveCommand(applySnapshot);
+        shard.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode<?,?> actual = readStore(ref, root);
+        NormalizedNode<?,?> actual = readStore(shard, root);
 
         assertEquals(expected, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     @Test
     public void testApplyState() throws Exception {
 
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -259,6 +396,8 @@ public class ShardTest extends AbstractActorTest {
 
         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
         assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     @SuppressWarnings("serial")
@@ -279,7 +418,7 @@ public class ShardTest extends AbstractActorTest {
         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
 
-        InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
                         root).
                                 getNormalizedNode().toByteString().toByteArray(),
@@ -287,7 +426,7 @@ public class ShardTest extends AbstractActorTest {
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                           SCHEMA_CONTEXT))));
@@ -301,11 +440,11 @@ public class ShardTest extends AbstractActorTest {
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
                     SCHEMA_CONTEXT);
-            InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
                     newPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
 
         // Create the actor and wait for recovery complete.
@@ -315,7 +454,7 @@ public class ShardTest extends AbstractActorTest {
         Creator<Shard> creator = new Creator<Shard>() {
             @Override
             public Shard create() throws Exception {
-                return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                         dataStoreContext, SCHEMA_CONTEXT) {
                     @Override
                     protected void onRecoveryComplete() {
@@ -363,6 +502,8 @@ public class ShardTest extends AbstractActorTest {
                 shard.underlyingActor().getShardMBean().getCommitIndex());
         assertEquals("Last applied", nListEntries,
                 shard.underlyingActor().getShardMBean().getLastApplied());
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     private CompositeModificationPayload newPayload(Modification... mods) {
@@ -433,7 +574,8 @@ public class ShardTest extends AbstractActorTest {
         System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testConcurrentThreePhaseCommits");
 
             waitUntilLeader(shard);
 
@@ -518,7 +660,6 @@ public class ShardTest extends AbstractActorTest {
                 @Override
                 public void onComplete(Throwable error, Object resp) {
                     if(error != null) {
-                        System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
                     } else {
                         try {
@@ -606,7 +747,17 @@ public class ShardTest extends AbstractActorTest {
             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
 
+            for(int i = 0; i < 20 * 5; i++) {
+                long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
+                if(lastLogIndex == 2) {
+                    break;
+                }
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -614,7 +765,8 @@ public class ShardTest extends AbstractActorTest {
     public void testCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -681,6 +833,8 @@ public class ShardTest extends AbstractActorTest {
             inOrder.verify(cohort1).preCommit();
             inOrder.verify(cohort1).commit();
             inOrder.verify(cohort2).canCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -688,7 +842,8 @@ public class ShardTest extends AbstractActorTest {
     public void testPreCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testPreCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -722,6 +877,8 @@ public class ShardTest extends AbstractActorTest {
             InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
             inOrder.verify(cohort).preCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -729,7 +886,8 @@ public class ShardTest extends AbstractActorTest {
     public void testCanCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -750,6 +908,8 @@ public class ShardTest extends AbstractActorTest {
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -758,7 +918,8 @@ public class ShardTest extends AbstractActorTest {
         System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testAbortBeforeFinishCommit");
 
             waitUntilLeader(shard);
 
@@ -810,6 +971,8 @@ public class ShardTest extends AbstractActorTest {
 
             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -819,7 +982,8 @@ public class ShardTest extends AbstractActorTest {
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitTimeout");
 
             waitUntilLeader(shard);
 
@@ -877,6 +1041,8 @@ public class ShardTest extends AbstractActorTest {
 
             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
             assertNotNull(listNodePath + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -886,7 +1052,8 @@ public class ShardTest extends AbstractActorTest {
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitQueueCapacityExceeded");
 
             waitUntilLeader(shard);
 
@@ -935,6 +1102,8 @@ public class ShardTest extends AbstractActorTest {
 
             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -942,10 +1111,13 @@ public class ShardTest extends AbstractActorTest {
     public void testCanCommitBeforeReadyFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitBeforeReadyFailure");
 
             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -953,7 +1125,8 @@ public class ShardTest extends AbstractActorTest {
     public void testAbortTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testAbortTransaction");
 
             waitUntilLeader(shard);
 
@@ -1016,6 +1189,8 @@ public class ShardTest extends AbstractActorTest {
             InOrder inOrder = inOrder(cohort1, cohort2);
             inOrder.verify(cohort1).canCommit();
             inOrder.verify(cohort2).canCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -1026,7 +1201,7 @@ public class ShardTest extends AbstractActorTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             dataStoreContext, SCHEMA_CONTEXT) {
                         @Override
                         public void saveSnapshot(Object snapshot) {
@@ -1050,6 +1225,8 @@ public class ShardTest extends AbstractActorTest {
             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
index 9a0e8f9e18416d104f059e316577d377e5c718f2..d08258a2a026ec890ddb736f1456c6682e2a3c5c 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.Assert;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -15,6 +16,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
@@ -45,17 +47,21 @@ class ShardTestKit extends JavaTestKit {
     }
 
     protected void waitUntilLeader(ActorRef shard) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
-            Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(5, TimeUnit.SECONDS));
+            Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
             try {
-                FindLeaderReply resp = (FindLeaderReply)Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
                 if(resp.getLeaderActor() != null) {
                     return;
                 }
-            } catch (Exception e) {
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
                 e.printStackTrace();
             }
 
+
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java
new file mode 100644 (file)
index 0000000..f2f49d1
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * A mock DataChangeListener implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class MockDataChangeListener implements
+                         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+    private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                                               changeList = Lists.newArrayList();
+    private final CountDownLatch changeLatch;
+    private final int expChangeEventCount;
+
+    public MockDataChangeListener(int expChangeEventCount) {
+        changeLatch = new CountDownLatch(expChangeEventCount);
+        this.expChangeEventCount = expChangeEventCount;
+    }
+
+    @Override
+    public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+        changeList.add(change);
+        changeLatch.countDown();
+    }
+
+    public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
+        assertEquals("Change notifications complete", true,
+                Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+        for(int i = 0; i < expPaths.length; i++) {
+            assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),
+                    changeList.get(i).getCreatedData().containsKey(expPaths[i]));
+        }
+    }
+
+    public void expectNoMoreChanges(String assertMsg) {
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        assertEquals(assertMsg, expChangeEventCount, changeList.size());
+    }
+}
index b7518e094d0e61594a391c2fc4a78b181294e00a..332d375282ce99e3fac16b5a3e10616886a56bab 100644 (file)
@@ -141,6 +141,8 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
             {
                 darknessFactor.set( darkness );
             }
+
+            LOG.info("onDataChanged - new Toaster config: {}", toaster);
         }
     }