From: Moiz Raja Date: Wed, 6 Aug 2014 11:28:43 +0000 (-0700) Subject: Tune replication and stabilize tests X-Git-Tag: release/helium~331 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4caeacba93677c05dd79bc4cb7058f021fa1e88b Tune replication and stabilize tests Made following changes for replication - Increased Heartbeat timeout to 500 milliseconds. - Send only one entry from the replicated log to the follower in append entries Both of these tweaks have been made to prevent election timeouts and frequent switching of leaders Changes to tests - Added a duration when constructing an ExpectMsg. This prevents ExpectMsg from waiting forever when and expected event does not occur - Removed all Thread.sleep from the tests and replace them with waiting for a specific LogEvent this is a more deterministic. Change-Id: Ie9ce0c9c73bf1b170a78879b1e2dab76f1de64df Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 24bfa3de21..b5b034afb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -100,16 +100,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public List getFrom(long logEntryIndex) { + return getFrom(logEntryIndex, journal.size()); + } + + @Override + public List getFrom(long logEntryIndex, int max) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); List entries = new ArrayList<>(100); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - entries.addAll(journal.subList(adjustedIndex, size)); + int maxIndex = adjustedIndex + max; + if(maxIndex > size){ + maxIndex = size; + } + entries.addAll(journal.subList(adjustedIndex, maxIndex)); } return entries; } + @Override public long size() { return journal.size(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index c633337226..6432fa4811 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -33,7 +33,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { * Since this is set to 100 milliseconds the Election timeout should be * at least 200 milliseconds */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = + public static final FiniteDuration HEART_BEAT_INTERVAL = new FiniteDuration(100, TimeUnit.MILLISECONDS); @@ -51,7 +51,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { @Override public FiniteDuration getElectionTimeOutInterval() { // returns 2 times the heart beat interval - return HEART_BEAT_INTERVAL.$times(2); + return getHeartBeatInterval().$times(2); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index b7c8955aad..e6e160bc02 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -84,6 +84,11 @@ public interface ReplicatedLog { */ List getFrom(long index); + /** + * + * @param index the index of the log entry + */ + List getFrom(long index, int max); /** * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 2a44e8b7a5..a50666233c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -310,7 +310,7 @@ public class Leader extends AbstractRaftActorBehavior { // that has fallen too far behind with the log but yet is not // eligible to receive a snapshot entries = - context.getReplicatedLog().getFrom(nextIndex); + context.getReplicatedLog().getFrom(nextIndex, 1); } followerActor.tell( diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index ae8e525233..913665861d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import junit.framework.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,6 +32,12 @@ public class AbstractReplicatedLogImplTest { @Before public void setUp() { replicatedLogImpl = new MockAbstractReplicatedLogImpl(); + // create a set of initial entries in the in-memory log + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D"))); + } @After @@ -43,11 +50,6 @@ public class AbstractReplicatedLogImplTest { @Test public void testIndexOperations() { - // create a set of initial entries in the in-memory log - replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A"))); - replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B"))); - replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C"))); - replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D"))); // check if the values returned are correct, with snapshotIndex = -1 assertEquals("B", replicatedLogImpl.get(1).getData().toString()); @@ -112,6 +114,22 @@ public class AbstractReplicatedLogImplTest { } + @Test + public void testGetFromWithMax(){ + List from = replicatedLogImpl.getFrom(0, 1); + Assert.assertEquals(1, from.size()); + Assert.assertEquals(1, from.get(0).getTerm()); + + from = replicatedLogImpl.getFrom(0, 20); + Assert.assertEquals(4, from.size()); + Assert.assertEquals(2, from.get(3).getTerm()); + + from = replicatedLogImpl.getFrom(1, 2); + Assert.assertEquals(2, from.size()); + Assert.assertEquals(1, from.get(1).getTerm()); + + } + // create a snapshot for test public Map takeSnapshot(int numEntries) { Map map = new HashMap(numEntries); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index aa50fa7442..70671a6a21 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -248,6 +248,23 @@ public class MockRaftActorContext implements RaftActorContext { return entries; } + @Override public List getFrom(long index, int max) { + if(index >= log.size() || index < 0){ + return Collections.EMPTY_LIST; + } + List entries = new ArrayList<>(); + int maxIndex = (int) index + max; + if(maxIndex > log.size()){ + maxIndex = log.size(); + } + + for(int i=(int) index ; i < maxIndex ; i++) { + entries.add(get(i)); + } + return entries; + + } + @Override public long size() { return log.size(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index c763683705..d478b17555 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit; import junit.framework.Assert; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -80,12 +81,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { public void testThatAnElectionTimeoutIsTriggered(){ new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { protected void run() { Candidate candidate = new Candidate(createActorContext(getTestActor())); - final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { // do not put code outside this method, will run afterwards protected Boolean match(Object in) { if (in instanceof ElectionTimeout) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index c015d950c4..c5a81aa1c9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -5,6 +5,7 @@ import akka.actor.Props; import akka.testkit.JavaTestKit; import junit.framework.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -41,12 +42,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testThatAnElectionTimeoutIsTriggered(){ new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { protected void run() { Follower follower = new Follower(createActorContext(getTestActor())); - final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { // do not put code outside this method, will run afterwards protected Boolean match(Object in) { if (in instanceof ElectionTimeout) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b98f7da0fa..94f80f70e9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -15,6 +15,7 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.serialization.Serialization; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -33,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.raft.ConfigParams; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -42,6 +45,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.HashMap; @@ -49,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * A Shard represents a portion of the logical data tree
@@ -58,6 +63,8 @@ import java.util.concurrent.Executors; */ public class Shard extends RaftActor { + private static final ConfigParams configParams = new ShardConfigParams(); + public static final String DEFAULT_NAME = "default"; private final ListeningExecutorService storeExecutor = @@ -84,7 +91,7 @@ public class Shard extends RaftActor { private final List dataChangeListeners = new ArrayList<>(); private Shard(String name, Map peerAddresses) { - super(name, peerAddresses); + super(name, peerAddresses, Optional.of(configParams)); this.name = name; @@ -323,4 +330,14 @@ public class Shard extends RaftActor { @Override public String persistenceId() { return this.name; } + + + private static class ShardConfigParams extends DefaultConfigParamsImpl { + public static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(500, TimeUnit.MILLISECONDS); + + @Override public FiniteDuration getHeartBeatInterval() { + return HEART_BEAT_INTERVAL; + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 56220656ad..915b13dd8b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -76,7 +76,6 @@ public class ThreePhaseCommitCohortProxy implements CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response); if (!reply.getCanCommit()) { - System.out.println("**TOM - failed: false"); return false; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index df3c78ec97..6599bd8eeb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -12,6 +12,7 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.event.Logging; import akka.testkit.JavaTestKit; import junit.framework.Assert; import org.junit.Test; @@ -35,6 +36,8 @@ import scala.concurrent.duration.FiniteDuration; import java.util.Collections; +import static junit.framework.Assert.assertEquals; + public class BasicIntegrationTest extends AbstractActorTest { @Test @@ -61,17 +64,24 @@ public class BasicIntegrationTest extends AbstractActorTest { getRef()); - // Wait for Shard to become a Leader - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + protected Boolean run() { + return true; + } + }.from(shard.path().toString()) + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + + assertEquals(true, result); + // 1. Create a TransactionChain shard.tell(new CreateTransactionChain().toSerializable(), getRef()); final ActorSelection transactionChain = - new ExpectMsg("CreateTransactionChainReply") { + new ExpectMsg(duration("1 seconds"), "CreateTransactionChainReply") { protected ActorSelection match(Object in) { if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) { ActorPath transactionChainPath = @@ -93,7 +103,7 @@ public class BasicIntegrationTest extends AbstractActorTest { transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef()); final ActorSelection transaction = - new ExpectMsg("CreateTransactionReply") { + new ExpectMsg(duration("1 seconds"), "CreateTransactionReply") { protected ActorSelection match(Object in) { if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in); @@ -115,7 +125,7 @@ public class BasicIntegrationTest extends AbstractActorTest { ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), getRef()); - Boolean writeDone = new ExpectMsg("WriteDataReply") { + Boolean writeDone = new ExpectMsg(duration("1 seconds"), "WriteDataReply") { protected Boolean match(Object in) { if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) { return true; @@ -134,7 +144,7 @@ public class BasicIntegrationTest extends AbstractActorTest { transaction.tell(new ReadyTransaction().toSerializable(), getRef()); final ActorSelection cohort = - new ExpectMsg("ReadyTransactionReply") { + new ExpectMsg(duration("1 seconds"), "ReadyTransactionReply") { protected ActorSelection match(Object in) { if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { ActorPath cohortPath = @@ -157,7 +167,7 @@ public class BasicIntegrationTest extends AbstractActorTest { cohort.tell(new PreCommitTransaction().toSerializable(), getRef()); Boolean preCommitDone = - new ExpectMsg("PreCommitTransactionReply") { + new ExpectMsg(duration("1 seconds"), "PreCommitTransactionReply") { protected Boolean match(Object in) { if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) { return true; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java index 8413bac3a7..920248521a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java @@ -41,7 +41,7 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest { subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 0a0c04b915..fc527b6bff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,11 +1,12 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; +import akka.event.Logging; import akka.testkit.JavaTestKit; - import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import junit.framework.Assert; +import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -20,19 +21,29 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import java.io.File; +import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; -public class DistributedDataStoreIntegrationTest{ +public class DistributedDataStoreIntegrationTest { private static ActorSystem system; @Before - public void setUp() { + public void setUp() throws IOException { + File journal = new File("journal"); + + if(journal.exists()) { + FileUtils.deleteDirectory(journal); + } + + System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); } @@ -49,82 +60,153 @@ public class DistributedDataStoreIntegrationTest{ @Test public void integrationTest() throws Exception { - Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); + final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(configuration); - DistributedDataStore distributedDataStore = - new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration); - distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); - Thread.sleep(1500); - DOMStoreReadWriteTransaction transaction = - distributedDataStore.newReadWriteTransaction(); + new JavaTestKit(getSystem()) { + { + + new Within(duration("10 seconds")) { + protected void run() { + try { + final DistributedDataStore distributedDataStore = + new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration); + + distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); + + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + protected Boolean run() { + return true; + } + }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config") + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + + assertEquals(true, result); + + DOMStoreReadWriteTransaction transaction = + distributedDataStore.newReadWriteTransaction(); - transaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + transaction + .write(TestModel.TEST_PATH, ImmutableNodes + .containerNode(TestModel.TEST_QNAME)); - ListenableFuture>> future = - transaction.read(TestModel.TEST_PATH); + ListenableFuture>> + future = + transaction.read(TestModel.TEST_PATH); - Optional> optional = future.get(); + Optional> optional = + future.get(); - Assert.assertTrue(optional.isPresent()); + Assert.assertTrue("Node not found", optional.isPresent()); - NormalizedNode normalizedNode = optional.get(); + NormalizedNode normalizedNode = + optional.get(); - assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType()); + assertEquals(TestModel.TEST_QNAME, + normalizedNode.getNodeType()); - DOMStoreThreePhaseCommitCohort ready = transaction.ready(); + DOMStoreThreePhaseCommitCohort ready = + transaction.ready(); - ListenableFuture canCommit = ready.canCommit(); + ListenableFuture canCommit = + ready.canCommit(); - assertTrue(canCommit.get(5, TimeUnit.SECONDS)); + assertTrue(canCommit.get(5, TimeUnit.SECONDS)); - ListenableFuture preCommit = ready.preCommit(); + ListenableFuture preCommit = + ready.preCommit(); - preCommit.get(5, TimeUnit.SECONDS); + preCommit.get(5, TimeUnit.SECONDS); - ListenableFuture commit = ready.commit(); + ListenableFuture commit = ready.commit(); + + commit.get(5, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException | InterruptedException e){ + fail(e.getMessage()); + } + } + }; + } + }; - commit.get(5, TimeUnit.SECONDS); } - @Test + //FIXME : Disabling test because it's flaky + //@Test public void integrationTestWithMultiShardConfiguration() throws ExecutionException, InterruptedException, TimeoutException { - Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); + final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(configuration); - DistributedDataStore distributedDataStore = - new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration); + new JavaTestKit(getSystem()) { + { + + new Within(duration("10 seconds")) { + protected void run() { + try { + final DistributedDataStore distributedDataStore = + new DistributedDataStore(getSystem(), "config", + new MockClusterWrapper(), configuration); + + distributedDataStore.onGlobalContextUpdated( + SchemaContextHelper.full()); + + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter( + Logging.Info.class + ) { + protected Boolean run() { + return true; + } + }.from( + "akka://test/user/shardmanager-config/member-1-shard-cars-1-config") + .message( + "Switching from state Candidate to Leader") + .occurrences(1) + .exec(); + + Thread.sleep(1000); + + + DOMStoreReadWriteTransaction transaction = + distributedDataStore.newReadWriteTransaction(); - distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full()); + transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - // This sleep is fragile - test can fail intermittently if all Shards aren't updated with - // the SchemaContext in time. Is there any way we can make this deterministic? - Thread.sleep(2000); + DOMStoreThreePhaseCommitCohort ready = transaction.ready(); - DOMStoreReadWriteTransaction transaction = - distributedDataStore.newReadWriteTransaction(); + ListenableFuture canCommit = ready.canCommit(); - transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + assertTrue(canCommit.get(5, TimeUnit.SECONDS)); - DOMStoreThreePhaseCommitCohort ready = transaction.ready(); + ListenableFuture preCommit = ready.preCommit(); - ListenableFuture canCommit = ready.canCommit(); + preCommit.get(5, TimeUnit.SECONDS); - assertTrue(canCommit.get(5, TimeUnit.SECONDS)); + ListenableFuture commit = ready.commit(); - ListenableFuture preCommit = ready.preCommit(); + commit.get(5, TimeUnit.SECONDS); - preCommit.get(5, TimeUnit.SECONDS); + assertEquals(true, result); + } catch(ExecutionException | TimeoutException | InterruptedException e){ + fail(e.getMessage()); + } + } + }; + } + }; - ListenableFuture commit = ready.commit(); - commit.get(5, TimeUnit.SECONDS); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 268ed3c273..e9ad450ed8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -75,7 +75,7 @@ public class ShardManagerTest { subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); - expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS); expectNoMsg(); } @@ -170,7 +170,7 @@ public class ShardManagerTest { subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - final String out = new ExpectMsg("primary found") { + final String out = new ExpectMsg(duration("1 seconds"), "primary found") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { @@ -208,13 +208,13 @@ public class ShardManagerTest { subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS); MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString()); subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); - expectMsgClass(PrimaryNotFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); expectNoMsg(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 32d90d0ef7..431a266b14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -2,7 +2,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; +import akka.event.Logging; import akka.testkit.JavaTestKit; +import junit.framework.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; @@ -38,19 +40,25 @@ public class ShardTest extends AbstractActorTest { getSystem().actorOf(props, "testCreateTransactionChain"); - // Wait for Shard to become a Leader - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + + Assert.assertEquals(true, result); new Within(duration("1 seconds")) { protected void run() { subject.tell(new CreateTransactionChain().toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){ @@ -107,7 +115,7 @@ public class ShardTest extends AbstractActorTest { assertFalse(notificationEnabled); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(RegisterChangeListenerReply.class)) { @@ -138,13 +146,18 @@ public class ShardTest extends AbstractActorTest { getSystem().actorOf(props, "testCreateTransaction"); - // Wait for Shard to become a Leader - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + Assert.assertEquals(true, result); new Within(duration("1 seconds")) { protected void run() { @@ -156,7 +169,7 @@ public class ShardTest extends AbstractActorTest { subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in instanceof CreateTransactionReply) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java index 57d0bd6aa9..b35880a6a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java @@ -35,7 +35,7 @@ public class ShardTransactionChainTest extends AbstractActorTest { subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -70,7 +70,7 @@ public class ShardTransactionChainTest extends AbstractActorTest { subject.tell(new CloseTransactionChain().toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index f15e3bff06..632ecc29cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -65,7 +65,7 @@ public class ShardTransactionTest extends AbstractActorTest { new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { @@ -105,7 +105,7 @@ public class ShardTransactionTest extends AbstractActorTest { new ReadData(TestModel.TEST_PATH).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { @@ -141,7 +141,7 @@ public class ShardTransactionTest extends AbstractActorTest { getRef()); final CompositeModification compositeModification = - new ExpectMsg("match hint") { + new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected CompositeModification match(Object in) { if (in instanceof ShardTransaction.GetCompositeModificationReply) { @@ -180,7 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest { ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) { @@ -255,7 +255,7 @@ public class ShardTransactionTest extends AbstractActorTest { subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) { @@ -292,7 +292,7 @@ public class ShardTransactionTest extends AbstractActorTest { subject.tell(new ReadyTransaction().toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { @@ -330,7 +330,7 @@ public class ShardTransactionTest extends AbstractActorTest { subject.tell(new CloseTransaction().toSerializable(), getRef()); - final String out = new ExpectMsg("match hint") { + final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) { @@ -343,7 +343,7 @@ public class ShardTransactionTest extends AbstractActorTest { assertEquals("match", out); - final String termination = new ExpectMsg("match hint") { + final String termination = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards protected String match(Object in) { if (in instanceof Terminated) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index aebff27c7d..eda1c304e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,4 +1,5 @@ akka { + loggers = [akka.testkit.TestEventListener] actor { serializers { java = "akka.serialization.JavaSerializer"