*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
+import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
*/
public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
+ public static class SetPeerAddress {
+ private final String peerId;
+ private final String peerAddress;
+
+ public SetPeerAddress(String peerId, String peerAddress) {
+ this.peerId = peerId;
+ this.peerAddress = peerAddress;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public String getPeerAddress() {
+ return peerAddress;
+ }
+ }
+
public static class TestRaftActor extends MockRaftActor {
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private volatile byte[] snapshot;
- private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- super(id, peerAddresses, Optional.of(config), null);
- this.collectorActor = collectorActor;
- }
-
- public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
- withDispatcher(Dispatchers.DefaultDispatcherId());
+ private TestRaftActor(Builder builder) {
+ super(builder);
+ this.collectorActor = builder.collectorActor;
}
void startDropMessages(Class<?> msgClass) {
return;
}
+ if(message instanceof SetPeerAddress) {
+ setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
+ ((SetPeerAddress) message).getPeerAddress());
+ return;
+ }
+
try {
if(!dropMessages.containsKey(message.getClass())) {
super.handleCommand(message);
}
@Override
- protected void createSnapshot() {
- if(snapshot != null) {
- getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+ public void createSnapshot(ActorRef actorRef) {
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
- @Override
- protected void applyRecoverySnapshot(byte[] bytes) {
+ public ActorRef collectorActor() {
+ return collectorActor;
}
- void setSnapshot(byte[] snapshot) {
- this.snapshot = snapshot;
+ public static Builder newBuilder() {
+ return new Builder();
}
- public ActorRef collectorActor() {
- return collectorActor;
+ public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+ private TestActorRef<MessageCollectorActor> collectorActor;
+
+ public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ return this;
+ }
+
+ private Builder() {
+ super(TestRaftActor.class);
+ }
}
}
protected long initialTerm = 5;
protected long currentTerm;
+ protected int snapshotBatchCount = 4;
+
+ protected List<MockPayload> expSnapshotState = new ArrayList<>();
+
@After
public void tearDown() {
InMemoryJournal.clear();
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(1);
- configParams.setSnapshotBatchCount(4);
+ configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
return configParams;
}
protected void waitUntilLeader(ActorRef actorRef) {
- RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+ RaftActorTestKit.waitUntilLeader(actorRef);
}
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
ConfigParams configParams) {
- TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+ return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+ Collections.<String, String>emptyMap()).config(configParams));
+ }
+
+ protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+ builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- factory.generateActorId(id + "-collector"));
- return factory.createTestActor(TestRaftActor.props(id,
- peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
- configParams, collectorActor), id);
+ factory.generateActorId(id + "-collector"))).id(id);
+
+ InvalidActorNameException lastEx = null;
+ for(int i = 0; i < 10; i++) {
+ try {
+ return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ } catch (InvalidActorNameException e) {
+ lastEx = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ assertNotNull(lastEx);
+ throw lastEx;
}
protected void killActor(TestActorRef<TestRaftActor> leaderActor) {
});
}
+ @SuppressWarnings("unchecked")
protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
- int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+ long lastAppliedIndex, long lastTerm, long lastIndex)
+ throws Exception {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
- assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+ List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+ assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
+ actualState), expSnapshotState.size(), actualState.size());
+ for(int i = 0; i < expSnapshotState.size(); i++) {
+ assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+ }
}
protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
}
protected String testActorPath(String id){
- return "akka://test/user" + id;
+ return factory.createTestActorPath(id);
+ }
+
+ protected void verifyLeadersTrimmedLog(long lastIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyLeadersTrimmedLog(long lastIndex, long replicatedToAllIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex);
+ }
+
+ protected void verifyFollowersTrimmedLog(int num, TestActorRef<TestRaftActor> actorRef, long lastIndex) {
+ verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyTrimmedLog(String name, TestActorRef<TestRaftActor> actorRef, long lastIndex,
+ long replicatedToAllIndex) {
+ TestRaftActor actor = actorRef.underlyingActor();
+ RaftActorContext context = actor.getRaftActorContext();
+ long snapshotIndex = lastIndex - 1;
+ assertEquals(name + " snapshot term", snapshotIndex < 0 ? -1 : currentTerm,
+ context.getReplicatedLog().getSnapshotTerm());
+ assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals(name + " journal log size", 1, context.getReplicatedLog().size());
+ assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals(name + " commit index", lastIndex, context.getCommitIndex());
+ assertEquals(name + " last applied", lastIndex, context.getLastApplied());
+ assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
+ actor.getCurrentBehavior().getReplicatedToAllIndex());
}
}