import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
// it does do a persist)
return new Shard(newShardBuilder()) {
@Override
- public void onReceiveCommand(final Object message) throws Exception {
+ public void handleCommand(final Object message) {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Got the first ElectionTimeout. We don't forward it to the
// base Shard yet until we've sent the RegisterChangeListener
onFirstElectionTimeout.countDown();
} else {
- super.onReceiveCommand(message);
+ super.handleCommand(message);
}
}
};
public Shard create() throws Exception {
return new Shard(newShardBuilder()) {
@Override
- public void onReceiveCommand(final Object message) throws Exception {
+ public void handleCommand(final Object message) {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
onFirstElectionTimeout.countDown();
} else {
- super.onReceiveCommand(message);
+ super.handleCommand(message);
}
}
};
}};
}
- @SuppressWarnings("serial")
@Test
public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
- class TestShard extends Shard {
- TestShard() {
- super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
- schemaContext(SCHEMA_CONTEXT));
- }
-
- String getPeerAddress(String id) {
- return getRaftActorContext().getPeerAddress(id);
- }
-
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- }
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(new Creator<Shard>() {
- @Override
- public TestShard create() throws Exception {
- return new TestShard();
- }
- })), "testPeerAddressResolved");
-
- assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ ShardIdentifier peerID = ShardIdentifier.builder().memberName("member-2")
+ .shardName("inventory").type("config").build();
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
+ peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
final String address = "akka://foobar";
- shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
+ shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
- assertEquals("getPeerAddress", address,
- ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
+ shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
+ OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
+ assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
}};
}
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
payloadForModification(source, writeMod)));
- shard.underlyingActor().onReceiveCommand(applyState);
+ shard.tell(applyState, shard);
+
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
- final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
- assertEquals("Applied state", node, actual);
+ final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+ if(actual != null) {
+ assertEquals("Applied state", node, actual);
+ return;
+ }
+ }
+
+ fail("State was not applied");
}
@Test
}
@Test
- public void testReadyWithImmediateCommit() throws Exception{
+ public void testReadyWithReadWriteImmediateCommit() throws Exception{
testReadyWithImmediateCommit(true);
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
testReadyWithImmediateCommit(false);
}
}
@Test
- public void testCommitWithPersistenceDisabled() throws Throwable {
+ public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
+ testCommitWithPersistenceDisabled(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
testCommitWithPersistenceDisabled(true);
- testCommitWithPersistenceDisabled(false);
}
private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
}
@Test
- public void testCommitWhenTransactionHasNoModifications() {
+ public void testReadWriteCommitWhenTransactionHasNoModifications() {
testCommitWhenTransactionHasNoModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
testCommitWhenTransactionHasNoModifications(false);
}
}
@Test
- public void testCommitWhenTransactionHasModifications() {
+ public void testReadWriteCommitWhenTransactionHasModifications() {
testCommitWhenTransactionHasModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasModifications() {
testCommitWhenTransactionHasModifications(false);
}
final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+ doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
final String transactionID2 = "tx2";
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
public Shard create() throws Exception {
return new Shard(newShardBuilder()) {
@Override
- public void onReceiveCommand(final Object message) throws Exception {
- super.onReceiveCommand(message);
- if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ public void handleCommand(final Object message) {
+ super.handleCommand(message);
+ if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
if(cleaupCheckLatch.get() != null) {
cleaupCheckLatch.get().countDown();
}
public void handleCommand(final Object message) {
super.handleCommand(message);
- if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
+ if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
latch.get().countDown();
}
}
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
- shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+ shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
- shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+ shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
}
"akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
- leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
"akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
- leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);