import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+ shardBatchedModificationCount(5);
String peerId = actorFactory.generateActorId("leader");
TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
MockLeader leader = peer.underlyingActor();
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
leader.modificationsReceived, 5, TimeUnit.SECONDS));
- verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
+ LOCAL_MEMBER_NAME);
+
+ shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
- leader.modificationsReceived = new CountDownLatch(2);
+ // Test with initial commit timeout and subsequent retry.
+
+ leader.modificationsReceived = new CountDownLatch(1);
leader.sendReply = false;
shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
leader.modificationsReceived, 5, TimeUnit.SECONDS));
- verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+ verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
+ LOCAL_MEMBER_NAME);
+
+ // Send a bunch of registration messages quickly and verify.
+
+ int max = 100;
+ leader.delay = 4;
+ leader.modificationsReceived = new CountDownLatch(max);
+ List<YangInstanceIdentifier> entityIds = new ArrayList<>();
+ for(int i = 1; i <= max; i++) {
+ YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
+ entityIds.add(id);
+ shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
+ }
+
+ assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+ leader.modificationsReceived, 10, TimeUnit.SECONDS));
+
+ // Sleep a little to ensure no additional BatchedModifications are received.
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+ List<Modification> receivedMods = leader.getAndClearReceivedModifications();
+ for(int i = 0; i < max; i++) {
+ verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
+ }
+
+ assertEquals("# modifications received", max, receivedMods.size());
}
private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
}
- private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+ private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
YangInstanceIdentifier entityId, String candidateName) throws Exception {
- assertEquals("BatchedModifications size", 1, mods.getModifications().size());
- assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
- verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+ assertEquals("BatchedModifications size", 1, mods.size());
+ verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
+ }
+
+ private void verifyBatchedEntityCandidate(Modification mod, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) throws Exception {
+ assertEquals("Modification type", MergeModification.class, mod.getClass());
+ verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
entityId, candidateName);
}
public static class MockLeader extends UntypedActor {
volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
- volatile BatchedModifications receivedModifications;
+ List<Modification> receivedModifications = new ArrayList<>();
volatile boolean sendReply = true;
+ volatile long delay;
@Override
public void onReceive(Object message) {
if(message instanceof BatchedModifications) {
- receivedModifications = (BatchedModifications) message;
- modificationsReceived.countDown();
+ if(delay > 0) {
+ Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
+ }
+
if(sendReply) {
+ BatchedModifications mods = (BatchedModifications) message;
+ synchronized (receivedModifications) {
+ for(int i = 0; i < mods.getModifications().size(); i++) {
+ receivedModifications.add(mods.getModifications().get(i));
+ modificationsReceived.countDown();
+ }
+ }
+
getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
sendReply = true;
}
}
}
+
+ List<Modification> getAndClearReceivedModifications() {
+ synchronized (receivedModifications) {
+ List<Modification> ret = new ArrayList<>(receivedModifications);
+ receivedModifications.clear();
+ return ret;
+ }
+ }
}
}