import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
}};
}
- @SuppressWarnings("serial")
@Test
public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
- class TestShard extends Shard {
- TestShard() {
- super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
- schemaContext(SCHEMA_CONTEXT));
- }
-
- String getPeerAddress(String id) {
- return getRaftActorContext().getPeerAddress(id);
- }
-
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- }
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(new Creator<Shard>() {
- @Override
- public TestShard create() throws Exception {
- return new TestShard();
- }
- })), "testPeerAddressResolved");
-
- assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ ShardIdentifier peerID = ShardIdentifier.builder().memberName("member-2")
+ .shardName("inventory").type("config").build();
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
+ peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
final String address = "akka://foobar";
- shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
+ shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
- assertEquals("getPeerAddress", address,
- ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
+ shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
+ OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
+ assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
}};
}
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
payloadForModification(source, writeMod)));
- shard.underlyingActor().onReceiveCommand(applyState);
+ shard.tell(applyState, shard);
+
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
+
+ final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+ if(actual != null) {
+ assertEquals("Applied state", node, actual);
+ return;
+ }
+ }
- final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
- assertEquals("Applied state", node, actual);
+ fail("State was not applied");
}
@Test