Tune replication and stabilize tests 41/9741/5
authorMoiz Raja <moraja@cisco.com>
Wed, 6 Aug 2014 11:28:43 +0000 (04:28 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 7 Aug 2014 16:02:23 +0000 (09:02 -0700)
Made following changes for replication
- Increased Heartbeat timeout to 500 milliseconds.
- Send only one entry from the replicated log to the follower in append entries

Both of these tweaks have been made to prevent election timeouts and frequent switching of leaders

Changes to tests
- Added a duration when constructing an ExpectMsg. This prevents ExpectMsg from waiting forever when
  and expected event does not occur
- Removed all Thread.sleep from the tests and replace them with waiting for a specific LogEvent this is
  a more deterministic.

Change-Id: Ie9ce0c9c73bf1b170a78879b1e2dab76f1de64df
Signed-off-by: Moiz Raja <moraja@cisco.com>
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index 24bfa3d..b5b034a 100644 (file)
@@ -100,16 +100,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+        return getFrom(logEntryIndex, journal.size());
+    }
+
+    @Override
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
         List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
-            entries.addAll(journal.subList(adjustedIndex, size));
+            int maxIndex = adjustedIndex + max;
+            if(maxIndex > size){
+                maxIndex = size;
+            }
+            entries.addAll(journal.subList(adjustedIndex, maxIndex));
         }
         return entries;
     }
 
+
     @Override
     public long size() {
        return journal.size();
index c633337..6432fa4 100644 (file)
@@ -33,7 +33,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      * Since this is set to 100 milliseconds the Election timeout should be
      * at least 200 milliseconds
      */
-    protected static final FiniteDuration HEART_BEAT_INTERVAL =
+    public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 
@@ -51,7 +51,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
         // returns 2 times the heart beat interval
-        return HEART_BEAT_INTERVAL.$times(2);
+        return getHeartBeatInterval().$times(2);
     }
 
     @Override
index b7c8955..e6e160b 100644 (file)
@@ -84,6 +84,11 @@ public interface ReplicatedLog {
      */
     List<ReplicatedLogEntry> getFrom(long index);
 
+    /**
+     *
+     * @param index the index of the log entry
+     */
+    List<ReplicatedLogEntry> getFrom(long index, int max);
 
     /**
      *
index 2a44e8b..a506662 100644 (file)
@@ -310,7 +310,7 @@ public class Leader extends AbstractRaftActorBehavior {
                     // that has fallen too far behind with the log but yet is not
                     // eligible to receive a snapshot
                     entries =
-                        context.getReplicatedLog().getFrom(nextIndex);
+                        context.getReplicatedLog().getFrom(nextIndex, 1);
                 }
 
                 followerActor.tell(
index ae8e525..9136658 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +32,12 @@ public class AbstractReplicatedLogImplTest {
     @Before
     public void setUp() {
         replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+        // create a set of initial entries in the in-memory log
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
+
     }
 
     @After
@@ -43,11 +50,6 @@ public class AbstractReplicatedLogImplTest {
 
     @Test
     public void testIndexOperations() {
-        // create a set of initial entries in the in-memory log
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
-        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
 
         // check if the values returned are correct, with snapshotIndex = -1
         assertEquals("B", replicatedLogImpl.get(1).getData().toString());
@@ -112,6 +114,22 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testGetFromWithMax(){
+        List<ReplicatedLogEntry> from = replicatedLogImpl.getFrom(0, 1);
+        Assert.assertEquals(1, from.size());
+        Assert.assertEquals(1, from.get(0).getTerm());
+
+        from = replicatedLogImpl.getFrom(0, 20);
+        Assert.assertEquals(4, from.size());
+        Assert.assertEquals(2, from.get(3).getTerm());
+
+        from = replicatedLogImpl.getFrom(1, 2);
+        Assert.assertEquals(2, from.size());
+        Assert.assertEquals(1, from.get(1).getTerm());
+
+    }
+
     // create a snapshot for test
     public Map takeSnapshot(int numEntries) {
         Map map = new HashMap(numEntries);
index aa50fa7..70671a6 100644 (file)
@@ -248,6 +248,23 @@ public class MockRaftActorContext implements RaftActorContext {
             return entries;
         }
 
+        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
+            if(index >= log.size() || index < 0){
+                return Collections.EMPTY_LIST;
+            }
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            int maxIndex = (int) index + max;
+            if(maxIndex > log.size()){
+                maxIndex = log.size();
+            }
+
+            for(int i=(int) index ; i < maxIndex ; i++) {
+                entries.add(get(i));
+            }
+            return entries;
+
+        }
+
         @Override public long size() {
             return log.size();
         }
index c763683..d478b17 100644 (file)
@@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -80,12 +81,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     public void testThatAnElectionTimeoutIsTriggered(){
         new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
+            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
                 protected void run() {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
index c015d95..c5a81aa 100644 (file)
@@ -5,6 +5,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -41,12 +42,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testThatAnElectionTimeoutIsTriggered(){
         new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
+            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
                 protected void run() {
 
                     Follower follower = new Follower(createActorContext(getTestActor()));
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
index b98f7da..94f80f7 100644 (file)
@@ -15,6 +15,7 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -33,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -42,6 +45,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -58,6 +63,8 @@ import java.util.concurrent.Executors;
  */
 public class Shard extends RaftActor {
 
+    private static final ConfigParams configParams = new ShardConfigParams();
+
     public static final String DEFAULT_NAME = "default";
 
     private final ListeningExecutorService storeExecutor =
@@ -84,7 +91,7 @@ public class Shard extends RaftActor {
     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
 
     private Shard(String name, Map<String, String> peerAddresses) {
-        super(name, peerAddresses);
+        super(name, peerAddresses, Optional.of(configParams));
 
         this.name = name;
 
@@ -323,4 +330,14 @@ public class Shard extends RaftActor {
     @Override public String persistenceId() {
         return this.name;
     }
+
+
+    private static class ShardConfigParams extends DefaultConfigParamsImpl {
+        public static final FiniteDuration HEART_BEAT_INTERVAL =
+            new FiniteDuration(500, TimeUnit.MILLISECONDS);
+
+        @Override public FiniteDuration getHeartBeatInterval() {
+            return HEART_BEAT_INTERVAL;
+        }
+    }
 }
index 5622065..915b13d 100644 (file)
@@ -76,7 +76,6 @@ public class ThreePhaseCommitCohortProxy implements
                             CanCommitTransactionReply reply =
                                     CanCommitTransactionReply.fromSerializable(response);
                             if (!reply.getCanCommit()) {
-                                System.out.println("**TOM - failed: false");
                                 return false;
                             }
                         }
index df3c78e..6599bd8 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -35,6 +36,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Collections;
 
+import static junit.framework.Assert.assertEquals;
+
 public class BasicIntegrationTest extends AbstractActorTest {
 
     @Test
@@ -61,17 +64,24 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         getRef());
 
 
-                    // Wait for Shard to become a Leader
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+                    // Wait for a specific log message to show up
+                    final boolean result =
+                        new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                        ) {
+                            protected Boolean run() {
+                                return true;
+                            }
+                        }.from(shard.path().toString())
+                            .message("Switching from state Candidate to Leader")
+                            .occurrences(1).exec();
+
+                    assertEquals(true, result);
+
                     // 1. Create a TransactionChain
                     shard.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>("CreateTransactionChainReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionChainReply") {
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
                                     ActorPath transactionChainPath =
@@ -93,7 +103,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
 
                     final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>("CreateTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
                                     CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
@@ -115,7 +125,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                         getRef());
 
-                    Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
+                    Boolean writeDone = new ExpectMsg<Boolean>(duration("1 seconds"), "WriteDataReply") {
                         protected Boolean match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return true;
@@ -134,7 +144,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>("ReadyTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "ReadyTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                     ActorPath cohortPath =
@@ -157,7 +167,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
 
                     Boolean preCommitDone =
-                        new ExpectMsg<Boolean>("PreCommitTransactionReply") {
+                        new ExpectMsg<Boolean>(duration("1 seconds"), "PreCommitTransactionReply") {
                             protected Boolean match(Object in) {
                                 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
                                     return true;
index 8413bac..9202485 100644 (file)
@@ -41,7 +41,7 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
 
           subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
index 0a0c04b..fc527b6 100644 (file)
@@ -1,11 +1,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -20,19 +21,29 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
 
-public class DistributedDataStoreIntegrationTest{
+public class DistributedDataStoreIntegrationTest {
 
     private static ActorSystem system;
 
     @Before
-    public void setUp() {
+    public void setUp() throws IOException {
+        File journal = new File("journal");
+
+        if(journal.exists()) {
+            FileUtils.deleteDirectory(journal);
+        }
+
+
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
     }
@@ -49,82 +60,153 @@ public class DistributedDataStoreIntegrationTest{
 
     @Test
     public void integrationTest() throws Exception {
-        Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
         ShardStrategyFactory.setConfiguration(configuration);
-        DistributedDataStore distributedDataStore =
-            new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
 
-        distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
-        Thread.sleep(1500);
 
-        DOMStoreReadWriteTransaction transaction =
-            distributedDataStore.newReadWriteTransaction();
+        new JavaTestKit(getSystem()) {
+            {
+
+                new Within(duration("10 seconds")) {
+                    protected void run() {
+                        try {
+                            final DistributedDataStore distributedDataStore =
+                                new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+
+                            distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+                            // Wait for a specific log message to show up
+                            final boolean result =
+                                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                                    ) {
+                                    protected Boolean run() {
+                                        return true;
+                                    }
+                                }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+                                    .message("Switching from state Candidate to Leader")
+                                    .occurrences(1).exec();
+
+                            assertEquals(true, result);
+
+                            DOMStoreReadWriteTransaction transaction =
+                                distributedDataStore.newReadWriteTransaction();
 
-        transaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                            transaction
+                                .write(TestModel.TEST_PATH, ImmutableNodes
+                                    .containerNode(TestModel.TEST_QNAME));
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> future =
-            transaction.read(TestModel.TEST_PATH);
+                            ListenableFuture<Optional<NormalizedNode<?, ?>>>
+                                future =
+                                transaction.read(TestModel.TEST_PATH);
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
+                            Optional<NormalizedNode<?, ?>> optional =
+                                future.get();
 
-        Assert.assertTrue(optional.isPresent());
+                            Assert.assertTrue("Node not found", optional.isPresent());
 
-        NormalizedNode<?, ?> normalizedNode = optional.get();
+                            NormalizedNode<?, ?> normalizedNode =
+                                optional.get();
 
-        assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType());
+                            assertEquals(TestModel.TEST_QNAME,
+                                normalizedNode.getNodeType());
 
-        DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+                            DOMStoreThreePhaseCommitCohort ready =
+                                transaction.ready();
 
-        ListenableFuture<Boolean> canCommit = ready.canCommit();
+                            ListenableFuture<Boolean> canCommit =
+                                ready.canCommit();
 
-        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
-        ListenableFuture<Void> preCommit = ready.preCommit();
+                            ListenableFuture<Void> preCommit =
+                                ready.preCommit();
 
-        preCommit.get(5, TimeUnit.SECONDS);
+                            preCommit.get(5, TimeUnit.SECONDS);
 
-        ListenableFuture<Void> commit = ready.commit();
+                            ListenableFuture<Void> commit = ready.commit();
+
+                            commit.get(5, TimeUnit.SECONDS);
+                        } catch (ExecutionException | TimeoutException | InterruptedException e){
+                            fail(e.getMessage());
+                        }
+                    }
+                };
+            }
+        };
 
-        commit.get(5, TimeUnit.SECONDS);
     }
 
 
-    @Test
+    //FIXME : Disabling test because it's flaky
+    //@Test
     public void integrationTestWithMultiShardConfiguration()
         throws ExecutionException, InterruptedException, TimeoutException {
-        Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
 
         ShardStrategyFactory.setConfiguration(configuration);
-        DistributedDataStore distributedDataStore =
-            new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
 
+        new JavaTestKit(getSystem()) {
+            {
+
+                new Within(duration("10 seconds")) {
+                    protected void run() {
+                        try {
+                            final DistributedDataStore distributedDataStore =
+                                new DistributedDataStore(getSystem(), "config",
+                                    new MockClusterWrapper(), configuration);
+
+                            distributedDataStore.onGlobalContextUpdated(
+                                SchemaContextHelper.full());
+
+                            // Wait for a specific log message to show up
+                            final boolean result =
+                                new JavaTestKit.EventFilter<Boolean>(
+                                    Logging.Info.class
+                                ) {
+                                    protected Boolean run() {
+                                        return true;
+                                    }
+                                }.from(
+                                    "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
+                                    .message(
+                                        "Switching from state Candidate to Leader")
+                                    .occurrences(1)
+                                    .exec();
+
+                            Thread.sleep(1000);
+
+
+                            DOMStoreReadWriteTransaction transaction =
+                                distributedDataStore.newReadWriteTransaction();
 
-        distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
+                            transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+                            transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-        // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
-        // the SchemaContext in time. Is there any way we can make this deterministic?
-        Thread.sleep(2000);
+                            DOMStoreThreePhaseCommitCohort ready = transaction.ready();
 
-        DOMStoreReadWriteTransaction transaction =
-            distributedDataStore.newReadWriteTransaction();
+                            ListenableFuture<Boolean> canCommit = ready.canCommit();
 
-        transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
-        DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+                            ListenableFuture<Void> preCommit = ready.preCommit();
 
-        ListenableFuture<Boolean> canCommit = ready.canCommit();
+                            preCommit.get(5, TimeUnit.SECONDS);
 
-        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+                            ListenableFuture<Void> commit = ready.commit();
 
-        ListenableFuture<Void> preCommit = ready.preCommit();
+                            commit.get(5, TimeUnit.SECONDS);
 
-        preCommit.get(5, TimeUnit.SECONDS);
+                            assertEquals(true, result);
+                        } catch(ExecutionException | TimeoutException | InterruptedException e){
+                            fail(e.getMessage());
+                        }
+                    }
+                };
+            }
+        };
 
-        ListenableFuture<Void> commit = ready.commit();
 
-        commit.get(5, TimeUnit.SECONDS);
     }
 
 }
index 268ed3c..e9ad450 100644 (file)
@@ -75,7 +75,7 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
                     expectNoMsg();
                 }
@@ -170,7 +170,7 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("primary found") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
@@ -208,13 +208,13 @@ public class ShardManagerTest {
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
                     MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
 
                     subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-                    expectMsgClass(PrimaryNotFound.SERIALIZABLE_CLASS);
+                    expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
 
                     expectNoMsg();
                 }
index 32d90d0..431a266 100644 (file)
@@ -2,7 +2,9 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
@@ -38,19 +40,25 @@ public class ShardTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
 
-            // Wait for Shard to become a Leader
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                ) {
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message("Switching from state Candidate to Leader")
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
@@ -107,7 +115,7 @@ public class ShardTest extends AbstractActorTest {
 
                     assertFalse(notificationEnabled);
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
@@ -138,13 +146,18 @@ public class ShardTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testCreateTransaction");
 
 
-            // Wait for Shard to become a Leader
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                ) {
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message("Switching from state Candidate to Leader")
+                    .occurrences(1).exec();
 
+            Assert.assertEquals(true, result);
 
             new Within(duration("1 seconds")) {
                 protected void run() {
@@ -156,7 +169,7 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in instanceof CreateTransactionReply) {
index 57d0bd6..b35880a 100644 (file)
@@ -35,7 +35,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
           subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -70,7 +70,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
           subject.tell(new CloseTransactionChain().toSerializable(), getRef());
 
-          final String out = new ExpectMsg<String>("match hint") {
+          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
               if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
index f15e3bf..632ecc2 100644 (file)
@@ -65,7 +65,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
@@ -105,7 +105,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         new ReadData(TestModel.TEST_PATH).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
@@ -141,7 +141,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                             getRef());
 
                     final CompositeModification compositeModification =
-                        new ExpectMsg<CompositeModification>("match hint") {
+                        new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
                             protected CompositeModification match(Object in) {
                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
@@ -180,7 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
@@ -255,7 +255,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
@@ -292,7 +292,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
@@ -330,7 +330,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     subject.tell(new CloseTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>("match hint") {
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
@@ -343,7 +343,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     assertEquals("match", out);
 
-                    final String termination = new ExpectMsg<String>("match hint") {
+                    final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in instanceof Terminated) {
index aebff27..eda1c30 100644 (file)
@@ -1,4 +1,5 @@
 akka {
+    loggers = [akka.testkit.TestEventListener]
     actor {
          serializers {
                   java = "akka.serialization.JavaSerializer"