import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+import java.io.File;
+import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opendaylight.yangtools.util.AbstractStringIdentifier;
-import java.io.File;
-import java.io.IOException;
public abstract class AbstractActorTest {
protected static final class MockIdentifier extends AbstractStringIdentifier<MockIdentifier> {
private static ActorSystem system;
@BeforeClass
- public static void setUpClass() throws Exception{
+ public static void setUpClass() throws Exception {
deleteJournal();
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
}
@AfterClass
- public static void tearDownClass() throws Exception{
+ public static void tearDownClass() throws Exception {
deleteJournal();
JavaTestKit.shutdownActorSystem(system);
system = null;
protected static void deleteJournal() throws IOException {
File journal = new File("journal");
- if(journal.exists()) {
+ if (journal.exists()) {
FileUtils.deleteDirectory(journal);
}
}
import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+
import akka.actor.ActorRef;
import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
@Override
public void handleCommand(Object message) {
- if(message instanceof MockPayload) {
- MockPayload payload = (MockPayload)message;
+ if (message instanceof MockPayload) {
+ MockPayload payload = (MockPayload) message;
super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload);
return;
}
- if(message instanceof ServerConfigurationPayload) {
- super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload)message);
+ if (message instanceof ServerConfigurationPayload) {
+ super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message);
return;
}
- if(message instanceof SetPeerAddress) {
+ if (message instanceof SetPeerAddress) {
setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
((SetPeerAddress) message).getPeerAddress());
return;
try {
Predicate drop = dropMessages.get(message.getClass());
- if(drop == null || !drop.test(message)) {
+ if (drop == null || !drop.test(message)) {
super.handleCommand(message);
}
} finally {
- if(!(message instanceof SendHeartBeat)) {
+ if (!(message instanceof SendHeartBeat)) {
try {
collectorActor.tell(message, ActorRef.noSender());
} catch (Exception e) {
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void createSnapshot(ActorRef actorRef) {
try {
actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
} catch (Exception e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
private TestActorRef<MessageCollectorActor> collectorActor;
- public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
- this.collectorActor = collectorActor;
+ public Builder collectorActor(TestActorRef<MessageCollectorActor> newCollectorActor) {
+ this.collectorActor = newCollectorActor;
return this;
}
RaftActorTestKit.waitUntilLeader(actorRef);
}
- protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
+ protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> newPeerAddresses,
ConfigParams configParams) {
- return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
- Collections.<String, String>emptyMap()).config(configParams));
+ return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(newPeerAddresses != null
+ ? newPeerAddresses : Collections.<String, String>emptyMap()).config(configParams));
}
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
factory.generateActorId(id + "-collector"))).id(id);
InvalidActorNameException lastEx = null;
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
try {
return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
} catch (InvalidActorNameException e) {
throw lastEx;
}
- protected void killActor(TestActorRef<TestRaftActor> leaderActor) {
+ protected void killActor(TestActorRef<TestRaftActor> actor) {
JavaTestKit testkit = new JavaTestKit(getSystem());
- testkit.watch(leaderActor);
+ testkit.watch(actor);
- leaderActor.tell(PoisonPill.getInstance(), null);
+ actor.tell(PoisonPill.getInstance(), null);
testkit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
- testkit.unwatch(leaderActor);
+ testkit.unwatch(actor);
}
protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) {
- MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex);
+ MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class,
+ msg -> msg.getToIndex() == expIndex);
}
@SuppressWarnings("unchecked")
List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
actualState), expSnapshotState.size(), actualState.size());
- for(int i = 0; i < expSnapshotState.size(); i++) {
+ for (int i = 0; i < expSnapshotState.size(); i++) {
assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
}
}
protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
List<ReplicatedLogEntry> journal = InMemoryJournal.get(persistenceId, ReplicatedLogEntry.class);
assertEquals("Journal ReplicatedLogEntry count", expJournal.size(), journal.size());
- for(int i = 0; i < expJournal.size(); i++) {
+ for (int i = 0; i < expJournal.size(); i++) {
ReplicatedLogEntry expected = expJournal.get(i);
ReplicatedLogEntry actual = journal.get(i);
verifyReplicatedLogEntry(expected, actual.getTerm(), actual.getIndex(), actual.getData());
}
}
- protected MockPayload sendPayloadData(ActorRef leaderActor, String data) {
- return sendPayloadData(leaderActor, data, 0);
+ protected MockPayload sendPayloadData(ActorRef actor, String data) {
+ return sendPayloadData(actor, data, 0);
}
- protected MockPayload sendPayloadData(ActorRef leaderActor, String data, int size) {
+ protected MockPayload sendPayloadData(ActorRef actor, String data, int size) {
MockPayload payload;
- if(size > 0) {
+ if (size > 0) {
payload = new MockPayload(data, size);
} else {
payload = new MockPayload(data);
}
- leaderActor.tell(payload, ActorRef.noSender());
+ actor.tell(payload, ActorRef.noSender());
return payload;
}
assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
}
- protected String testActorPath(String id){
+ protected String testActorPath(String id) {
return factory.createTestActorPath(id);
}
actor.getCurrentBehavior().getReplicatedToAllIndex());
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
static void verifyRaftState(ActorRef raftActor, Consumer<OnDemandRaftState> verifier) {
Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
AssertionError lastError = null;
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
try {
OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+
import akka.japi.Procedure;
import java.util.HashMap;
import java.util.List;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
/**
-*
+* Unit tests for AbstractReplicatedLogImplTest.
*/
public class AbstractReplicatedLogImplTest {
long lastIndex = 0;
long lastTerm = 0;
- for(int i = 0; i < numEntries; i++) {
+ for (int i = 0; i < numEntries; i++) {
ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
map.put(entry.getIndex(), entry.getData().toString());
lastIndex = entry.getIndex();
return map;
}
- class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl {
- @Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
- }
+ class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl {
@Override
public boolean removeFromAndPersist(final long index) {
return true;
public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
}
+ @Override
+ public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+ }
+
@Override
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
public class DefaultConfigParamsImplTest {
@Test
- public void testGetRaftPolicyWithDefault(){
+ public void testGetRaftPolicyWithDefault() {
DefaultConfigParamsImpl params = new DefaultConfigParamsImpl();
assertEquals("Default instance", DefaultRaftPolicy.INSTANCE, params.getRaftPolicy());
}
@Test
- public void testGetRaftPolicyInvalidClassName(){
+ public void testGetRaftPolicyInvalidClassName() {
DefaultConfigParamsImpl params = new DefaultConfigParamsImpl();
params.setCustomRaftPolicyImplementationClass("foobar");
}
@Test
- public void testGetRaftPolicyValidClassNameButInvalidType(){
+ public void testGetRaftPolicyValidClassNameButInvalidType() {
DefaultConfigParamsImpl params = new DefaultConfigParamsImpl();
params.setCustomRaftPolicyImplementationClass("java.lang.String");
}
@Test
- public void testGetRaftPolicyValidClass(){
+ public void testGetRaftPolicyValidClass() {
DefaultConfigParamsImpl params1 = new DefaultConfigParamsImpl();
- params1.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.TestRaftPolicy");
+ params1.setCustomRaftPolicyImplementationClass(
+ "org.opendaylight.controller.cluster.raft.policy.TestRaftPolicy");
RaftPolicy behavior1 = params1.getRaftPolicy();
assertEquals("TestCustomBehavior", TestRaftPolicy.class, behavior1.getClass());
DefaultConfigParamsImpl params2 = new DefaultConfigParamsImpl();
RaftPolicy behavior2 = params2.getRaftPolicy();
- params1.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.TestRaftPolicy");
+ params1.setCustomRaftPolicyImplementationClass(
+ "org.opendaylight.controller.cluster.raft.policy.TestRaftPolicy");
assertEquals("Default instance", DefaultRaftPolicy.INSTANCE, behavior2);
assertEquals("Default instance", DefaultRaftPolicy.INSTANCE, params2.getRaftPolicy());
}
-}
\ No newline at end of file
+}
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.verify;
+
import akka.japi.Procedure;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
}
@Test
- public void testOkToReplicate(){
+ public void testOkToReplicate() {
MockRaftActorContext context = new MockRaftActorContext();
context.setCommitIndex(0);
FollowerLogInformation followerLogInformation =
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
-import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
-import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
import akka.actor.Actor;
import akka.actor.ActorRef;
// Send an initial payloads and verify replication.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
- MockPayload payload1 = sendPayloadData(leaderActor, "one");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload1 = sendPayloadData(leaderActor, "one");
verifyApplyJournalEntries(leaderCollectorActor, 1);
verifyApplyJournalEntries(follower1CollectorActor, 1);
verifyApplyJournalEntries(follower2CollectorActor, 1);
testLog.info("Sending payload to isolated leader");
- MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
// Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
// is collected but not forwarded to the follower RaftActor.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
- // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
+ // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
+ // and committed.
testLog.info("Sending payload to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
verifyApplyJournalEntries(follower1CollectorActor, 2);
verifyApplyJournalEntries(follower2CollectorActor, 2);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's index 1 entry.
// Submit an initial payload that is committed/applied on all nodes.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
verifyApplyJournalEntries(leaderCollectorActor, 0);
verifyApplyJournalEntries(follower1CollectorActor, 0);
verifyApplyJournalEntries(follower2CollectorActor, 0);
// message is forwarded to the followers.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
verifyApplyJournalEntries(leaderCollectorActor, 1);
testLog.info("Sending payload to isolated leader");
- MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
// Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
// is collected but not forwarded to the follower RaftActor.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
testLog.info("Sending payload to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
verifyApplyJournalEntries(follower1CollectorActor, 3);
verifyApplyJournalEntries(follower2CollectorActor, 3);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's entry.
// Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
- for(ApplyState as: applyState) {
- if(as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
+ for (ApplyState as: applyState) {
+ if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
fail("Got unexpected ApplyState: " + as);
}
}
// Submit an initial payload that is committed/applied on all nodes.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
verifyApplyJournalEntries(leaderCollectorActor, 0);
verifyApplyJournalEntries(follower1CollectorActor, 0);
verifyApplyJournalEntries(follower2CollectorActor, 0);
// message is forwarded to the followers.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
verifyApplyJournalEntries(leaderCollectorActor, 1);
// are collected but not forwarded to the follower RaftActor.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- for(ReplicatedLogEntry e: ae.getEntries()) {
- if(e.getIndex() == 4) {
+ for (ReplicatedLogEntry e: ae.getEntries()) {
+ if (e.getIndex() == 4) {
return true;
}
}
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
testLog.info("Sending 3 payloads to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
- MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
- MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
+ final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
verifyApplyJournalEntries(follower1CollectorActor, 5);
verifyApplyJournalEntries(follower2CollectorActor, 5);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has conflicting log entries starting at index 2 with different terms which should get
// replaced by the new leader's entries.
// Ensure the prior leader didn't apply any of its conflicting entries with term 1.
List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
- for(ApplyState as: applyState) {
- if(as.getReplicatedLogEntry().getTerm() == 1) {
+ for (ApplyState as: applyState) {
+ if (as.getReplicatedLogEntry().getTerm() == 1) {
fail("Got unexpected ApplyState: " + as);
}
}
follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
expectFirstMatching(follower1NotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Leader.name()));
+ rc -> rc.getNewRole().equals(RaftState.Leader.name()));
currentTerm = follower1Context.getTermInformation().getCurrentTerm();
}
leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
leaderActor.underlyingActor().startDropMessages(RequestVote.class);
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
- follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId));
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId));
clearMessages(follower1CollectorActor);
clearMessages(follower1NotifierActor);
followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
followerConfigParams.setElectionTimeoutFactor(1000);
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
- config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
+ .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), followerConfigParams);
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(leaderId + "-notifier"));
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
import static org.junit.Assert.assertNull;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
private ActorRef follower3CollectorActor;
@Test
- public void testLeaderTransferOnShutDown() throws Throwable {
+ public void testLeaderTransferOnShutDown() throws Exception {
testLog.info("testLeaderTransferOnShutDown starting");
createRaftActors();
testLog.info("sendShutDown for {} ending", actor.path());
}
- private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
+ private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
clearMessages(leaderNotifierActor);
clearMessages(follower3NotifierActor);
FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
- Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
assertNullLeaderIdChange(leaderNotifierActor);
assertNullLeaderIdChange(follower1NotifierActor);
factory.generateActorId(follower1Id + "-notifier"));
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
- follower3Id, testActorPath(follower3Id))).
- config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
+ follower3Id, testActorPath(follower3Id)))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(follower2Id + "-notifier"));
follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
- follower3Id, testActorPath(follower3Id))).
- config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
+ follower3Id, testActorPath(follower3Id)))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(follower3Id + "-notifier"));
follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
- follower2Id, follower2Actor.path().toString())).
- config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
+ follower2Id, follower2Actor.path().toString()))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).
- put(follower3Id, follower3Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString())
+ .put(follower3Id, follower3Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderConfigParams.setElectionTimeoutFactor(3);
leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(leaderId + "-notifier"));
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
}
@Test
- public void testLeaderTransferAborted() throws Throwable {
+ public void testLeaderTransferAborted() throws Exception {
testLog.info("testLeaderTransferAborted starting");
createRaftActors();
}
@Test
- public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
+ public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
/**
* Unit tests for migrated messages on recovery.
private TestActorFactory factory;
@Before
- public void setUp(){
+ public void setUp() {
factory = new TestActorFactory(getSystem());
}
factory.killActor(actor, new JavaTestKit(getSystem()));
- actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config).persistent(Optional.of(false)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
+ .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
mockRaftActor = actor.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
}
};
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
- config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
+ .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
- persistent, snapshot -> {
- assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
- assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
- assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
- new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
- });
+ persistent, snapshot -> {
+ assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+ assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+ assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
+ new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
+ });
return actor;
}
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
Consumer<Snapshot> snapshotVerifier) {
InMemorySnapshotStore.addSnapshotSavedLatch(id);
}
};
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
- config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
+ .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayInputStream;
import java.io.IOException;
this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
mock(RaftActorSnapshotCohort.class);
- if(builder.dataPersistenceProvider == null){
+ if (builder.dataPersistenceProvider == null) {
setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
} else {
setPersistence(builder.dataPersistenceProvider);
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
- public void waitUntilLeader(){
- for(int i = 0;i < 10; i++){
- if(isLeader()){
+ public void waitUntilLeader() {
+ for (int i = 0; i < 10; i++) {
+ if (isLeader()) {
break;
}
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
private void applySnapshotBytes(byte[] bytes) {
+ if (bytes.length == 0) {
+ return;
+ }
+
try {
Object data = toObject(bytes);
if (data instanceof List) {
state.clear();
state.addAll((List<?>) data);
}
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (ClassNotFoundException | IOException e) {
+ Throwables.propagate(e);
}
}
@Override
protected void handleCommand(final Object message) {
- if(message instanceof RaftActorBehavior) {
+ if (message instanceof RaftActorBehavior) {
super.changeCurrentBehavior((RaftActorBehavior)message);
} else {
super.handleCommand(message);
- if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
+ if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
snapshotCommitted.countDown();
}
}
@Override
protected void pauseLeader(Runnable operation) {
- if(pauseLeaderFunction != null) {
+ if (pauseLeaderFunction != null) {
pauseLeaderFunction.apply(operation);
} else {
super.pauseLeader(operation);
return obj;
}
- public ReplicatedLog getReplicatedLog(){
+ public ReplicatedLog getReplicatedLog() {
return this.getRaftActorContext().getReplicatedLog();
}
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config){
+ public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
- return builder().id(id).peerAddresses(peerAddresses).config(config).
- dataPersistenceProvider(dataPersistenceProvider).props();
+ ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+ return builder().id(id).peerAddresses(peerAddresses).config(config)
+ .dataPersistenceProvider(dataPersistenceProvider).props();
}
public static Builder builder() {
return (T) this;
}
- public T id(String id) {
- this.id = id;
+ public T id(String newId) {
+ this.id = newId;
return self();
}
- public T peerAddresses(Map<String, String> peerAddresses) {
- this.peerAddresses = peerAddresses;
+ public T peerAddresses(Map<String, String> newPeerAddresses) {
+ this.peerAddresses = newPeerAddresses;
return self();
}
- public T config(ConfigParams config) {
- this.config = config;
+ public T config(ConfigParams newConfig) {
+ this.config = newConfig;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+ this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T roleChangeNotifier(ActorRef roleChangeNotifier) {
- this.roleChangeNotifier = roleChangeNotifier;
+ public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+ this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
- this.snapshotMessageSupport = snapshotMessageSupport;
+ public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
- this.restoreFromSnapshot = restoreFromSnapshot;
+ public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T persistent(Optional<Boolean> persistent) {
- this.persistent = persistent;
+ public T persistent(Optional<Boolean> newPersistent) {
+ this.persistent = newPersistent;
return self();
}
- public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
- this.pauseLeaderFunction = pauseLeaderFunction;
+ public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+ this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T snapshotCohort(RaftActorSnapshotCohort snapshotCohort) {
- this.snapshotCohort = snapshotCohort;
+ public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+ this.snapshotCohort = newSnapshotCohort;
return self();
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
+import com.google.common.base.Throwables;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
}
@Override
- public void update(long currentTerm, String votedFor){
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ public void update(long newTerm, String newVotedFor) {
+ this.currentTerm = newTerm;
+ this.votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long currentTerm,
- String votedFor) {
- update(currentTerm, votedFor);
+ @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+ update(newTerm, newVotedFor);
}
};
}
- public MockRaftActorContext(){
+ public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+ public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
}
- public void initReplicatedLog(){
+ public void initReplicatedLog() {
SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
long term = getTermInformation().getCurrentTerm();
replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
@Override public ActorSelection getPeerActorSelection(String peerId) {
String peerAddress = getPeerAddress(peerId);
- if(peerAddress != null){
+ if (peerAddress != null) {
return actorSelection(peerAddress);
}
return null;
}
public void setPeerAddresses(Map<String, String> peerAddresses) {
- for(String id: getPeerIds()) {
+ for (String id: getPeerIds()) {
removePeer(id);
}
- for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
}
}
}
public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
- @Override
- public void appendAndPersist(
- ReplicatedLogEntry replicatedLogEntry) {
- append(replicatedLogEntry);
- }
-
@Override
public int dataSize() {
return -1;
}
@Override
+ public void appendAndPersist(
+ ReplicatedLogEntry replicatedLogEntry) {
+ append(replicatedLogEntry);
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
append(replicatedLogEntry);
- if(callback != null) {
+ if (callback != null) {
try {
callback.apply(replicatedLogEntry);
} catch (Exception e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
}
public MockPayload() {
}
- public MockPayload(String s) {
- this.value = s;
+ public MockPayload(String data) {
+ this.value = data;
size = value.length();
}
- public MockPayload(String s, int size) {
- this(s);
+ public MockPayload(String data, int size) {
+ this(data);
this.size = size;
}
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((value == null) ? 0 : value.hashCode());
+ result = prime * result + (value == null ? 0 : value.hashCode());
return result;
}
private final long index;
private final Payload data;
- public MockReplicatedLogEntry(long term, long index, Payload data){
+ public MockReplicatedLogEntry(long term, long index, Payload data) {
this.term = term;
this.index = index;
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((data == null) ? 0 : data.hashCode());
- result = prime * result + (int) (index ^ (index >>> 32));
- result = prime * result + (int) (term ^ (term >>> 32));
+ result = prime * result + (data == null ? 0 : data.hashCode());
+ result = prime * result + (int) (index ^ index >>> 32);
+ result = prime * result + (int) (term ^ term >>> 32);
return result;
}
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
- for (int i=start; i<end; i++) {
- this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload(Integer.toString(i))));
+ for (int i = start; i < end; i++) {
+ this.mockLog.append(new ReplicatedLogImplEntry(i, term,
+ new MockRaftActorContext.MockPayload(Integer.toString(i))));
}
return this;
}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import akka.actor.ActorRef;
-import akka.dispatch.Dispatchers;
import scala.concurrent.duration.FiniteDuration;
/**
DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
- config(follower2ConfigParams).persistent(Optional.of(false)));
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString()))
+ .config(follower2ConfigParams).persistent(Optional.of(false)));
TestRaftActor follower2Instance = follower2Actor.underlyingActor();
follower2Instance.waitForRecoveryComplete();
follower2CollectorActor = follower2Instance.collectorActor();
setupLeaderAndNonVotingFollower();
((DefaultConfigParamsImpl)follower1Context.getConfigParams()).setElectionTimeoutFactor(2);
- ((DefaultConfigParamsImpl)follower1Context.getConfigParams()).
- setHeartBeatInterval(FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)follower1Context.getConfigParams())
+ .setHeartBeatInterval(FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
MessageCollectorActor.clearMessages(roleChangeNotifier);
follower1Actor.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
private void createNewLeaderActor() {
expSnapshotState.clear();
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).persistent(Optional.of(false)));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).persistent(Optional.of(false)));
leaderInstance = leaderActor.underlyingActor();
leaderCollectorActor = leaderInstance.collectorActor();
waitUntilLeader(leaderActor);
private void setupLeaderAndNonVotingFollower() {
snapshotBatchCount = 100;
- int initialTerm = 1;
+ int persistedTerm = 1;
// Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, initialTerm,
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, persistedTerm,
persistedServerConfig);
- InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
+ InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(persistedTerm, leaderId));
InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
- InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(initialTerm, leaderId));
+ InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(persistedTerm, leaderId));
InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
follower1Actor = newTestRaftActor(follower1Id, follower1Builder.peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams).
- persistent(Optional.of(false)));
+ ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams)
+ .persistent(Optional.of(false)));
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).persistent(Optional.of(false)));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).persistent(Optional.of(false)));
followerInstance = follower1Actor.underlyingActor();
follower1CollectorActor = followerInstance.collectorActor();
// Verify leader's context after startup
- currentTerm = initialTerm + 1;
+ currentTerm = persistedTerm + 1;
assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.Props;
assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
- long previousTerm = currentTerm;
+ final long previousTerm = currentTerm;
currentTerm = follower1Context.getTermInformation().getCurrentTerm();
// Since it went to Leader, it should've appended and successfully replicated a NoopPaylod with the
killActor(follower1Actor);
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
- config(followerConfigParams));
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
+ .config(followerConfigParams));
follower1Actor.underlyingActor().waitForRecoveryComplete();
follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
- config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
+ .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), followerConfigParams);
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
PeerInfo peerInfo = context.getPeerInfo(peerId);
- if(voting != null) {
+ if (voting != null) {
assertNotNull("Expected peer " + peerId, peerInfo);
- assertEquals("getVotingState for " + peerId, voting.booleanValue() ? VotingState.VOTING : VotingState.NON_VOTING,
- peerInfo.getVotingState());
+ assertEquals("getVotingState for " + peerId, voting.booleanValue()
+ ? VotingState.VOTING : VotingState.NON_VOTING, peerInfo.getVotingState());
} else {
assertNull("Unexpected peer " + peerId, peerInfo);
}
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
import akka.japi.Procedure;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+
import akka.dispatch.Dispatchers;
import com.google.common.base.Function;
import org.junit.After;
private void setup(String testName) {
String persistenceId = factory.generateActorId(testName + "-leader-");
- mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
- config).pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(config)
+ .pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
persistenceId).underlyingActor();
cohort = new RaftActorLeadershipTransferCohort(mockRaftActor);
cohort.addOnComplete(onComplete);
@Test
public void testPauseLeaderTimeout() {
- pauseLeaderFunction = new Function<Runnable, Void>() {
- @Override
- public Void apply(Runnable input) {
- return null;
- }
- };
+ pauseLeaderFunction = input -> null;
setup("testPauseLeaderTimeout");
cohort.init();
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
public void setup() {
MockitoAnnotations.initMocks(this);
- context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+ context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
+ LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
support = new RaftActorRecoverySupport(context, mockCohort);
InOrder inOrder = Mockito.inOrder(mockCohort);
inOrder.verify(mockCohort).startLogRecoveryBatch(5);
- for(int i = 0; i < replicatedLog.size() - 1; i++) {
+ for (int i = 0; i < replicatedLog.size() - 1; i++) {
inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
}
InOrder inOrder = Mockito.inOrder(mockCohort);
inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
- for(int i = 0; i < replicatedLog.size(); i++) {
+ for (int i = 0; i < replicatedLog.size(); i++) {
inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
}
assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
}
- @SuppressWarnings("unchecked")
@Test
public void testDataRecoveredWithPersistenceDisabled() {
doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
InMemorySnapshotStore.clear();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void setupNewFollower() {
DefaultConfigParamsImpl configParams = newFollowerConfigParams();
clearMessages(followerActor);
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
- assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
+ newFollowerActorContext.getPeerIds());
expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
expectFirstMatching(followerActor, ApplyState.class);
actorFactory.generateActorId(LEADER_ID));
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
actorFactory.generateActorId(LEADER_ID));
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
actorFactory.generateActorId(LEADER_ID));
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
Follower newFollower2 = new Follower(follower2ActorContext);
actorFactory.generateActorId(LEADER_ID));
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// Drop the commit message so the snapshot doesn't complete yet.
leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
- UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
+ final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
UnInitializedFollowerSnapshotReply.class);
// Prevent election timeout when the leader switches to follower
TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
- followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
- props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
- noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
+ testKit.getRef());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
actorFactory.generateActorId(LEADER_ID));
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
- TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+ final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// Drop UnInitializedFollowerSnapshotReply initially
leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
- TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
- newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
+ newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
// Drop AppendEntries to the new follower so consensus isn't reached
newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
- leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
- props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(FOLLOWER_ID));
followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, (short)0), leaderActor);
- followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
+ testKit.getRef());
expectFirstMatching(leaderActor, AddServer.class);
LOG.info("testAddServerForwardedToLeader ending");
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
- followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
- props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
- RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
+ RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
+ noLeaderActor.underlyingActor());
ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
- followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
- props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
- RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+ RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
+ RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
LOG.info("testRemoveServerWithNoLeader ending");
actorFactory.generateActorId(LEADER_ID));
leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
- RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+ RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
+ RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
LOG.info("testRemoveServerNonExistentServer ending");
TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
- leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
- props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(FOLLOWER_ID));
followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
- TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+ final TestActorRef<MessageCollectorActor> leaderCollector =
+ newLeaderCollectorActor(leaderActor.underlyingActor());
- TestActorRef<MessageCollectorActor> collector =
- actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ TestActorRef<MessageCollectorActor> collector = actorFactory.createTestActor(MessageCollectorActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
+ actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
followerActorId);
leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
- RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+ RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
+ RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
- verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
+ verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
+ votingServer(LEADER_ID));
RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
assertTrue("Expected Leader", currentBehavior instanceof Leader);
initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
- TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+ final TestActorRef<MessageCollectorActor> leaderCollector =
+ newLeaderCollectorActor(leaderActor.underlyingActor());
- TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
+ final TestActorRef<MessageCollectorActor> followerCollector =
+ actorFactory.createTestActor(MessageCollectorActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ configParams, NO_PERSISTENCE, followerCollector)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()),
followerActorId);
leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
- RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+ RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
+ RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
actorFactory.generateActorId(LEADER_ID));
leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
- RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+ RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
+ RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
LOG.info("testRemoveServerLeaderWithNoFollowers ending");
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
- FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
- withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+ FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
+ final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
- withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
+ FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
+ final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
- withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
+ FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
// Send first ChangeServersVotingStatus message
votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
+ nonVotingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
+ nonVotingServer(FOLLOWER_ID2));
MessageCollectorActor.clearMessages(leaderCollector);
MessageCollectorActor.clearMessages(follower1Collector);
votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
LOG.info("testChangeServersVotingStatus ending");
}
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
- FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
- withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+ FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
+ final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
- withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
+ FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
+ final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
- FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
- withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
+ FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
// Send ChangeServersVotingStatus message
nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
- verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
+ verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
+ .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
- MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
- withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+ MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
- ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
+ ApplyJournalEntries.class);
assertEquals("getToIndex", 1, apply.getToIndex());
verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
final String node1ID = "node1";
final String node2ID = "node2";
- PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
- peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+ final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
+ ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
+ ? actorFactory.createTestActorPath(node2ID) : null;
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
- CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+ final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
configParams2.setElectionTimeoutFactor(1000000);
final String node1ID = "node1";
final String node2ID = "node2";
- PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
- peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+ final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
+ ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
+ ? actorFactory.createTestActorPath(node2ID) : null;
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
- CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+ final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
- CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+ final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
// Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
// node1 to try to elect itself as leader in order to apply the new server config. However node1's log
final String node1ID = "node1";
final String node2ID = "node2";
- configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
- peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
+ configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
+ ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
+ ? actorFactory.createTestActorPath(node2ID) : null);
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
- CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+ final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- for(RaftActor raftActor: raftActors) {
- if(raftActor.getRaftState() == expState) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ for (RaftActor raftActor : raftActors) {
+ if (raftActor.getRaftState() == expState) {
return;
}
}
id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
}
- static abstract class AbstractMockRaftActor extends MockRaftActor {
+ abstract static class AbstractMockRaftActor extends MockRaftActor {
private volatile TestActorRef<MessageCollectorActor> collectorActor;
private volatile Class<?> dropMessageOfType;
AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
- super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
- persistent(Optional.of(persistent)));
+ super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
+ .persistent(Optional.of(persistent)));
this.collectorActor = collectorActor;
}
@Override
public void handleCommand(Object message) {
- if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+ if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
super.handleCommand(message);
}
- if(collectorActor != null) {
+ if (collectorActor != null) {
collectorActor.tell(message, getSender());
}
}
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
+ ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
persistent, collectorActor);
setPersistence(false);
RaftActorContext context = getRaftActorContext();
- for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
+ for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
getState().add(entry.getData());
context.getReplicatedLog().append(entry);
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void createSnapshot(ActorRef actorRef) {
try {
actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
- super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
+ super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
+ collectorActor);
setPersistence(false);
}
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
private TestActorFactory factory;
@Before
- public void setUp(){
+ public void setUp() {
factory = new TestActorFactory(getSystem());
}
}
@Test
- public void testFindLeaderWhenLeaderIsSelf(){
+ public void testFindLeaderWhenLeaderIsSelf() {
RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
kit.waitUntilLeader();
}
public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
+ JavaTestKit kit = new JavaTestKit(getSystem());
+ String persistenceId = factory.generateActorId("follower-");
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- // Set the heartbeat interval high to essentially disable election otherwise the test
- // may fail if the actor is switched to Leader and the commitIndex is set to the last
- // log entry.
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ // Set the heartbeat interval high to essentially disable election otherwise the test
+ // may fail if the actor is switched to Leader and the commitIndex is set to the last
+ // log entry.
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
- ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
- peerAddresses, config), persistenceId);
+ ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder()
+ .put("member1", "address").build();
+ ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
+ peerAddresses, config), persistenceId);
- watch(followerActor);
+ kit.watch(followerActor);
- List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
- new MockRaftActorContext.MockPayload("E"));
- snapshotUnappliedEntries.add(entry1);
+ List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+ new MockRaftActorContext.MockPayload("E"));
+ snapshotUnappliedEntries.add(entry1);
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 4;
+ int lastAppliedDuringSnapshotCapture = 3;
+ int lastIndexDuringSnapshotCapture = 4;
- // 4 messages as part of snapshot, which are applied to state
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
+ // 4 messages as part of snapshot, which are applied to state
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
+ lastAppliedDuringSnapshotCapture, 1);
+ InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
+
+ // add more entries after snapshot is taken
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("F", 2));
+ ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("G", 3));
+ ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("H", 4));
+ entries.add(entry2);
+ entries.add(entry3);
+ entries.add(entry4);
+
+ final int lastAppliedToState = 5;
+ final int lastIndex = 7;
+
+ InMemoryJournal.addEntry(persistenceId, 5, entry2);
+ // 2 entries are applied to state besides the 4 entries in snapshot
+ InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+ InMemoryJournal.addEntry(persistenceId, 7, entry3);
+ InMemoryJournal.addEntry(persistenceId, 8, entry4);
+
+ // kill the actor
+ followerActor.tell(PoisonPill.getInstance(), null);
+ kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+
+ kit.unwatch(followerActor);
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> ref = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config));
+
+ MockRaftActor mockRaftActor = ref.underlyingActor();
- Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
- snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
- lastAppliedDuringSnapshotCapture, 1);
- InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
+ mockRaftActor.waitForRecoveryComplete();
- // add more entries after snapshot is taken
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("F", 2));
- ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("G", 3));
- ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
- new MockRaftActorContext.MockPayload("H", 4));
- entries.add(entry2);
- entries.add(entry3);
- entries.add(entry4);
+ RaftActorContext context = mockRaftActor.getRaftActorContext();
+ assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+ context.getReplicatedLog().size());
+ assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
- int lastAppliedToState = 5;
- int lastIndex = 7;
+ mockRaftActor.waitForInitializeBehaviorComplete();
- InMemoryJournal.addEntry(persistenceId, 5, entry2);
- // 2 entries are applied to state besides the 4 entries in snapshot
- InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
- InMemoryJournal.addEntry(persistenceId, 7, entry3);
- InMemoryJournal.addEntry(persistenceId, 8, entry4);
+ assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
- // kill the actor
- followerActor.tell(PoisonPill.getInstance(), null);
- expectMsgClass(duration("5 seconds"), Terminated.class);
+ TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
+ }
- unwatch(followerActor);
+ @Test
+ public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
+ String persistenceId = factory.generateActorId("follower-");
- //reinstate the actor
- TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config));
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- MockRaftActor mockRaftActor = ref.underlyingActor();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- mockRaftActor.waitForRecoveryComplete();
+ TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(),
+ config, new NonPersistentDataProvider()), persistenceId);
- RaftActorContext context = mockRaftActor.getRaftActorContext();
- assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
- context.getReplicatedLog().size());
- assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
- assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
- assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
- assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
+ MockRaftActor mockRaftActor = ref.underlyingActor();
- mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitForRecoveryComplete();
- assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
- }};
+ mockRaftActor.waitForInitializeBehaviorComplete();
- TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
+ assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
}
@Test
- public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
+ public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
+ final JavaTestKit kit = new JavaTestKit(getSystem());
+ String persistenceId = factory.generateActorId("follower-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(),
+ config, new NonPersistentDataProvider())
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
- TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider()), persistenceId);
+ InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
+ List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+ assertEquals("UpdateElectionTerm entries", 1, entries.size());
+ final UpdateElectionTerm updateEntry = entries.get(0);
- MockRaftActor mockRaftActor = ref.underlyingActor();
+ factory.killActor(ref, kit);
- mockRaftActor.waitForRecoveryComplete();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
+ new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ factory.generateActorId("follower-"));
- mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActor actor = ref.underlyingActor();
+ actor.waitForRecoveryComplete();
- assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
- }};
- }
+ RaftActorContext newContext = actor.getRaftActorContext();
+ assertEquals("electionTerm", updateEntry.getCurrentTerm(),
+ newContext.getTermInformation().getCurrentTerm());
+ assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
- @Test
- public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
- config.setElectionTimeoutFactor(1);
-
- InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
-
- TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider()).
- withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
-
- InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
- List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
- assertEquals("UpdateElectionTerm entries", 1, entries.size());
- UpdateElectionTerm updateEntry = entries.get(0);
-
- factory.killActor(ref, this);
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- ref = factory.createTestActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
- new NonPersistentDataProvider()).
- withDispatcher(Dispatchers.DefaultDispatcherId()),
- factory.generateActorId("follower-"));
-
- MockRaftActor actor = ref.underlyingActor();
- actor.waitForRecoveryComplete();
-
- RaftActorContext newContext = actor.getRaftActorContext();
- assertEquals("electionTerm", updateEntry.getCurrentTerm(),
- newContext.getTermInformation().getCurrentTerm());
- assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
-
- entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
- assertEquals("UpdateElectionTerm entries", 1, entries.size());
- }};
+ entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+ assertEquals("UpdateElectionTerm entries", 1, entries.size());
}
@Test
RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).snapshotMessageSupport(mockSupport).props());
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).snapshotMessageSupport(mockSupport).props());
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotSuccess);
- SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
+ SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L),
+ new Throwable());
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotFailure);
verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
}
+ @SuppressWarnings("unchecked")
@Test
public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ String persistenceId = factory.generateActorId("leader-");
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- mockRaftActor.waitForInitializeBehaviorComplete();
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+ Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
- mockRaftActor.waitUntilLeader();
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
- mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
+ mockRaftActor.waitForInitializeBehaviorComplete();
- verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
+ mockRaftActor.waitUntilLeader();
- }
+ mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- };
+ verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
}
@Test
public void testApplyState() throws Exception {
+ String persistenceId = factory.generateActorId("leader-");
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+ Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
- mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
- ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("F"));
+ mockRaftActor.waitForInitializeBehaviorComplete();
- final Identifier id = new MockIdentifier("apply-state");
- mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
+ ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("F"));
- verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
+ final Identifier id = new MockIdentifier("apply-state");
+ mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
- }
- };
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
}
@Test
public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
- new JavaTestKit(getSystem()) {{
- TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
- Props.create(MessageCollectorActor.class));
- MessageCollectorActor.waitUntilReady(notifierActor);
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- long heartBeatInterval = 100;
- config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
- config.setElectionTimeoutFactor(20);
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(20);
- String persistenceId = factory.generateActorId("notifier-");
+ String persistenceId = factory.generateActorId("notifier-");
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
- new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- persistenceId);
+ final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
+ .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
+ new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ persistenceId);
- List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
+ List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
- // check if the notifier got a role change from null to Follower
- RoleChanged raftRoleChanged = matches.get(0);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertNull(raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ // check if the notifier got a role change from null to Follower
+ RoleChanged raftRoleChanged = matches.get(0);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertNull(raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
- // check if the notifier got a role change from Follower to Candidate
- raftRoleChanged = matches.get(1);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+ // check if the notifier got a role change from Follower to Candidate
+ raftRoleChanged = matches.get(1);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
- // check if the notifier got a role change from Candidate to Leader
- raftRoleChanged = matches.get(2);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+ // check if the notifier got a role change from Candidate to Leader
+ raftRoleChanged = matches.get(2);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
- LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
- notifierActor, LeaderStateChanged.class);
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
- assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
- assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+ assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
- notifierActor.underlyingActor().clear();
+ notifierActor.underlyingActor().clear();
- MockRaftActor raftActor = raftActorRef.underlyingActor();
- final String newLeaderId = "new-leader";
- final short newLeaderVersion = 6;
- Follower follower = new Follower(raftActor.getRaftActorContext()) {
- @Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
- setLeaderId(newLeaderId);
- setLeaderPayloadVersion(newLeaderVersion);
- return this;
- }
- };
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ final short newLeaderVersion = 6;
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ setLeaderId(newLeaderId);
+ setLeaderPayloadVersion(newLeaderVersion);
+ return this;
+ }
+ };
- raftActor.newBehavior(follower);
+ raftActor.newBehavior(follower);
- leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertEquals(persistenceId, leaderStateChange.getMemberId());
- assertEquals(null, leaderStateChange.getLeaderId());
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
- raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
- assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
- notifierActor.underlyingActor().clear();
+ notifierActor.underlyingActor().clear();
- raftActor.handleCommand("any");
+ raftActor.handleCommand("any");
- leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertEquals(persistenceId, leaderStateChange.getMemberId());
- assertEquals(newLeaderId, leaderStateChange.getLeaderId());
- assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
+ assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
- notifierActor.underlyingActor().clear();
+ notifierActor.underlyingActor().clear();
- raftActor.handleCommand("any");
+ raftActor.handleCommand("any");
- Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
- leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
- assertNull(leaderStateChange);
- }};
+ Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
+ leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertNull(leaderStateChange);
}
@Test
public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
- MessageCollectorActor.waitUntilReady(notifierActor);
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- long heartBeatInterval = 100;
- config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
- config.setElectionTimeoutFactor(1);
-
- String persistenceId = factory.generateActorId("notifier-");
-
- factory.createActor(MockRaftActor.builder().id(persistenceId).
- peerAddresses(ImmutableMap.of("leader", "fake/path")).
- config(config).roleChangeNotifier(notifierActor).props());
-
- List<RoleChanged> matches = null;
- for(int i = 0; i < 5000 / heartBeatInterval; i++) {
- matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
- if(matches.size() == 3) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
- }
+ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+ String persistenceId = factory.generateActorId("notifier-");
+
+ factory.createActor(MockRaftActor.builder().id(persistenceId)
+ .peerAddresses(ImmutableMap.of("leader", "fake/path"))
+ .config(config).roleChangeNotifier(notifierActor).props());
+
+ List<RoleChanged> matches = null;
+ for (int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
assertNotNull(matches);
- assertEquals(2, matches.size());
+ if (matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
- // check if the notifier got a role change from null to Follower
- RoleChanged raftRoleChanged = matches.get(0);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertNull(raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ assertNotNull(matches);
+ assertEquals(2, matches.size());
- // check if the notifier got a role change from Follower to Candidate
- raftRoleChanged = matches.get(1);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+ // check if the notifier got a role change from null to Follower
+ RoleChanged raftRoleChanged = matches.get(0);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertNull(raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
- }};
+ // check if the notifier got a role change from Follower to Candidate
+ raftRoleChanged = matches.get(1);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
}
@Test
public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
- String follower1Id = factory.generateActorId("follower-");
-
- ActorRef followerActor1 =
- factory.createActor(Props.create(MessageCollectorActor.class));
+ final String persistenceId = factory.generateActorId("leader-");
+ final String follower1Id = factory.generateActorId("follower-");
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ ActorRef followerActor1 =
+ factory.createActor(Props.create(MessageCollectorActor.class));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, followerActor1.path().toString());
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, followerActor1.path().toString());
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
- leaderActor.getRaftActorContext().setCommitIndex(4);
- leaderActor.getRaftActorContext().setLastApplied(4);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.waitForInitializeBehaviorComplete();
+ leaderActor.getRaftActorContext().setCommitIndex(4);
+ leaderActor.getRaftActorContext().setLastApplied(4);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
- // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+ leaderActor.waitForInitializeBehaviorComplete();
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
- MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
- leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- assertEquals(8, leaderActor.getReplicatedLog().size());
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
- leaderActor.getRaftActorContext().getSnapshotManager()
- .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("x")), 4);
+ assertEquals(8, leaderActor.getReplicatedLog().size());
- verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+ leaderActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4);
- assertEquals(8, leaderActor.getReplicatedLog().size());
+ verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- //fake snapshot on index 5
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
- assertEquals(8, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ //fake snapshot on index 5
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
- //fake snapshot on index 6
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
- assertEquals(8, leaderActor.getReplicatedLog().size());
+ assertEquals(8, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ //fake snapshot on index 6
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
- assertEquals(8, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("foo-0"),
- new MockRaftActorContext.MockPayload("foo-1"),
- new MockRaftActorContext.MockPayload("foo-2"),
- new MockRaftActorContext.MockPayload("foo-3"),
- new MockRaftActorContext.MockPayload("foo-4")));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
- Runtime.getRuntime().totalMemory());
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
- assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+ Runtime.getRuntime().totalMemory());
- // The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
+ assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
- // capture snapshot reply should remove the snapshotted entries only
- assertEquals(3, leaderActor.getReplicatedLog().size());
- assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+ // The commit is needed to complete the snapshot creation process
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
- // add another non-replicated entry
- leaderActor.getReplicatedLog().append(
- new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, leaderActor.getReplicatedLog().size());
+ assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
- //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
- assertEquals(2, leaderActor.getReplicatedLog().size());
- assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+ // add another non-replicated entry
+ leaderActor.getReplicatedLog().append(
+ new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
- }
- };
+ //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
+ assertEquals(2, leaderActor.getReplicatedLog().size());
+ assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
}
@Test
public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("follower-");
- String leaderId = factory.generateActorId("leader-");
+ final String persistenceId = factory.generateActorId("follower-");
+ final String leaderId = factory.generateActorId("leader-");
- ActorRef leaderActor1 =
- factory.createActor(Props.create(MessageCollectorActor.class));
+ ActorRef leaderActor1 =
+ factory.createActor(Props.create(MessageCollectorActor.class));
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(leaderId, leaderActor1.path().toString());
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(leaderId, leaderActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
- MockRaftActor followerActor = mockActorRef.underlyingActor();
- followerActor.getRaftActorContext().setCommitIndex(4);
- followerActor.getRaftActorContext().setLastApplied(4);
- followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+ MockRaftActor followerActor = mockActorRef.underlyingActor();
+ followerActor.getRaftActorContext().setCommitIndex(4);
+ followerActor.getRaftActorContext().setLastApplied(4);
+ followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
- followerActor.waitForInitializeBehaviorComplete();
+ followerActor.waitForInitializeBehaviorComplete();
- Follower follower = new Follower(followerActor.getRaftActorContext());
- followerActor.setCurrentBehavior(follower);
- assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+ Follower follower = new Follower(followerActor.getRaftActorContext());
+ followerActor.setCurrentBehavior(follower);
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
- // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
- MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
- followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+ // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
- // log has indices 0-5
- assertEquals(6, followerActor.getReplicatedLog().size());
+ // log has indices 0-5
+ assertEquals(6, followerActor.getReplicatedLog().size());
- //snapshot on 4
- followerActor.getRaftActorContext().getSnapshotManager().capture(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("D")), 4);
+ //snapshot on 4
+ followerActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("D")), 4);
- verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+ verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
- assertEquals(6, followerActor.getReplicatedLog().size());
+ assertEquals(6, followerActor.getReplicatedLog().size());
- //fake snapshot on index 6
- List<ReplicatedLogEntry> entries =
- Arrays.asList(
- (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("foo-6"))
+ //fake snapshot on index 6
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("foo-6"))
);
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
- assertEquals(7, followerActor.getReplicatedLog().size());
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
+ assertEquals(7, followerActor.getReplicatedLog().size());
- //fake snapshot on index 7
- assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+ //fake snapshot on index 7
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
- entries =
- Arrays.asList(
- (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
- new MockRaftActorContext.MockPayload("foo-7"))
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("foo-7"))
);
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
- assertEquals(8, followerActor.getReplicatedLog().size());
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
+ assertEquals(8, followerActor.getReplicatedLog().size());
- assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("foo-0"),
- new MockRaftActorContext.MockPayload("foo-1"),
- new MockRaftActorContext.MockPayload("foo-2"),
- new MockRaftActorContext.MockPayload("foo-3"),
- new MockRaftActorContext.MockPayload("foo-4")));
- followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
- // The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
+ // The commit is needed to complete the snapshot creation process
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
- // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
- assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
- assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+ // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
+ assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+ assertEquals(7, followerActor.getReplicatedLog().lastIndex());
- entries =
- Arrays.asList(
- (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
- new MockRaftActorContext.MockPayload("foo-7"))
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+ new MockRaftActorContext.MockPayload("foo-7"))
);
- // send an additional entry 8 with leaderCommit = 7
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
+ // send an additional entry 8 with leaderCommit = 7
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
- // 7 and 8, as lastapplied is 7
- assertEquals(2, followerActor.getReplicatedLog().size());
-
- }
- };
+ // 7 and 8, as lastapplied is 7
+ assertEquals(2, followerActor.getReplicatedLog().size());
}
@Test
public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
- String follower1Id = factory.generateActorId("follower-");
- String follower2Id = factory.generateActorId("follower-");
-
- ActorRef followerActor1 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
- ActorRef followerActor2 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, followerActor1.path().toString());
- peerAddresses.put(follower2Id, followerActor2.path().toString());
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
-
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.getRaftActorContext().setCommitIndex(9);
- leaderActor.getRaftActorContext().setLastApplied(9);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
-
- leaderActor.waitForInitializeBehaviorComplete();
-
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // create 5 entries in the log
- MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
- leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
-
- //set the snapshot index to 4 , 0 to 4 are snapshotted
- leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
- //setting replicatedToAllIndex = 9, for the log to clear
- leader.setReplicatedToAllIndex(9);
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // set the 2nd follower nextIndex to 1 which has been snapshotted
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // simulate a real snapshot
- leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
- leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
- , RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
-
- //reply from a slow follower does not initiate a fake snapshot
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
- assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
-
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("foo-0"),
- new MockRaftActorContext.MockPayload("foo-1"),
- new MockRaftActorContext.MockPayload("foo-2"),
- new MockRaftActorContext.MockPayload("foo-3"),
- new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
-
- assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
-
- //reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
- assertEquals(0, leaderActor.getReplicatedLog().size());
- }
- };
+ final String persistenceId = factory.generateActorId("leader-");
+ final String follower1Id = factory.generateActorId("follower-");
+ final String follower2Id = factory.generateActorId("follower-");
+
+ final ActorRef followerActor1 = factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
+ final ActorRef followerActor2 = factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, followerActor1.path().toString());
+ peerAddresses.put(follower2Id, followerActor2.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(9);
+ leaderActor.getRaftActorContext().setLastApplied(9);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // create 5 entries in the log
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
+ //set the snapshot index to 4 , 0 to 4 are snapshotted
+ leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ //setting replicatedToAllIndex = 9, for the log to clear
+ leader.setReplicatedToAllIndex(9);
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // set the 2nd follower nextIndex to 1 which has been snapshotted
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // simulate a real snapshot
+ leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
+ leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
+ RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+
+ //reply from a slow follower does not initiate a fake snapshot
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
+ assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
+ leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
+ leaderActor.getReplicatedLog().size());
+
+ //reply from a slow follower after should not raise errors
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
+ assertEquals(0, leaderActor.getReplicatedLog().size());
}
@Test
public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("leader-");
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setSnapshotBatchCount(5);
-
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
-
- Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
-
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.getRaftActorContext().setCommitIndex(3);
- leaderActor.getRaftActorContext().setLastApplied(3);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
-
- leaderActor.waitForInitializeBehaviorComplete();
- for(int i=0;i< 4;i++) {
- leaderActor.getReplicatedLog()
- .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
- new MockRaftActorContext.MockPayload("A")));
- }
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ for (int i = 0; i < 4; i++) {
+ leaderActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
+ new MockRaftActorContext.MockPayload("A")));
+ }
- // Simulate an install snaphost to a follower.
- leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
- leaderActor.getReplicatedLog().last(), -1, "member1");
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- // Now send a CaptureSnapshotReply
- mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+ // Simulate an install snaphost to a follower.
+ leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
+ leaderActor.getReplicatedLog().last(), -1, "member1");
- // Trimming log in this scenario is a no-op
- assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
- assertEquals(-1, leader.getReplicatedToAllIndex());
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
- }};
+ // Trimming log in this scenario is a no-op
+ assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+ assertEquals(-1, leader.getReplicatedToAllIndex());
}
@Test
public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("leader-");
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setSnapshotBatchCount(5);
-
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
-
- Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
-
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.getRaftActorContext().setCommitIndex(3);
- leaderActor.getRaftActorContext().setLastApplied(3);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
- leaderActor.getReplicatedLog().setSnapshotIndex(3);
-
- leaderActor.waitForInitializeBehaviorComplete();
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- leader.setReplicatedToAllIndex(3);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // Persist another entry (this will cause a CaptureSnapshot to be triggered
- leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
- new MockRaftActorContext.MockPayload("duh"));
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
- // Now send a CaptureSnapshotReply
- mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
- // Trimming log in this scenario is a no-op
- assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
- assertEquals(3, leader.getReplicatedToAllIndex());
+ Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
- }};
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+ leaderActor.getReplicatedLog().setSnapshotIndex(3);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ leader.setReplicatedToAllIndex(3);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // Persist another entry (this will cause a CaptureSnapshot to be triggered
+ leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
+ new MockRaftActorContext.MockPayload("duh"));
+
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+ // Trimming log in this scenario is a no-op
+ assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+ assertEquals(3, leader.getReplicatedToAllIndex());
}
@Test
- public void testSwitchBehavior(){
+ public void testSwitchBehavior() {
String persistenceId = factory.generateActorId("leader-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
}
public static ByteString fromObject(Object snapshot) throws Exception {
- ByteArrayOutputStream b = null;
- ObjectOutputStream o = null;
+ ByteArrayOutputStream bos = null;
+ ObjectOutputStream os = null;
try {
- b = new ByteArrayOutputStream();
- o = new ObjectOutputStream(b);
- o.writeObject(snapshot);
- byte[] snapshotBytes = b.toByteArray();
+ bos = new ByteArrayOutputStream();
+ os = new ObjectOutputStream(bos);
+ os.writeObject(snapshot);
+ byte[] snapshotBytes = bos.toByteArray();
return ByteString.copyFrom(snapshotBytes);
} finally {
- if (o != null) {
- o.flush();
- o.close();
+ if (os != null) {
+ os.flush();
+ os.close();
}
- if (b != null) {
- b.close();
+ if (bos != null) {
+ bos.close();
}
}
}
public void testGetSnapshot() throws Exception {
TEST_LOG.info("testGetSnapshot starting");
- JavaTestKit kit = new JavaTestKit(getSystem());
+ final JavaTestKit kit = new JavaTestKit(getSystem());
String persistenceId = factory.generateActorId("test-actor-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
new MockRaftActorContext.MockPayload("C")));
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
- withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ ImmutableMap.<String, String>builder().put("member1", "address").build(), config)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
// Test with timeout
- mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
+ mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
+ Duration.create(200, TimeUnit.MILLISECONDS));
reset(mockRaftActor.snapshotCohortDelegate);
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
persistenceId = factory.generateActorId("test-actor-");
- raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
- persistent(Optional.of(Boolean.FALSE)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot))
+ .persistent(Optional.of(Boolean.FALSE)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
new MockRaftActorContext.MockPayload("B")));
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForRecoveryComplete();
InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
String persistenceId = factory.generateActorId("test-actor-");
- TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
- config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).roleChangeNotifier(notifierActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
return raftActor;
}
- public boolean waitForLogMessage(final Class<?> logEventClass, String message){
+ public boolean waitForLogMessage(final Class<?> logEventClass, String message) {
// Wait for a specific log message to show up
return
new JavaTestKit.EventFilter<Boolean>(logEventClass
}
- protected void waitUntilLeader(){
+ protected void waitUntilLeader() {
waitUntilLeader(raftActor);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static void waitUntilLeader(ActorRef actorRef) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
- for(int i = 0; i < 20 * 5; i++) {
+ for (int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(actorRef, FindLeader.INSTANCE, new Timeout(duration));
try {
final Optional<String> maybeLeader = ((FindLeaderReply)Await.result(future, duration)).getLeaderActor();
if (maybeLeader.isPresent()) {
return;
}
- } catch(TimeoutException e) {
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.error("FindLeader failed", e);
}
Assert.fail("Leader not found for actorRef " + actorRef.path());
}
-}
\ No newline at end of file
+}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import org.slf4j.LoggerFactory;
/**
- * Recovery Integration Test for single node
+ * Recovery Integration Test for single node.
*/
public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrationTest {
public void testJournalReplayAfterSnapshotWithSingleNode() throws Exception {
String persistenceId = factory.generateActorId("singleNode");
- TestActorRef<AbstractRaftActorIntegrationTest.TestRaftActor> singleNodeActorRef = newTestRaftActor(persistenceId,
- ImmutableMap.<String, String>builder().build(), leaderConfigParams);
+ TestActorRef<AbstractRaftActorIntegrationTest.TestRaftActor> singleNodeActorRef =
+ newTestRaftActor(persistenceId, ImmutableMap.<String, String>builder().build(), leaderConfigParams);
waitUntilLeader(singleNodeActorRef);
ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
- RaftActorContext singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
+ final RaftActorContext singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
- MockRaftActorContext.MockPayload payload0 = sendPayloadData(singleNodeActorRef, "zero");
- MockRaftActorContext.MockPayload payload1 = sendPayloadData(singleNodeActorRef, "one");
- MockRaftActorContext.MockPayload payload2 = sendPayloadData(singleNodeActorRef, "two");
+ final MockRaftActorContext.MockPayload payload0 = sendPayloadData(singleNodeActorRef, "zero");
+ final MockRaftActorContext.MockPayload payload1 = sendPayloadData(singleNodeActorRef, "one");
+ final MockRaftActorContext.MockPayload payload2 = sendPayloadData(singleNodeActorRef, "two");
MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3);
// this should trigger a snapshot
- MockRaftActorContext.MockPayload payload3 = sendPayloadData(singleNodeActorRef, "three");
+ final MockRaftActorContext.MockPayload payload3 = sendPayloadData(singleNodeActorRef, "three");
MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4);
//add 2 more
- MockRaftActorContext.MockPayload payload4 = sendPayloadData(singleNodeActorRef, "four");
- MockRaftActorContext.MockPayload payload5 = sendPayloadData(singleNodeActorRef, "five");
+ final MockRaftActorContext.MockPayload payload4 = sendPayloadData(singleNodeActorRef, "four");
+ final MockRaftActorContext.MockPayload payload5 = sendPayloadData(singleNodeActorRef, "five");
// Wait for snapshot complete.
assertEquals("Last applied", 5, singleNodeContext.getLastApplied());
- assertEquals("Incorrect State after snapshot success is received ",
- Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+ assertEquals("Incorrect State after snapshot success is received ", Lists.newArrayList(payload0, payload1,
+ payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
// we get 2 log entries (4 and 5 indexes) and 3 ApplyJournalEntries (for 3, 4, and 5 indexes)
assertEquals(5, InMemoryJournal.get(persistenceId).size());
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
assertEquals(1, persistedSnapshots.size());
+ @SuppressWarnings("unchecked")
List<Object> snapshottedState = (List<Object>)MockRaftActor.toObject(persistedSnapshots.get(0).getState());
- assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3), snapshottedState);
+ assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3),
+ snapshottedState);
//recovery logic starts
killActor(singleNodeActorRef);
singleNodeActorRef.underlyingActor().waitForRecoveryComplete();
- assertEquals("Incorrect State after Recovery ",
- Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+ assertEquals("Incorrect State after Recovery ", Lists.newArrayList(payload0, payload1, payload2, payload3,
+ payload4, payload5), singleNodeActorRef.underlyingActor().getState());
}
}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
newFollowerConfigParams());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, follower1Actor.path().toString());
- peerAddresses.put(follower2Id, "");
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString());
+ leaderPeerAddresses.put(follower2Id, "");
leaderConfigParams = newLeaderConfigParams();
- leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+ leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
// Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
// Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
// Block these messages initially so we can control the sequence.
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
// Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// Verify the leader applies the 3rd payload state.
MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
// Wait for the follower to persist the snapshot.
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
- List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
+ final List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
int expIndex = 1;
int expTerm = 2;
- try(FileInputStream fis = new FileInputStream("src/test/resources/helium-serialized-ReplicatedLogImplEntry")) {
+ try (FileInputStream fis = new FileInputStream("src/test/resources/helium-serialized-ReplicatedLogImplEntry")) {
ObjectInputStream ois = new ObjectInputStream(fis);
ReplicatedLogImplEntry entry = (ReplicatedLogImplEntry) ois.readObject();
* Use this method to generate a file with a serialized ReplicatedLogImplEntry instance to be
* used in tests that verify backwards compatible de-serialization.
*/
+ @SuppressWarnings("unused")
private static void generateSerializedFile() throws IOException {
String expPayloadData = "This is a test";
int expIndex = 1;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
import akka.japi.Procedure;
import java.util.Collections;
import org.hamcrest.BaseMatcher;
ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
- MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
- MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
+ final MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+ final MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
log.appendAndPersist(logEntry1);
verifyPersist(logEntry1);
verifyNoMoreInteractions(mockPersistence);
}
- public Matcher<DeleteEntries> match(final DeleteEntries actual){
+ public Matcher<DeleteEntries> match(final DeleteEntries actual) {
return new BaseMatcher<DeleteEntries>() {
@Override
- public boolean matches(Object o) {
- DeleteEntries other = (DeleteEntries) o;
+ public boolean matches(Object obj) {
+ DeleteEntries other = (DeleteEntries) obj;
return actual.getFromIndex() == other.getFromIndex();
}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.List;
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), followerConfigParams);
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
* 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
* scenario, the follower consensus and application of state is delayed until after the snapshot
* completes.
- * @throws Exception
*/
private void testFirstSnapshot() throws Exception {
testLog.info("testFirstSnapshot starting");
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
// The leader's persisted journal log should be cleared since we snapshotted.
- List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
+ List<ReplicatedLogImplEntry> persistedLeaderJournal =
+ InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
/**
* Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
* consensus occurs and the leader applies the state.
- * @throws Exception
*/
private void testSecondSnapshot() throws Exception {
testLog.info("testSecondSnapshot starting");
payload7 = sendPayloadData(leaderActor, "seven");
// Capture the CaptureSnapshotReply message so we can send it later.
- CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
- CaptureSnapshotReply.class);
+ final CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshotReply.class);
// Wait for the state to be applied in the leader.
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Verify the followers apply all 4 new log entries.
- List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4);
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor,
+ ApplyState.class, 4);
verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
- RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+ follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
MessageCollectorActor.clearMessages(follower2CollectorActor);
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
- RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+ follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
- Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
- leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+ leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
waitUntilLeader(leaderActor);
assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex());
- testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}", follower2Id);
+ testLog.info(
+ "testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}",
+ follower2Id);
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
* lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log
* will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries
* sent by the leader.
- *
- * @throws Exception
*/
@Test
public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception {
verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5);
// Verify the leader did not try to install a snapshot to catch up follower 2.
- InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
+ InstallSnapshot.class);
Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
// Ensure there's at least 1 more heartbeat.
* lagging where the leader trims its log from the last applied index. Follower 2's log
* will be behind by several entries and, when it is resumed, it should be caught up via a snapshot
* installed by the leader.
- *
- * @throws Exception
*/
@Test
public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception {
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send 5 payloads - the second should cause a leader snapshot.
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
- MockPayload payload5 = sendPayloadData(leaderActor, "five");
- MockPayload payload6 = sendPayloadData(leaderActor, "six");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+ final MockPayload payload6 = sendPayloadData(leaderActor, "six");
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
MessageCollectorActor.clearMessages(leaderCollectorActor);
- testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
+ + "sending 1 more payload to trigger second snapshot");
// Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
* leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
* be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed
* by the leader.
- *
- * @throws Exception
*/
@Test
public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception {
InMemoryJournal.waitForWriteMessagesComplete(leaderId);
// Verify a snapshot is not triggered.
- CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
+ CaptureSnapshot.class);
Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
expSnapshotState.add(payload1);
/**
* Send another payload to verify another snapshot is not done since the last snapshot trimmed the
* first log entry so the memory threshold should not be exceeded.
- *
- * @throws Exception
*/
private void verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot() throws Exception {
ApplyState applyState;
/**
* Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
- *
- * @throws Exception
*/
private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
@Nullable ServerConfigurationPayload expServerConfig) throws Exception {
- List<Snapshot> persistedSnapshots;
- List<ReplicatedLogEntry> unAppliedEntry;
- ApplySnapshot applySnapshot;
- InstallSnapshot installSnapshot;
-
testLog.info("testInstallSnapshotToLaggingFollower starting");
MessageCollectorActor.clearMessages(leaderCollectorActor);
// RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
// This is OK - the next snapshot should delete it. In production, even if the system restarted
// before another snapshot, they would both get applied which wouldn't hurt anything.
- persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+ List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
int snapshotSize = persistedSnapshot.getState().length;
- int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+ final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
+ + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
- installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
+ InstallSnapshot.class);
assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
int index = 1;
- for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+ for (InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
}
// Verify follower 2 applies the snapshot.
- applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
+ ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
+ ApplySnapshot.class);
+ verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm,
+ lastAppliedIndex);
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 0,
+ applySnapshot.getSnapshot().getUnAppliedEntries().size());
// Wait for the snapshot to complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
- if(expServerConfig != null) {
+ if (expServerConfig != null) {
Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
assertEquals("Leader snapshot server config", expServerInfo,
new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
/**
* Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
* snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
- *
- * @throws Exception
*/
private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
- List<ApplyState> applyStates;
- ApplyState applyState;
-
testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
leader.getReplicatedToAllIndex());
// Wait for the snapshot to complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
// Verify the leader's last persisted snapshot (previous ones may not be purged yet).
MockPayload payload6 = sendPayloadData(leaderActor, "six");
// Verify the leader applies the 2 log entries.
- applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5);
verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
new ReplicatedLogImplEntry(6, currentTerm, payload6)));
// Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
- List<ApplyJournalEntries> persistedApplyJournalEntries = InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
+ List<ApplyJournalEntries> persistedApplyJournalEntries =
+ InMemoryJournal.get(leaderId, ApplyJournalEntries.class);
boolean found = false;
- for(ApplyJournalEntries entry: persistedApplyJournalEntries) {
- if(entry.getToIndex() == 6) {
+ for (ApplyJournalEntries entry: persistedApplyJournalEntries) {
+ if (entry.getToIndex() == 6) {
found = true;
break;
}
}
- Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6), found);
+ Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6),
+ found);
// Verify follower 1 applies the 3 log entries.
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
/**
* Kill the leader actor, reinstate it and verify the recovered journal.
*/
- private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) {
+ private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
+ long firstJournalEntryIndex) {
testLog.info("testLeaderReinstatement starting");
killActor(leaderActor);
assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied());
- for(long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
+ for (long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) {
verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i,
expSnapshotState.get((int) i));
}
private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
// Send the payloads.
- for(String d: data) {
+ for (String d: data) {
expSnapshotState.add(sendPayloadData(leaderActor, d));
}
- int nEntries = data.length;
+ int numEntries = data.length;
// Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
- List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, nEntries);
- for(int i = 0; i < expSnapshotState.size(); i++) {
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
+ ApplyState.class, numEntries);
+ for (int i = 0; i < expSnapshotState.size(); i++) {
MockPayload payload = expSnapshotState.get(i);
verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
}
// Verify follower 1 applies each log entry.
- applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, nEntries);
- for(int i = 0; i < expSnapshotState.size(); i++) {
+ applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, numEntries);
+ for (int i = 0; i < expSnapshotState.size(); i++) {
MockPayload payload = expSnapshotState.get(i);
verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
}
// Verify follower 2 applies each log entry.
- applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, nEntries);
- for(int i = 0; i < expSnapshotState.size(); i++) {
+ applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, numEntries);
+ for (int i = 0; i < expSnapshotState.size(); i++) {
MockPayload payload = expSnapshotState.get(i);
verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
}
// The leader should have performed fake snapshots to trim the log to the last index replicated to
// all followers.
- verifyLeadersTrimmedLog(nEntries - 1);
+ verifyLeadersTrimmedLog(numEntries - 1);
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import org.apache.commons.lang.SerializationUtils;
new ServerConfigurationPayload.ServerInfo("1", true),
new ServerConfigurationPayload.ServerInfo("2", false)));
org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload cloned =
- (org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload) SerializationUtils.clone(expected);
+ (org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload)
+ SerializationUtils.clone(expected);
assertEquals("getServerConfig", ImmutableSet.of(
new org.opendaylight.controller.cluster.raft.persisted.ServerInfo("1", true),
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
private TestActorRef<MessageCollectorActor> actorRef;
@Before
- public void setUp(){
+ public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(false).when(mockRaftActorContext).hasFollowers();
}
@After
- public void tearDown(){
+ public void tearDown() {
factory.close();
}
@Test
- public void testConstruction(){
+ public void testConstruction() {
assertEquals(false, snapshotManager.isCapturing());
}
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
- System.out.println(captureSnapshot);
-
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(-1L, captureSnapshot.getLastIndex());
assertEquals(-1L, captureSnapshot.getLastTerm());
}
@Test
- public void testCaptureWithCreateProcedureError () throws Exception {
+ public void testCaptureWithCreateProcedureError() throws Exception {
doThrow(new RuntimeException("mock")).when(mockProcedure).run();
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
}
@Test
- public void testPersistWhenReplicatedToAllIndexMinusOne(){
+ public void testPersistWhenReplicatedToAllIndexMinusOne() {
doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
}
@Test
- public void testPersistWhenReplicatedToAllIndexNotMinus(){
+ public void testPersistWhenReplicatedToAllIndexNotMinus() {
doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
}
@Test
- public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+ public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
}
@Test
- public void testPersistSendInstallSnapshot(){
+ public void testPersistSendInstallSnapshot() {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
}
@Test
- public void testCallingPersistWithoutCaptureWillDoNothing(){
+ public void testCallingPersistWithoutCaptureWillDoNothing() {
snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
}
+
@Test
- public void testCallingPersistTwiceWillDoNoHarm(){
+ public void testCallingPersistTwiceWillDoNoHarm() {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
}
@Test
- public void testCommit(){
+ public void testCommit() {
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
verify(mockDataPersistenceProvider).deleteMessages(50L);
- ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+ ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor =
+ ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
}
@Test
- public void testCommitBeforePersist(){
+ public void testCommitBeforePersist() {
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
}
@Test
- public void testCommitBeforeCapture(){
+ public void testCommitBeforeCapture() {
snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
}
@Test
- public void testCallingCommitMultipleTimesCausesNoHarm(){
+ public void testCallingCommitMultipleTimesCausesNoHarm() {
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
}
@Test
- public void testRollback(){
+ public void testRollback() {
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
@Test
- public void testRollbackBeforePersist(){
+ public void testRollbackBeforePersist() {
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
}
@Test
- public void testRollbackBeforeCapture(){
+ public void testRollbackBeforeCapture() {
snapshotManager.rollback();
verify(mockReplicatedLog, never()).snapshotRollback();
}
@Test
- public void testCallingRollbackMultipleTimesCausesNoHarm(){
+ public void testCallingRollbackMultipleTimesCausesNoHarm() {
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
long retIndex = snapshotManager.trimLog(10);
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
long retIndex = snapshotManager.trimLog(10);
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
long retIndex = snapshotManager.trimLog(10);
}
@Test
- public void testTrimLogAfterCapture(){
+ public void testTrimLogAfterCapture() {
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(20L).when(mockRaftActorContext).getLastApplied();
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
snapshotManager.trimLog(10);
}
@Test
- public void testTrimLogAfterCaptureToInstall(){
+ public void testTrimLogAfterCaptureToInstall() {
boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9, "follower-1");
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(20L).when(mockRaftActorContext).getLastApplied();
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
snapshotManager.trimLog(10);
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
@Test
public void testBackwardsCompatibleDeserializationFromLithium() throws Exception {
Snapshot expSnapshot = newLithiumSnapshot();
- try(FileInputStream fis = new FileInputStream("src/test/resources/lithium-serialized-Snapshot")) {
+ try (FileInputStream fis = new FileInputStream("src/test/resources/lithium-serialized-Snapshot")) {
ObjectInputStream ois = new ObjectInputStream(fis);
Snapshot snapshot = (Snapshot) ois.readObject();
assertEquals("lastTerm", expSnapshot.getLastTerm(), snapshot.getLastTerm());
assertEquals("lastAppliedIndex", expSnapshot.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
assertEquals("lastAppliedTerm", expSnapshot.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
- assertEquals("unAppliedEntries size", expSnapshot.getUnAppliedEntries().size(), snapshot.getUnAppliedEntries().size());
+ assertEquals("unAppliedEntries size", expSnapshot.getUnAppliedEntries().size(),
+ snapshot.getUnAppliedEntries().size());
assertArrayEquals("state", expSnapshot.getState(), snapshot.getState());
assertEquals("electionTerm", 0, snapshot.getElectionTerm());
assertEquals("electionVotedFor", null, snapshot.getElectionVotedFor());
* Use this method to generate a file with a serialized Snapshot instance to be
* used in tests that verify backwards compatible de-serialization.
*/
+ @SuppressWarnings("unused")
private static void generateSerializedFile(Snapshot snapshot, String fileName) throws IOException {
FileOutputStream fos = new FileOutputStream("src/test/resources/" + fileName);
ObjectOutputStream oos = new ObjectOutputStream(fos);
/**
* TestActorFactory provides methods to create both normal and test actors and to kill them when the factory is closed
- * The ideal usage for TestActorFactory is with try with resources, <br/>
+ * The ideal usage for TestActorFactory is with try with resources.
+ * <p/>
* For example <br/>
* <pre>
* try (TestActorFactory factory = new TestActorFactory(getSystem())){
* </pre>
*/
public class TestActorFactory implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(TestActorFactory.class);
+
private final ActorSystem system;
List<ActorRef> createdActors = new LinkedList<>();
- Logger LOG = LoggerFactory.getLogger(getClass());
private static int actorCount = 1;
- public TestActorFactory(ActorSystem system){
+ public TestActorFactory(ActorSystem system) {
this.system = system;
}
/**
- * Create a normal actor with an auto-generated name
+ * Create a normal actor with an auto-generated name.
*
- * @param props
- * @return
+ * @param props the actor Props
+ * @return the ActorRef
*/
- public ActorRef createActor(Props props){
+ public ActorRef createActor(Props props) {
ActorRef actorRef = system.actorOf(props);
return addActor(actorRef);
}
/**
- * Create a normal actor with the passed in name
- * @param props
+ * Create a normal actor with the passed in name.
+ *
+ * @param props the actor Props
* @param actorId name of actor
- * @return
+ * @return the ActorRef
*/
- public ActorRef createActor(Props props, String actorId){
+ public ActorRef createActor(Props props, String actorId) {
ActorRef actorRef = system.actorOf(props, actorId);
return addActor(actorRef);
}
/**
- * Create a test actor with the passed in name
- * @param props
- * @param actorId
- * @param <T>
- * @return
+ * Create a test actor with the passed in name.
+ *
+ * @param props the actor Props
+ * @param actorId name of actor
+ * @param <T> the actor type
+ * @return the ActorRef
*/
@SuppressWarnings("unchecked")
- public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId){
+ public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId) {
TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
return (TestActorRef<T>) addActor(actorRef);
}
+ /**
+ * Create a test actor with an auto-generated name.
+ *
+ * @param props the actor Props
+ * @param <T> the actor type
+ * @return the TestActorRef
+ */
+ @SuppressWarnings("unchecked")
+ public <T extends Actor> TestActorRef<T> createTestActor(Props props) {
+ TestActorRef<T> actorRef = TestActorRef.create(system, props);
+ return (TestActorRef<T>) addActor(actorRef);
+ }
+
private <T extends ActorRef> ActorRef addActor(T actorRef) {
createdActors.add(actorRef);
verifyActorReady(actorRef);
return actorRef;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void verifyActorReady(ActorRef actorRef) {
// Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
// in a state yet to receive messages or isn't actually created yet. This seems to happen with
Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
Throwable lastError = null;
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
try {
ActorSelection actorSelection = system.actorSelection(actorRef.path().toString());
Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout);
}
/**
- * Create a test actor with an auto-generated name
- * @param props
- * @param <T>
- * @return
- */
- @SuppressWarnings("unchecked")
- public <T extends Actor> TestActorRef<T> createTestActor(Props props){
- TestActorRef<T> actorRef = TestActorRef.create(system, props);
- return (TestActorRef<T>) addActor(actorRef);
- }
-
- /**
- * Generate a friendly but unique actor id/name
- * @param prefix
- * @return
+ * Generate a friendly but unique actor id/name.
+ *
+ * @param prefix the name prefix
+ * @return the actor name
*/
- public String generateActorId(String prefix){
+ public String generateActorId(String prefix) {
return prefix + actorCount++;
}
killActor(actor, kit, true);
}
- public String createTestActorPath(String actorId){
- return "akka://test/user/" + actorId;
- }
-
private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) {
LOG.info("Killing actor {}", actor);
kit.watch(actor);
actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
kit.expectTerminated(JavaTestKit.duration("5 seconds"), actor);
- if(remove) {
+ if (remove) {
createdActors.remove(actor);
}
}
+ public String createTestActorPath(String actorId) {
+ return "akka://test/user/" + actorId;
+ }
+
@Override
public void close() {
JavaTestKit kit = new JavaTestKit(system);
- for(ActorRef actor : createdActors) {
+ for (ActorRef actor : createdActors) {
killActor(actor, kit, false);
}
}
-}
\ No newline at end of file
+}
public void testSerialization() {
DeleteEntries deleteEntries = new DeleteEntries(11);
org.opendaylight.controller.cluster.raft.persisted.DeleteEntries clone =
- (org.opendaylight.controller.cluster.raft.persisted.DeleteEntries) SerializationUtils.clone(deleteEntries);
+ (org.opendaylight.controller.cluster.raft.persisted.DeleteEntries)
+ SerializationUtils.clone(deleteEntries);
Assert.assertEquals("getFromIndex", 11, clone.getFromIndex());
Assert.assertEquals("isMigrated", true, clone.isMigrated());
public void testSerialization() {
UpdateElectionTerm expected = new UpdateElectionTerm(5, "member1");
org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm clone =
- (org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm) SerializationUtils.clone(expected);
+ (org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm)
+ SerializationUtils.clone(expected);
Assert.assertEquals("getCurrentTerm", 5, clone.getCurrentTerm());
Assert.assertEquals("getVotedFor", "member1", clone.getVotedFor());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
@Override
public void onReceive(Object message) throws Exception {
// Ignore scheduled SendHeartBeat messages.
- if(message instanceof SendHeartBeat) {
+ if (message instanceof SendHeartBeat) {
return;
}
try {
- if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
+ if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
if (nextBehavior != null) {
RaftActorBehavior oldBehavior = behavior;
behavior = nextBehavior;
- if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
+ if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
behaviorStateChangeLatch.countDown();
}
}
super.onReceive(message);
CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
- if(latch != null) {
+ if (latch != null) {
latch.countDown();
}
}
}
void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
- for(Object m: getAllMatching(getSelf(), msgClass)) {
+ for (Object m: getAllMatching(getSelf(), msgClass)) {
getSelf().tell(m, sender);
}
}
assertEquals(name + " behavior state", expState, actor.behavior.state());
}
- void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception {
+ void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
+ throws Exception {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
// Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
// haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
Leader leader = null;
AssertionError lastAssertError = null;
- for(int i = 1; i <= 3; i++) {
+ for (int i = 1; i <= 3; i++) {
actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
leader = new Leader(context);
}
}
- if(lastAssertError != null) {
+ if (lastAssertError != null) {
throw lastAssertError;
}
}
TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
- TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), name);
+ TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
MessageCollectorActor.waitUntilReady(actor);
return actor;
}
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.HashMap;
* When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where
* if no follower responded to an initial AppendEntries heartbeats would not be sent to it. This test verifies
* that regardless of whether followers respond or not we schedule heartbeats.
- *
- * @throws Exception
*/
@Test
public void testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries() throws Exception {
logStart("testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries");
- new JavaTestKit(getSystem()) {{
- String leaderActorId = actorFactory.generateActorId("leader");
- String follower1ActorId = actorFactory.generateActorId("follower");
- String follower2ActorId = actorFactory.generateActorId("follower");
- TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
- actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
- ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
- ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
- leaderActorContext.setConfigParams(configParams);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+ leaderActorContext.setConfigParams(configParams);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1ActorId,
- follower1Actor.path().toString());
- peerAddresses.put(follower2ActorId,
- follower2Actor.path().toString());
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
- leaderActorContext.setPeerAddresses(peerAddresses);
- RaftActorBehavior leader = createBehavior(leaderActorContext);
+ leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActor.underlyingActor().setBehavior(leader);
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ leaderActor.underlyingActor().setBehavior(leader);
- List<SendHeartBeat> allMessages = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- // Need more than 1 heartbeat to be delivered because we waited for 1 second with heartbeat interval 200ms
- assertTrue(String.format("%s messages is less than expected", allMessages.size()),
- allMessages.size() > 1);
+ List<SendHeartBeat> allMessages = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
- }};
+ // Need more than 1 heartbeat to be delivered because we waited for 1 second with heartbeat interval 200ms
+ assertTrue(String.format("%s messages is less than expected", allMessages.size()),
+ allMessages.size() > 1);
}
-
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
@After
public void tearDown() throws Exception {
- if(behavior != null) {
+ if (behavior != null) {
behavior.close();
}
/**
* This test checks that when a new Raft RPC message is received with a newer
* term the RaftActor gets into the Follower state.
- *
- * @throws Exception
*/
@Test
public void testHandleRaftRPCWithNewerTerm() throws Exception {
/**
* This test verifies that when an AppendEntries is received with a term that
* is less that the currentTerm of the RaftActor then the RaftActor does not
- * change it's state and it responds back with a failure
- *
- * @throws Exception
+ * change it's state and it responds back with a failure.
*/
@Test
public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
+ final AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
behavior = createBehavior(context);
/**
* This test verifies that when a RaftActor receives a RequestVote message
* with a term that is greater than it's currentTerm but a less up-to-date
- * log then the receiving RaftActor will not grant the vote to the sender
+ * log then the receiving RaftActor will not grant the vote to the sender.
*/
@Test
public void testHandleRequestVoteWhenSenderLogLessUptoDate() {
/**
* This test verifies that the receiving RaftActor will not grant a vote
* to a sender if the sender's term is lesser than the currentTerm of the
- * recipient RaftActor
+ * recipient RaftActor.
*/
@Test
public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
assertEquals(1, context.getReplicatedLog().size());
- //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ // 5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and
+ // 1 will only get purged
context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
context.setLastApplied(2);
abstractBehavior.performSnapshotWithoutCapture(3);
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
- Payload p = new MockRaftActorContext.MockPayload("");
- setLastLogEntry(actorContext, 1, 0, p);
+ Payload payload = new MockRaftActorContext.MockPayload("");
+ setLastLogEntry(actorContext, 1, 0, payload);
actorContext.getTermInformation().update(1, "test");
RaftActorBehavior origBehavior = createBehavior(actorContext);
protected ByteString toByteString(Map<String, String> state) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try(ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(state);
return ByteString.copyFrom(bos.toByteArray());
} catch (IOException e) {
}
protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
- final boolean applyModificationToStateBeforeConsensus){
+ final boolean applyModificationToStateBeforeConsensus) {
return new RaftPolicy() {
@Override
public boolean automaticElectionsEnabled() {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
@Override
@After
public void tearDown() throws Exception {
- if(candidate != null) {
+ if (candidate != null) {
candidate.close();
}
}
@Test
- public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
+ public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself() {
RaftActorContext raftActorContext = createActorContext();
long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
candidate = new Candidate(raftActorContext);
- assertEquals("getCurrentTerm", expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
+ assertEquals("getCurrentTerm", expectedTerm + 1, raftActorContext.getTermInformation().getCurrentTerm());
assertEquals("getVotedFor", raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
}
@Test
- public void testThatAnElectionTimeoutIsTriggered(){
- MockRaftActorContext actorContext = createActorContext();
- candidate = new Candidate(actorContext);
+ public void testThatAnElectionTimeoutIsTriggered() {
+ MockRaftActorContext actorContext = createActorContext();
+ candidate = new Candidate(actorContext);
- MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class,
- actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
+ MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class,
+ actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
}
@Test
- public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
+ public void testHandleElectionTimeoutWhenThereAreZeroPeers() {
RaftActorContext raftActorContext = createActorContext();
candidate = new Candidate(raftActorContext);
}
@Test
- public void testHandleElectionTimeoutWhenThereAreTwoNodeCluster(){
+ public void testHandleElectionTimeoutWhenThereAreTwoNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
raftActorContext.setPeerAddresses(setupPeers(1));
candidate = new Candidate(raftActorContext);
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
+ public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
raftActorContext.setPeerAddresses(setupPeers(2));
}
@Test
- public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
+ public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
raftActorContext.setLastApplied(-1);
raftActorContext.setPeerAddresses(setupPeers(2));
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
+ public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster() {
MockRaftActorContext raftActorContext = createActorContext();
raftActorContext.getTermInformation().update(2L, "other");
- raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
- createEntries(0, 5, 1).build());
+ raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder()
+ .createEntries(0, 5, 1).build());
raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex());
raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
raftActorContext.setPeerAddresses(setupPeers(4));
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers(){
+ public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers() {
ElectionTerm mockElectionTerm = Mockito.mock(ElectionTerm.class);
Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
}
@Test
- public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
MockRaftActorContext context = createActorContext();
Stopwatch stopwatch = Stopwatch.createStarted();
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
+ final AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
behavior = createBehavior(context);
private Map<String, String> setupPeers(final int count) {
Map<String, String> peerMap = new HashMap<>();
peerActors = new TestActorRef[count];
- for(int i = 0; i < count; i++) {
+ for (int i = 0; i < count; i++) {
peerActors[i] = actorFactory.createTestActor(Props.create(MessageCollectorActor.class),
actorFactory.generateActorId("peer"));
- peerMap.put("peer" + (i+1), peerActors[i].path().toString());
+ peerMap.put("peer" + (i + 1), peerActors[i].path().toString());
}
return peerMap;
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
final ActorRef actorRef, final RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
- if(rpc instanceof RequestVote) {
- assertEquals("New votedFor", ((RequestVote)rpc).getCandidateId(), actorContext.getTermInformation().getVotedFor());
+ if (rpc instanceof RequestVote) {
+ assertEquals("New votedFor", ((RequestVote)rpc).getCandidateId(),
+ actorContext.getTermInformation().getVotedFor());
} else {
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
// Create member 2's behavior initially as Follower
member2Context = newRaftActorContext("member2", member2ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
member2Context.setConfigParams(member2ConfigParams);
// Create member 3's behavior initially as Follower
member3Context = newRaftActorContext("member3", member3ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member2", member2ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member2", member2ActorRef.path().toString()).build());
DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
member3Context.setConfigParams(member3ConfigParams);
// Create member 1's behavior initially as Leader
member1Context = newRaftActorContext("member1", member1ActorRef,
- ImmutableMap.<String,String>builder().
- put("member2", member2ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member2", member2ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
member1Context.setConfigParams(member1ConfigParams);
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
@Override
@After
public void tearDown() throws Exception {
- if(follower != null) {
+ if (follower != null) {
follower.close();
}
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef){
+ protected MockRaftActorContext createActorContext(ActorRef actorRef) {
MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
context.setPayloadVersion(payloadVersion );
return context;
}
@Test
- public void testThatAnElectionTimeoutIsTriggered(){
+ public void testThatAnElectionTimeoutIsTriggered() {
MockRaftActorContext actorContext = createActorContext();
follower = new Follower(actorContext);
logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
MockRaftActorContext context = createActorContext();
- ((DefaultConfigParamsImpl) context.getConfigParams()).
- setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl) context.getConfigParams())
+ .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
follower = new Follower(context);
context.setCurrentBehavior(follower);
- Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
- getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
-1, -1, (short) 1));
RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
assertTrue(raftBehavior instanceof Follower);
- Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
- getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
-1, -1, (short) 1));
}
@Test
- public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
+ throws Exception {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
+ throws Exception {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
- logStart("testHandleFirstAppendEntries");
+ public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
+ throws Exception {
+ logStart(
+ "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
MockRaftActorContext context = createActorContext();
context.getReplicatedLog().clear(0,2);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
assertFalse(syncStatus.isInitialSyncDone());
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
assertFalse(syncStatus.isInitialSyncDone());
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
- FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+ FollowerInitialSyncUpStatus.class);
assertFalse(syncStatus.isInitialSyncDone());
* with a commitIndex that is greater than what has been applied to the
* state machine of the RaftActor, the RaftActor applies the state and
* sets it current applied state to the commitIndex of the sender.
- *
- * @throws Exception
*/
@Test
public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
* This test verifies that when an AppendEntries is received a specific prevLogTerm
* which does not match the term that is in RaftActors log entry at prevLogIndex
* then the RaftActor does not change it's state and it returns a failure.
- *
- * @throws Exception
*/
@Test
public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
* This test verifies that when a new AppendEntries message is received with
* new entries and the logs of the sender and receiver match that the new
* entries get added to the log and the log is incremented by the number of
- * entries received in appendEntries
- *
- * @throws Exception
+ * entries received in appendEntries.
*/
@Test
public void testHandleAppendEntriesAddNewEntries() {
* This test verifies that when a new AppendEntries message is received with
* new entries and the logs of the sender and receiver are out-of-sync that
* the log is first corrected by removing the out of sync entries from the
- * log and then adding in the new entries sent with the AppendEntries message
+ * log and then adding in the new entries sent with the AppendEntries message.
*/
@Test
public void testHandleAppendEntriesCorrectReceiverLogEntries() {
}
@Test
- public void testHandleAppendEntriesPreviousLogEntryMissing(){
+ public void testHandleAppendEntriesPreviousLogEntryMissing() {
logStart("testHandleAppendEntriesPreviousLogEntryMissing");
- MockRaftActorContext context = createActorContext();
+ final MockRaftActorContext context = createActorContext();
// Prepare the receivers log
MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
}
@Test
- public void testHandleAppendEntriesAfterInstallingSnapshot(){
+ public void testHandleAppendEntriesAfterInstallingSnapshot() {
logStart("testHandleAppendAfterInstallingSnapshot");
MockRaftActorContext context = createActorContext();
/**
* This test verifies that when InstallSnapshot is received by
* the follower its applied correctly.
- *
- * @throws Exception
*/
@Test
public void testHandleInstallSnapshot() throws Exception {
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
- for(int i = 0; i < totalChunks; i++) {
+ for (int i = 0; i < totalChunks; i++) {
byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
chunkIndex = 1;
- for(InstallSnapshotReply reply: replies) {
+ for (InstallSnapshotReply reply: replies) {
assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
assertEquals("getTerm", 1, reply.getTerm());
assertEquals("isSuccess", true, reply.isSuccess());
/**
* Verify that when an AppendEntries is sent to a follower during a snapshot install
* the Follower short-circuits the processing of the AppendEntries message.
- *
- * @throws Exception
*/
@Test
public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
- for(int i = 0; i < totalChunks; i++) {
+ for (int i = 0; i < totalChunks; i++) {
byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
}
@Test
- public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
MockRaftActorContext context = createActorContext();
Stopwatch stopwatch = Stopwatch.createStarted();
}
@Test
- public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
+ public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
MockRaftActorContext context = createActorContext();
- context.setConfigParams(new DefaultConfigParamsImpl(){
+ context.setConfigParams(new DefaultConfigParamsImpl() {
@Override
public FiniteDuration getElectionTimeOutInterval() {
return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
}
@Test
- public void testFollowerSchedulesElectionIfNonVoting(){
+ public void testFollowerSchedulesElectionIfNonVoting() {
MockRaftActorContext context = createActorContext();
context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
}
@Test
- public void testElectionScheduledWhenAnyRaftRPCReceived(){
+ public void testElectionScheduledWhenAnyRaftRPCReceived() {
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
follower.handleMessage(leaderActor, new RaftRPC() {
}
@Test
- public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
+ public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
follower.handleMessage(leaderActor, "non-raft-rpc");
verify(follower, never()).scheduleElection(any(FiniteDuration.class));
}
- public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
+ public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
int snapshotLength = bs.size();
int start = offset;
int size = chunkSize;
new MockRaftActorContext.MockPayload(data));
}
- private ByteString createSnapshot(){
+ private ByteString createSnapshot() {
HashMap<String, String> followerSnapshot = new HashMap<>();
followerSnapshot.put("1", "A");
followerSnapshot.put("2", "B");
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
@Override
@After
public void tearDown() throws Exception {
- if(isolatedLeader != null) {
+ if (isolatedLeader != null) {
isolatedLeader.close();
}
assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
// in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
- RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ RaftActorBehavior newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0));
- assertEquals("Raft state", RaftState.Leader, behavior.state());
+ assertEquals("Raft state", RaftState.Leader, newBehavior.state());
isolatedLeader.close();
- isolatedLeader = (AbstractLeader) behavior;
+ isolatedLeader = (AbstractLeader) newBehavior;
- behavior = isolatedLeader.handleMessage(senderActor,
+ newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0 ));
- assertEquals("Raft state", RaftState.Leader, behavior.state());
+ assertEquals("Raft state", RaftState.Leader, newBehavior.state());
}
@Test
String followerAddress3 = "akka://test/user/$c";
String followerAddress4 = "akka://test/user/$d";
- MockRaftActorContext leaderActorContext = createActorContext();
+ final MockRaftActorContext leaderActorContext = createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put("follower-1", followerAddress1);
peerAddresses.put("follower-2", followerAddress2);
assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
// in a 5 member cluster, atleast 2 followers need to be active and return a reply
- RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ RaftActorBehavior newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0 ));
- assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state());
+ assertEquals("Raft state", RaftState.IsolatedLeader, newBehavior.state());
- behavior = isolatedLeader.handleMessage(senderActor,
+ newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0 ));
- assertEquals("Raft state", RaftState.Leader, behavior.state());
+ assertEquals("Raft state", RaftState.Leader, newBehavior.state());
isolatedLeader.close();
- isolatedLeader = (AbstractLeader) behavior;
+ isolatedLeader = (AbstractLeader) newBehavior;
- behavior = isolatedLeader.handleMessage(senderActor,
+ newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0 ));
- assertEquals("Raft state", RaftState.Leader, behavior.state());
+ assertEquals("Raft state", RaftState.Leader, newBehavior.state());
}
@Test
// if an append-entries reply is received by the isolated-leader, and that reply
// has a term > than its own term, then IsolatedLeader switches to Follower
// bowing itself to another leader in the cluster
- RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ RaftActorBehavior newBehavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1, (short)0));
- assertEquals("Raft state", RaftState.Follower, behavior.state());
+ assertEquals("Raft state", RaftState.Follower, newBehavior.state());
- behavior.close();
+ newBehavior.close();
}
}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
@Override
@After
public void tearDown() throws Exception {
- if(leader != null) {
+ if (leader != null) {
leader.close();
}
MockRaftActorContext actorContext = createActorContextWithFollower();
actorContext.setCommitIndex(-1);
- short payloadVersion = (short)5;
actorContext.setPayloadVersion(payloadVersion);
long term = 1;
actorContext.setCurrentBehavior(leader);
// Leader should send an immediate heartbeat with no entries as follower is inactive.
- long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ final long lastIndex = actorContext.getReplicatedLog().lastIndex();
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("getTerm", term, appendEntries.getTerm());
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
followerActor.underlyingActor().clear();
// Sleep for the heartbeat interval so AppendEntries is sent.
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
- getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
return sendReplicate(actorContext, 1, index);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
term, index, payload);
assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
- assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
+ assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
}
@Test
followerActor.underlyingActor().clear();
- for(int i=0;i<5;i++) {
- sendReplicate(actorContext, lastIndex+i+1);
+ for (int i = 0; i < 5; i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
}
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
followerActor.underlyingActor().clear();
- for(int i=0;i<3;i++) {
- sendReplicate(actorContext, lastIndex+i+1);
+ for (int i = 0; i < 3; i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
leader.handleMessage(followerActor, new AppendEntriesReply(
FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
}
- for(int i=3;i<5;i++) {
+ for (int i = 3; i < 5; i++) {
sendReplicate(actorContext, lastIndex + i + 1);
}
// get sent to the follower - but not the 5th
assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
- for(int i=0;i<4;i++) {
+ for (int i = 0; i < 4; i++) {
long expected = allMessages.get(i).getEntries().get(0).getIndex();
- assertEquals(expected, i+2);
+ assertEquals(expected, i + 2);
}
}
followerActor.underlyingActor().clear();
- sendReplicate(actorContext, lastIndex+1);
+ sendReplicate(actorContext, lastIndex + 1);
// Wait slightly longer than heartbeat duration
Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
assertEquals(1, allMessages.get(0).getEntries().size());
- assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
assertEquals(1, allMessages.get(1).getEntries().size());
- assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
}
followerActor.underlyingActor().clear();
- for(int i=0;i<3;i++) {
+ for (int i = 0; i < 3; i++) {
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
- sendReplicate(actorContext, lastIndex+1);
+ sendReplicate(actorContext, lastIndex + 1);
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
- for(int i = 0; i <= newLogIndex - 1; i++ ) {
+ for (int i = 0; i <= newLogIndex - 1; i++ ) {
ApplyState applyState = applyStateList.get(i);
assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
- MockRaftActorContext actorContext = createActorContextWithFollower();
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
final int commitIndex = 3;
final int snapshotIndex = 2;
- final int newEntryIndex = 4;
final int snapshotTerm = 1;
- final int currentTerm = 2;
// set the snapshot variables in replicatedlog
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
public void testSendAppendEntriesSnapshotScenario() throws Exception {
logStart("testSendAppendEntriesSnapshotScenario");
- MockRaftActorContext actorContext = createActorContextWithFollower();
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
// set the snapshot as absent and check if capture-snapshot is invoked.
leader.setSnapshot(null);
- for(int i=0;i<4;i++) {
+ for (int i = 0; i < 4; i++) {
actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
new MockRaftActorContext.MockPayload("X" + i)));
}
public void testInstallSnapshot() throws Exception {
logStart("testInstallSnapshot");
- MockRaftActorContext actorContext = createActorContextWithFollower();
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
// check if installsnapshot gets called with the correct values.
- InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
public void testForceInstallSnapshot() throws Exception {
logStart("testForceInstallSnapshot");
- MockRaftActorContext actorContext = createActorContextWithFollower();
+ final MockRaftActorContext actorContext = createActorContextWithFollower();
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
// check if installsnapshot gets called with the correct values.
- InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
fts.setSnapshotBytes(bs);
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
- while(!fts.isLastChunk(fts.getChunkIndex())) {
+ while (!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
fts.incrementChunkIndex();
}
final int snapshotTerm = 1;
final int currentTerm = 2;
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
@Override
public int getSnapshotChunkSize() {
return 50;
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
MockRaftActorContext actorContext = createActorContextWithFollower();
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
public int getSnapshotChunkSize() {
return 50;
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
installSnapshot.getLastChunkHashCode().get().intValue());
- int hashCode = Arrays.hashCode(installSnapshot.getData());
+ final int hashCode = Arrays.hashCode(installSnapshot.getData());
followerActor.underlyingActor().clear();
assertEquals(bs.size(), barray.length);
- int chunkIndex=0;
- for (int i=0; i < barray.length; i = i + 50) {
- int j = i + 50;
+ int chunkIndex = 0;
+ for (int i = 0; i < barray.length; i = i + 50) {
+ int length = i + 50;
chunkIndex++;
if (i + 50 > barray.length) {
- j = barray.length;
+ length = barray.length;
}
byte[] chunk = fts.getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
+ assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
fts.markSendStatus(true);
return createActorContext(LEADER_ID, actorRef);
}
- private MockRaftActorContext createActorContextWithFollower() {
- MockRaftActorContext actorContext = createActorContext();
- actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
- followerActor.path().toString()).build());
- return actorContext;
- }
-
private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
return context;
}
+ private MockRaftActorContext createActorContextWithFollower() {
+ MockRaftActorContext actorContext = createActorContext();
+ actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+ followerActor.path().toString()).build());
+ return actorContext;
+ }
+
private MockRaftActorContext createFollowerActorContextWithLeader() {
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
- MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
- MockRaftActorContext leaderActorContext = createActorContext();
+ final MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
}
@Test
- public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
leaderActorContext.setCommitIndex(leaderCommitIndex);
leaderActorContext.setLastApplied(leaderCommitIndex);
- ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
- ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
leader = new Leader(leaderActorContext);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
MessageCollectorActor.clearMessages(followerActor);
MessageCollectorActor.clearMessages(leaderActor);
leaderActorContext.setCommitIndex(leaderCommitIndex);
leaderActorContext.setLastApplied(leaderCommitIndex);
- ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
- ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
leader = new Leader(leaderActorContext);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
MessageCollectorActor.clearMessages(followerActor);
MessageCollectorActor.clearMessages(leaderActor);
}
@Test
- public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
leaderActorContext.setCommitIndex(leaderCommitIndex);
leaderActorContext.setLastApplied(leaderCommitIndex);
- ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
- ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
leader = new Leader(leaderActorContext);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
MessageCollectorActor.clearMessages(followerActor);
MessageCollectorActor.clearMessages(leaderActor);
}
@Test
- public void testHandleAppendEntriesReplyWithNewerTerm(){
+ public void testHandleAppendEntriesReplyWithNewerTerm() {
logStart("testHandleAppendEntriesReplyWithNewerTerm");
MockRaftActorContext leaderActorContext = createActorContext();
leaderActor.underlyingActor().setBehavior(leader);
leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
assertEquals(false, appendEntriesReply.isSuccess());
assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
}
@Test
- public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+ public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
MockRaftActorContext leaderActorContext = createActorContext();
leaderActor.underlyingActor().setBehavior(leader);
leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
assertEquals(false, appendEntriesReply.isSuccess());
assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
- short payloadVersion = 5;
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
}
@Test
- public void testHandleAppendEntriesReplyUnknownFollower(){
+ public void testHandleAppendEntriesReplyUnknownFollower() {
logStart("testHandleAppendEntriesReplyUnknownFollower");
MockRaftActorContext leaderActorContext = createActorContext();
leaderActorContext.setCommitIndex(leaderCommitIndex);
leaderActorContext.setLastApplied(leaderCommitIndex);
- ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
- ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
- ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
- ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+ final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
leader = new Leader(leaderActorContext);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
MessageCollectorActor.clearMessages(followerActor);
MessageCollectorActor.clearMessages(leaderActor);
leader.handleMessage(followerActor, appendEntriesReply);
- List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
+ AppendEntries.class, 2);
MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
appendEntries = appendEntriesList.get(0);
}
@Test
- public void testHandleRequestVoteReply(){
+ public void testHandleRequestVoteReply() {
logStart("testHandleRequestVoteReply");
MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext leaderActorContext = createActorContext();
leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- assertTrue(behavior instanceof Leader);
+ RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue(newBehavior instanceof Leader);
}
@Test
leader = new Leader(leaderActorContext);
leader.getFollower(FOLLOWER_ID).markFollowerActive();
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- assertTrue("Expected Leader", behavior instanceof Leader);
+ RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Expected Leader", newBehavior instanceof Leader);
}
- private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
+ RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
// kill 1 follower and verify if that got killed
final JavaTestKit probe = new JavaTestKit(getSystem());
leader.markFollowerInActive("follower-1");
leader.markFollowerActive("follower-2");
- behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
+ newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when majority of followers are active",
+ newBehavior instanceof Leader);
// kill 2nd follower and leader should change to Isolated leader
followerActor2.tell(PoisonPill.getInstance(), null);
public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
logStart("testIsolatedLeaderCheckTwoFollowers");
- RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
+ RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
- behavior instanceof IsolatedLeader);
+ newBehavior instanceof IsolatedLeader);
}
@Test
public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
- RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+ RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
- behavior instanceof Leader);
+ newBehavior instanceof Leader);
}
@Test
public void testLaggingFollowerStarvation() throws Exception {
logStart("testLaggingFollowerStarvation");
- new JavaTestKit(getSystem()) {{
- String leaderActorId = actorFactory.generateActorId("leader");
- String follower1ActorId = actorFactory.generateActorId("follower");
- String follower2ActorId = actorFactory.generateActorId("follower");
- TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
- actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
- ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
- ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+ final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
- leaderActorContext.setConfigParams(configParams);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+ leaderActorContext.setConfigParams(configParams);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1ActorId,
- follower1Actor.path().toString());
- peerAddresses.put(follower2ActorId,
- follower2Actor.path().toString());
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
- leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActorContext.getTermInformation().update(1, leaderActorId);
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
- RaftActorBehavior leader = createBehavior(leaderActorContext);
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
- leaderActor.underlyingActor().setBehavior(leader);
+ leader = createBehavior(leaderActorContext);
- for(int i=1;i<6;i++) {
- // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
- RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
- assertTrue(newBehavior == leader);
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
- }
+ leaderActor.underlyingActor().setBehavior(leader);
- // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
- List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+ for (int i = 1; i < 6; i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
+ new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
- assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
- heartbeats.size() > 1);
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
- // Check if follower-2 got AppendEntries during this time and was not starved
- List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
- assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
- appendEntries.size() > 1);
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
- }};
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
}
@Test
TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
- leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
+ VotingState.NON_VOTING);
leader = new Leader(leaderActorContext);
leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
- getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
verify(mockTransferCohort, never()).transferComplete();
// Send heartbeats to time out the transfer.
- for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
- getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
- public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
super();
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
this.snapshotChunkSize = snapshotChunkSize;
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
// term and return a RequestVoteReply but should not grant the vote.
candidateElectionTerm += 2;
- for(int i = 0; i < 2; i++) {
+ for (int i = 0; i < 2; i++) {
member1Actor.clear();
member1Actor.expectMessageClass(RequestVote.class, 1);
member2Actor.clear();
// Create member 3's behavior initially as a Candidate.
member3Context = newRaftActorContext("member3", member3ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member2", member2ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member2", member2ActorRef.path().toString()).build());
DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
member3Context.setConfigParams(member3ConfigParams);
// start a new term so Candidate member 3's current term will be greater than the leader's
// current term.
- for(int i = 0; i < numCandidateElections - 1; i++) {
+ for (int i = 0; i < numCandidateElections - 1; i++) {
member3ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
}
// Create member 2's behavior as Follower.
member2Context = newRaftActorContext("member2", member2ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
member2Context.setConfigParams(member2ConfigParams);
// Create member 1's behavior as Leader.
member1Context = newRaftActorContext("member1", member1ActorRef,
- ImmutableMap.<String,String>builder().
- put("member2", member2ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member2", member2ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
member1Context.setConfigParams(member1ConfigParams);
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
// Create member 2's behavior initially as Follower
member2Context = newRaftActorContext("member2", member2ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
member2Context.setConfigParams(member2ConfigParams);
// Create member 3's behavior initially as Follower
member3Context = newRaftActorContext("member3", member3ActorRef,
- ImmutableMap.<String,String>builder().
- put("member1", member1ActorRef.path().toString()).
- put("member2", member2ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member1", member1ActorRef.path().toString())
+ .put("member2", member2ActorRef.path().toString()).build());
DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
member3Context.setConfigParams(member3ConfigParams);
// Create member 1's behavior initially as Leader
member1Context = newRaftActorContext("member1", member1ActorRef,
- ImmutableMap.<String,String>builder().
- put("member2", member2ActorRef.path().toString()).
- put("member3", member3ActorRef.path().toString()).build());
+ ImmutableMap.<String,String>builder()
+ .put("member2", member2ActorRef.path().toString())
+ .put("member3", member3ActorRef.path().toString()).build());
DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
member1Context.setConfigParams(member1ConfigParams);
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
byte[] chunk3;
@Before
- public void setup(){
+ public void setup() {
data = new HashMap<>();
data.put("key1", "value1");
data.put("key2", "value2");
try {
tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
Assert.fail();
- } catch(SnapshotTracker.InvalidChunkException e){
+ } catch (SnapshotTracker.InvalidChunkException e) {
e.getMessage().startsWith("Invalid chunk");
}
try {
tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
Assert.fail();
- } catch(SnapshotTracker.InvalidChunkException e){
-
+ } catch (SnapshotTracker.InvalidChunkException e) {
+ // expected
}
// Out of sequence chunk indexes won't work
tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
try {
- tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+ tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 2, chunk2, Optional.<Integer>absent());
Assert.fail();
- } catch(SnapshotTracker.InvalidChunkException e){
-
+ } catch (SnapshotTracker.InvalidChunkException e) {
+ // expected
}
// No exceptions will be thrown when invalid chunk is added with the right sequence
// Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
Assert.fail();
- }catch(SnapshotTracker.InvalidChunkException e){
-
+ } catch (SnapshotTracker.InvalidChunkException e) {
+ // expected
}
}
try {
tracker1.getSnapshot();
Assert.fail();
- } catch(IllegalStateException e){
-
+ } catch (IllegalStateException e) {
+ // expected
}
SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
assertEquals(chunks, tracker1.getCollectedChunks());
}
- public byte[] getNextChunk (ByteString bs, int offset, int size){
+ public byte[] getNextChunk(ByteString bs, int offset, int size) {
int snapshotLength = bs.size();
int start = offset;
if (size > snapshotLength) {
}
private static ByteString toByteString(Map<String, String> state) {
- ByteArrayOutputStream b = null;
- ObjectOutputStream o = null;
+ ByteArrayOutputStream bos = null;
+ ObjectOutputStream os = null;
try {
try {
- b = new ByteArrayOutputStream();
- o = new ObjectOutputStream(b);
- o.writeObject(state);
- byte[] snapshotBytes = b.toByteArray();
+ bos = new ByteArrayOutputStream();
+ os = new ObjectOutputStream(bos);
+ os.writeObject(state);
+ byte[] snapshotBytes = bos.toByteArray();
return ByteString.copyFrom(snapshotBytes);
} finally {
- if (o != null) {
- o.flush();
- o.close();
+ if (os != null) {
+ os.flush();
+ os.close();
}
- if (b != null) {
- b.close();
+ if (bos != null) {
+ bos.close();
}
}
} catch (IOException e) {
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import akka.actor.Props;
import akka.testkit.TestActorRef;
import org.junit.After;
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("listener"));
@After
- public void tearDown(){
+ public void tearDown() {
actorFactory.close();
}
}
@Test
- public void testConstructorActorShouldNotBeNull(){
+ public void testConstructorActorShouldNotBeNull() {
try {
new SyncStatusTracker(null, "commit-tracker", 10);
fail("A NullPointerException was expected");
- } catch(NullPointerException e){
+ } catch (NullPointerException e) {
assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("actor "));
}
}
@Test
- public void testConstructorIdShouldNotBeNull(){
+ public void testConstructorIdShouldNotBeNull() {
try {
new SyncStatusTracker(listener, null, 10);
fail("A NullPointerException was expected");
- } catch(NullPointerException e){
+ } catch (NullPointerException e) {
assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("id "));
}
}
@Test
- public void testConstructorSyncThresholdShouldNotBeNegative(){
+ public void testConstructorSyncThresholdShouldNotBeNegative() {
try {
new SyncStatusTracker(listener, "commit-tracker", -1);
fail("An IllegalArgumentException was expected");
- } catch(IllegalArgumentException e){
+ } catch (IllegalArgumentException e) {
assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("syncThreshold "));
}
}
-
-}
\ No newline at end of file
+}
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
+
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
+
import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.lang.SerializationUtils;
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
+
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+
import com.google.common.base.Optional;
import java.io.Serializable;
import java.util.Arrays;
@Test
public void testSerialization() {
byte[] data = new byte[1000];
- int j = 0;
- for(int i = 0; i < data.length; i++) {
+ for (int i = 0, j = 0; i < data.length; i++) {
data[i] = (byte)j;
- if(++j >= 255) {
+ if (++j >= 255) {
j = 0;
}
}
verifyInstallSnapshot(expected, actual);
expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6);
- actual = (InstallSnapshot) SerializationUtils.clone((Serializable) expected.toSerializable(RaftVersions.CURRENT_VERSION));
+ actual = (InstallSnapshot) SerializationUtils.clone((Serializable) expected.toSerializable(
+ RaftVersions.CURRENT_VERSION));
verifyInstallSnapshot(expected, actual);
}
assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
actual.getLastChunkHashCode().isPresent());
- if(expected.getLastChunkHashCode().isPresent()) {
+ if (expected.getLastChunkHashCode().isPresent()) {
assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
actual.getLastChunkHashCode().get());
}
assertEquals("getServerConfig present", expected.getServerConfig().isPresent(),
actual.getServerConfig().isPresent());
- if(expected.getServerConfig().isPresent()) {
+ if (expected.getServerConfig().isPresent()) {
assertEquals("getServerConfig", expected.getServerConfig().get().getServerConfig(),
actual.getServerConfig().get().getServerConfig());
}
package org.opendaylight.controller.cluster.raft.persisted;
import static org.junit.Assert.assertEquals;
+
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
package org.opendaylight.controller.cluster.raft.persisted;
import static org.junit.Assert.assertEquals;
+
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+
import java.util.Arrays;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
assertTrue(expected.size() > 0);
}
-}
\ No newline at end of file
+}
package org.opendaylight.controller.cluster.raft.persisted;
import static org.junit.Assert.assertEquals;
+
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import akka.actor.UntypedActor;
-public class DoNothingActor extends UntypedActor{
+public class DoNothingActor extends UntypedActor {
@Override public void onReceive(Object message) throws Exception {
}
import akka.actor.UntypedActor;
/**
- * The EchoActor simply responds back with the same message that it receives
+ * The EchoActor simply responds back with the same message that it receives.
*/
public class EchoActor extends UntypedActor {
package org.opendaylight.controller.cluster.raft.utils;
import static org.junit.Assert.assertTrue;
+
import akka.actor.Props;
import java.util.ArrayList;
import java.util.List;
@Override
public void onReceive(Object message) throws Exception {
- if(behavior != null) {
+ if (behavior != null) {
behaviorChanges.add(behavior.handleMessage(sender(), message));
}
super.onReceive(message);
return Props.create(ForwardMessageToBehaviorActor.class);
}
- public void setBehavior(RaftActorBehavior behavior){
+ public void setBehavior(RaftActorBehavior behavior) {
this.behavior = behavior;
}
return behaviorChanges.get(behaviorChanges.size() - 1);
}
- public List<RaftActorBehavior> getBehaviorChanges(){
+ public List<RaftActorBehavior> getBehaviorChanges() {
return behaviorChanges;
}
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
final CountDownLatch latch;
final Class<?> ofType;
- public WriteMessagesComplete(int count, Class<?> ofType) {
+ WriteMessagesComplete(int count, Class<?> ofType) {
this.latch = new CountDownLatch(count);
this.ofType = ofType;
}
static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
- private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+ private static final Map<String, Map<Long, Object>> JOURNALS = new ConcurrentHashMap<>();
- private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> DELETE_MESSAGES_COMPLETE_LATCHES = new ConcurrentHashMap<>();
- private static final Map<String, WriteMessagesComplete> writeMessagesComplete = new ConcurrentHashMap<>();
+ private static final Map<String, WriteMessagesComplete> WRITE_MESSAGES_COMPLETE = new ConcurrentHashMap<>();
- private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> BLOCK_READ_MESSAGES_LATCHES = new ConcurrentHashMap<>();
private static Object deserialize(Object data) {
return data instanceof byte[] ? SerializationUtils.deserialize((byte[])data) : data;
}
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
- Map<Long, Object> journal = journals.get(persistenceId);
- if(journal == null) {
+ Map<Long, Object> journal = JOURNALS.get(persistenceId);
+ if (journal == null) {
journal = Maps.newLinkedHashMap();
- journals.put(persistenceId, journal);
+ JOURNALS.put(persistenceId, journal);
}
synchronized (journal) {
- journal.put(sequenceNr, data instanceof Serializable ?
- SerializationUtils.serialize((Serializable) data) : data);
+ journal.put(sequenceNr, data instanceof Serializable
+ ? SerializationUtils.serialize((Serializable) data) : data);
}
}
public static void clear() {
- journals.clear();
+ JOURNALS.clear();
}
@SuppressWarnings("unchecked")
public static <T> List<T> get(String persistenceId, Class<T> type) {
- Map<Long, Object> journalMap = journals.get(persistenceId);
- if(journalMap == null) {
+ Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
+ if (journalMap == null) {
return Collections.<T>emptyList();
}
synchronized (journalMap) {
List<T> journal = new ArrayList<>(journalMap.size());
- for(Object entry: journalMap.values()) {
+ for (Object entry: journalMap.values()) {
Object data = deserialize(entry);
- if(type.isInstance(data)) {
+ if (type.isInstance(data)) {
journal.add((T) data);
}
}
}
public static Map<Long, Object> get(String persistenceId) {
- Map<Long, Object> journalMap = journals.get(persistenceId);
+ Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
return journalMap != null ? journalMap : Collections.<Long, Object>emptyMap();
}
public static void dumpJournal(String persistenceId) {
StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId));
- Map<Long, Object> journalMap = journals.get(persistenceId);
- if(journalMap != null) {
+ Map<Long, Object> journalMap = JOURNALS.get(persistenceId);
+ if (journalMap != null) {
synchronized (journalMap) {
- for(Map.Entry<Long, Object> e: journalMap.entrySet()) {
+ for (Map.Entry<Long, Object> e: journalMap.entrySet()) {
builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue());
}
}
}
public static void waitForDeleteMessagesComplete(String persistenceId) {
- if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ if (!Uninterruptibles.awaitUninterruptibly(DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId),
+ 5, TimeUnit.SECONDS)) {
throw new AssertionError("Delete messages did not complete");
}
}
public static void waitForWriteMessagesComplete(String persistenceId) {
- if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) {
+ if (!Uninterruptibles.awaitUninterruptibly(WRITE_MESSAGES_COMPLETE.get(persistenceId).latch,
+ 5, TimeUnit.SECONDS)) {
throw new AssertionError("Journal write messages did not complete");
}
}
public static void addDeleteMessagesCompleteLatch(String persistenceId) {
- deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+ DELETE_MESSAGES_COMPLETE_LATCHES.put(persistenceId, new CountDownLatch(1));
}
public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
- writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null));
+ WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, null));
}
public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class<?> ofType) {
- writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType));
+ WRITE_MESSAGES_COMPLETE.put(persistenceId, new WriteMessagesComplete(count, ofType));
}
public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
- blockReadMessagesLatches.put(persistenceId, latch);
+ BLOCK_READ_MESSAGES_LATCHES.put(persistenceId, latch);
}
@Override
final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId,
fromSequenceNr,toSequenceNr);
- return Futures.future(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
- if(blockLatch != null) {
- Uninterruptibles.awaitUninterruptibly(blockLatch);
- }
+ return Futures.future(() -> {
+ CountDownLatch blockLatch = BLOCK_READ_MESSAGES_LATCHES.remove(persistenceId);
+ if (blockLatch != null) {
+ Uninterruptibles.awaitUninterruptibly(blockLatch);
+ }
- Map<Long, Object> journal = journals.get(persistenceId);
- if (journal == null) {
- return null;
- }
+ Map<Long, Object> journal = JOURNALS.get(persistenceId);
+ if (journal == null) {
+ return null;
+ }
- synchronized (journal) {
- int count = 0;
- for (Map.Entry<Long,Object> entry : journal.entrySet()) {
- if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
- PersistentRepr persistentMessage =
- new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId,
- null, false, null, null);
- replayCallback.accept(persistentMessage);
- }
+ synchronized (journal) {
+ int count = 0;
+ for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+ if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId,
+ null, false, null, null);
+ replayCallback.accept(persistentMessage);
}
}
-
- return null;
}
+
+ return null;
}, context().dispatcher());
}
LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr);
// Akka calls this during recovery.
- Map<Long, Object> journal = journals.get(persistenceId);
- if(journal == null) {
+ Map<Long, Object> journal = JOURNALS.get(persistenceId);
+ if (journal == null) {
return Futures.successful(fromSequenceNr);
}
synchronized (journal) {
long highest = -1;
for (Long seqNr : journal.keySet()) {
- if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
+ if (seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
highest = seqNr.longValue();
}
}
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
- return Futures.future(new Callable<Iterable<Optional<Exception>>>() {
- @Override
- public Iterable<Optional<Exception>> call() throws Exception {
- for (AtomicWrite write : messages) {
- // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
- PersistentRepr[] array = new PersistentRepr[write.payload().size()];
- write.payload().copyToArray(array);
- for(PersistentRepr repr: array) {
- LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
- repr.sequenceNr(), repr.payload());
-
- addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
-
- WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
- if(complete != null) {
- if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
- complete.latch.countDown();
- }
+ return Futures.future(() -> {
+ for (AtomicWrite write : messages) {
+ // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
+ PersistentRepr[] array = new PersistentRepr[write.payload().size()];
+ write.payload().copyToArray(array);
+ for (PersistentRepr repr: array) {
+ LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+ repr.sequenceNr(), repr.payload());
+
+ addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+
+ WriteMessagesComplete complete = WRITE_MESSAGES_COMPLETE.get(repr.persistenceId());
+ if (complete != null) {
+ if (complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+ complete.latch.countDown();
}
}
}
-
- return Collections.emptyList();
}
+
+ return Collections.emptyList();
}, context().dispatcher());
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
- Map<Long, Object> journal = journals.get(persistenceId);
- if(journal != null) {
+ Map<Long, Object> journal = JOURNALS.get(persistenceId);
+ if (journal != null) {
synchronized (journal) {
Iterator<Long> iter = journal.keySet().iterator();
- while(iter.hasNext()) {
- Long n = iter.next();
- if(n <= toSequenceNr) {
+ while (iter.hasNext()) {
+ Long num = iter.next();
+ if (num <= toSequenceNr) {
iter.remove();
}
}
}
}
- CountDownLatch latch = deleteMessagesCompleteLatches.get(persistenceId);
- if(latch != null) {
+ CountDownLatch latch = DELETE_MESSAGES_COMPLETE_LATCHES.get(persistenceId);
+ if (latch != null) {
latch.countDown();
}
static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class);
+ private static final Map<String, CountDownLatch> SNAPSHOT_SAVED_LATCHES = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> SNAPSHOT_DELETED_LATCHES = new ConcurrentHashMap<>();
private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
- private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
- private static final Map<String, CountDownLatch> snapshotDeletedLatches = new ConcurrentHashMap<>();
public static void addSnapshot(String persistentId, Object snapshot) {
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
- if(snapshotList == null) {
+ if (snapshotList == null) {
snapshotList = new ArrayList<>();
snapshots.put(persistentId, snapshotList);
}
@SuppressWarnings("unchecked")
public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
List<StoredSnapshot> stored = snapshots.get(persistentId);
- if(stored == null) {
+ if (stored == null) {
return Collections.emptyList();
}
List<T> retList;
synchronized (stored) {
retList = Lists.newArrayListWithCapacity(stored.size());
- for(StoredSnapshot s: stored) {
- if(type.isInstance(s.data)) {
+ for (StoredSnapshot s: stored) {
+ if (type.isInstance(s.data)) {
retList.add((T) s.data);
}
}
}
public static void addSnapshotSavedLatch(String persistenceId) {
- snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
+ SNAPSHOT_SAVED_LATCHES.put(persistenceId, new CountDownLatch(1));
}
public static void addSnapshotDeletedLatch(String persistenceId) {
- snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1));
+ SNAPSHOT_DELETED_LATCHES.put(persistenceId, new CountDownLatch(1));
}
public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
- if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_SAVED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not saved");
}
}
public static void waitForDeletedSnapshot(String persistenceId) {
- if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ if (!Uninterruptibles.awaitUninterruptibly(SNAPSHOT_DELETED_LATCHES.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not deleted");
}
}
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId,
SnapshotSelectionCriteria snapshotSelectionCriteria) {
List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
- if(snapshotList == null){
+ if (snapshotList == null) {
return Futures.successful(Optional.<SelectedSnapshot>empty());
}
- synchronized(snapshotList) {
- for(int i = snapshotList.size() - 1; i >= 0; i--) {
+ synchronized (snapshotList) {
+ for (int i = snapshotList.size() - 1; i >= 0; i--) {
StoredSnapshot snapshot = snapshotList.get(i);
- if(matches(snapshot, snapshotSelectionCriteria)) {
+ if (matches(snapshot, snapshotSelectionCriteria)) {
return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata,
snapshot.data)));
}
}
private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
- return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() &&
- snapshot.metadata.timestamp() <= criteria.maxTimestamp();
+ return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr()
+ && snapshot.metadata.timestamp() <= criteria.maxTimestamp();
}
@Override
- public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object obj) {
List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(),
- snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o);
+ snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), obj);
- if(snapshotList == null){
+ if (snapshotList == null) {
snapshotList = new ArrayList<>();
snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
}
synchronized (snapshotList) {
- snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
+ snapshotList.add(new StoredSnapshot(snapshotMetadata, obj));
}
- CountDownLatch latch = snapshotSavedLatches.get(snapshotMetadata.persistenceId());
- if(latch != null) {
+ CountDownLatch latch = SNAPSHOT_SAVED_LATCHES.get(snapshotMetadata.persistenceId());
+ if (latch != null) {
latch.countDown();
}
if (snapshotList != null) {
synchronized (snapshotList) {
- for(int i=0;i<snapshotList.size(); i++){
+ for (int i = 0; i < snapshotList.size(); i++) {
StoredSnapshot snapshot = snapshotList.get(i);
- if(metadata.equals(snapshot.metadata)){
+ if (metadata.equals(snapshot.metadata)) {
snapshotList.remove(i);
break;
}
criteria.maxSequenceNr(), criteria.maxTimestamp());
List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
- if(snapshotList != null){
+ if (snapshotList != null) {
synchronized (snapshotList) {
Iterator<StoredSnapshot> iter = snapshotList.iterator();
- while(iter.hasNext()) {
- StoredSnapshot s = iter.next();
- if(matches(s, criteria)) {
+ while (iter.hasNext()) {
+ StoredSnapshot stored = iter.next();
+ if (matches(stored, criteria)) {
LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
- s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
+ stored.metadata.sequenceNr(), stored.metadata.timestamp(), stored.data);
iter.remove();
}
}
}
- CountDownLatch latch = snapshotDeletedLatches.get(persistenceId);
- if(latch != null) {
+ CountDownLatch latch = SNAPSHOT_DELETED_LATCHES.get(persistenceId);
+ if (latch != null) {
latch.countDown();
}
import akka.util.Timeout;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-
public class MessageCollectorActor extends UntypedActor {
private static final String ARE_YOU_READY = "ARE_YOU_READY";
public static final String GET_ALL_MESSAGES = "messages";
private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
- if(message.equals(ARE_YOU_READY)) {
+ if (message.equals(ARE_YOU_READY)) {
getSender().tell("yes", getSelf());
return;
}
- if(GET_ALL_MESSAGES.equals(message)) {
+ if (GET_ALL_MESSAGES.equals(message)) {
getSender().tell(new ArrayList<>(messages), getSelf());
- } else if(CLEAR_MESSAGES.equals(message)) {
+ } else if (CLEAR_MESSAGES.equals(message)) {
clear();
- } else if(message != null) {
+ } else if (message != null) {
messages.add(message);
}
}
messages.clear();
}
+ @SuppressWarnings("unchecked")
private static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
}
/**
- * Get the first message that matches the specified class
- * @param actor
- * @param clazz
- * @return
+ * Get the first message that matches the specified class.
+ *
+ * @param actor the MessageCollectorActor reference
+ * @param clazz the class to match
+ * @return the first matching message
*/
public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
List<Object> allMessages = getAllMessages(actor);
- for(Object message : allMessages){
- if(message.getClass().equals(clazz)){
+ for (Object message : allMessages) {
+ if (message.getClass().equals(clazz)) {
return clazz.cast(message);
}
}
return null;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count) {
return expectMatching(actor, clazz, count, msg -> true);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count,
Predicate<T> matcher) {
int timeout = 5000;
+ Exception lastEx = null;
List<T> messages = Collections.emptyList();
- for(int i = 0; i < timeout / 50; i++) {
+ for (int i = 0; i < timeout / 50; i++) {
try {
messages = getAllMatching(actor, clazz);
Iterables.removeIf(messages, Predicates.not(matcher));
- if(messages.size() >= count) {
+ if (messages.size() >= count) {
return messages;
}
- } catch (Exception e) {}
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- Assert.fail(String.format("Expected %d messages of type %s. Actual received was %d: %s", count, clazz,
- messages.size(), messages));
- return null;
+ throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count,
+ clazz, messages.size(), messages), lastEx);
}
public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
return expectFirstMatching(actor, clazz, 5000);
}
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
+ Exception lastEx = null;
int count = (int) (timeout / 50);
- for(int i = 0; i < count; i++) {
+ for (int i = 0; i < count; i++) {
try {
T message = getFirstMatching(actor, clazz);
- if(message != null) {
+ if (message != null) {
return message;
}
- } catch (Exception e) {}
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- Assert.fail("Did not receive message of type " + clazz);
- return null;
+ throw new AssertionError("Did not receive message of type " + clazz, lastEx);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, Predicate<T> matcher) {
int timeout = 5000;
+ Exception lastEx = null;
T lastMessage = null;
- for(int i = 0; i < timeout / 50; i++) {
+ for (int i = 0; i < timeout / 50; i++) {
try {
List<T> messages = getAllMatching(actor, clazz);
- for(T msg: messages) {
- if(matcher.apply(msg)) {
+ for (T msg : messages) {
+ if (matcher.apply(msg)) {
return msg;
}
lastMessage = msg;
}
- } catch (Exception e) {}
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- Assert.fail(String.format("Expected specific message of type %s. Last message received was: %s", clazz, lastMessage));
- return null;
+ throw new AssertionError(String.format("Expected specific message of type %s. Last message received was: %s",
+ clazz, lastMessage), lastEx);
}
public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz) {
assertNoneMatching(actor, clazz, 5000);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz, long timeout) {
+ Exception lastEx = null;
int count = (int) (timeout / 50);
- for(int i = 0; i < count; i++) {
+ for (int i = 0; i < count; i++) {
try {
T message = getFirstMatching(actor, clazz);
- if(message != null) {
+ if (message != null) {
Assert.fail("Unexpected message received" + message.toString());
return;
}
- } catch (Exception e) {}
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
+ if (lastEx != null) {
+ Throwables.propagate(lastEx);
+ }
+
return;
}
List<T> output = Lists.newArrayList();
- for(Object message : allMessages){
- if(message.getClass().equals(clazz)){
+ for (Object message : allMessages) {
+ if (message.getClass().equals(clazz)) {
output.add(clazz.cast(message));
}
}
public static void waitUntilReady(ActorRef actor) throws Exception {
long timeout = 500;
FiniteDuration duration = Duration.create(timeout, TimeUnit.MILLISECONDS);
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
try {
Await.ready(Patterns.ask(actor, ARE_YOU_READY, timeout), duration);
return;
} catch (TimeoutException e) {
+ // will fall through below
}
}