*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
+import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
*/
public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
+ private static final class MockIdentifier extends AbstractStringIdentifier<MockIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ protected MockIdentifier(String string) {
+ super(string);
+ }
+ }
+
+ public static class SetPeerAddress {
+ private final String peerId;
+ private final String peerAddress;
+
+ public SetPeerAddress(String peerId, String peerAddress) {
+ this.peerId = peerId;
+ this.peerAddress = peerAddress;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public String getPeerAddress() {
+ return peerAddress;
+ }
+ }
+
public static class TestRaftActor extends MockRaftActor {
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private volatile byte[] snapshot;
- private volatile long mockTotalMemory;
-
- private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- super(id, peerAddresses, Optional.of(config), null);
- dataPersistenceProvider = new PersistentDataProvider();
- this.collectorActor = collectorActor;
- }
- public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
- withDispatcher(Dispatchers.DefaultDispatcherId());
+ private TestRaftActor(Builder builder) {
+ super(builder);
+ this.collectorActor = builder.collectorActor;
}
void startDropMessages(Class<?> msgClass) {
dropMessages.remove(msgClass);
}
- void setMockTotalMemory(long mockTotalMemory) {
- this.mockTotalMemory = mockTotalMemory;
- }
-
- @Override
- protected long getTotalMemory() {
- return mockTotalMemory > 0 ? mockTotalMemory : super.getTotalMemory();
+ void setMockTotalMemory(final long mockTotalMemory) {
+ getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
}
@Override
public void handleCommand(Object message) {
if(message instanceof MockPayload) {
MockPayload payload = (MockPayload)message;
- super.persistData(collectorActor, payload.toString(), payload);
+ super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload);
+ return;
+ }
+
+ if(message instanceof SetPeerAddress) {
+ setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
+ ((SetPeerAddress) message).getPeerAddress());
return;
}
}
@Override
- protected void createSnapshot() {
- if(snapshot != null) {
- getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+ public void createSnapshot(ActorRef actorRef) {
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
- @Override
- protected void applyRecoverySnapshot(byte[] bytes) {
+ public ActorRef collectorActor() {
+ return collectorActor;
}
- void setSnapshot(byte[] snapshot) {
- this.snapshot = snapshot;
+ public static Builder newBuilder() {
+ return new Builder();
}
- public ActorRef collectorActor() {
- return collectorActor;
+ public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+ private TestActorRef<MessageCollectorActor> collectorActor;
+
+ public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ return this;
+ }
+
+ private Builder() {
+ super(TestRaftActor.class);
+ }
}
}
+ protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected final TestActorFactory factory = new TestActorFactory(getSystem());
protected long initialTerm = 5;
protected long currentTerm;
+ protected int snapshotBatchCount = 4;
+
+ protected List<MockPayload> expSnapshotState = new ArrayList<>();
+
@After
public void tearDown() {
InMemoryJournal.clear();
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(1);
- configParams.setSnapshotBatchCount(4);
+ configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
return configParams;
}
}
protected void waitUntilLeader(ActorRef actorRef) {
- RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+ RaftActorTestKit.waitUntilLeader(actorRef);
}
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
ConfigParams configParams) {
- TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+ return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+ Collections.<String, String>emptyMap()).config(configParams));
+ }
+
+ protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+ builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- factory.generateActorId(id + "-collector"));
- return factory.createTestActor(TestRaftActor.props(id,
- peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
- configParams, collectorActor), id);
+ factory.generateActorId(id + "-collector"))).id(id);
+
+ InvalidActorNameException lastEx = null;
+ for(int i = 0; i < 10; i++) {
+ try {
+ return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ } catch (InvalidActorNameException e) {
+ lastEx = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ assertNotNull(lastEx);
+ throw lastEx;
}
protected void killActor(TestActorRef<TestRaftActor> leaderActor) {
}
protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) {
- MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, new Predicate<ApplyJournalEntries>() {
- @Override
- public boolean apply(ApplyJournalEntries msg) {
- return msg.getToIndex() == expIndex;
- }
- });
+ MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex);
}
+ @SuppressWarnings("unchecked")
protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
- int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+ long lastAppliedIndex, long lastTerm, long lastIndex)
+ throws Exception {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
- assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+ List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+ assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
+ actualState), expSnapshotState.size(), actualState.size());
+ for(int i = 0; i < expSnapshotState.size(); i++) {
+ assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+ }
}
protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
String expId, long expTerm, long expIndex, MockPayload payload) {
assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
- assertEquals("ApplyState getIdentifier", expId, applyState.getIdentifier());
+
+ final Identifier id = expId == null ? null : new MockIdentifier(expId);
+ assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier());
ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry();
verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload);
}
}
protected String testActorPath(String id){
- return "akka://test/user" + id;
+ return factory.createTestActorPath(id);
+ }
+
+ protected void verifyLeadersTrimmedLog(long lastIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyLeadersTrimmedLog(long lastIndex, long replicatedToAllIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex);
+ }
+
+ protected void verifyFollowersTrimmedLog(int num, TestActorRef<TestRaftActor> actorRef, long lastIndex) {
+ verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyTrimmedLog(String name, TestActorRef<TestRaftActor> actorRef, long lastIndex,
+ long replicatedToAllIndex) {
+ TestRaftActor actor = actorRef.underlyingActor();
+ RaftActorContext context = actor.getRaftActorContext();
+ long snapshotIndex = lastIndex - 1;
+ assertEquals(name + " snapshot term", snapshotIndex < 0 ? -1 : currentTerm,
+ context.getReplicatedLog().getSnapshotTerm());
+ assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals(name + " journal log size", 1, context.getReplicatedLog().size());
+ assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals(name + " commit index", lastIndex, context.getCommitIndex());
+ assertEquals(name + " last applied", lastIndex, context.getLastApplied());
+ assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
+ actor.getCurrentBehavior().getReplicatedToAllIndex());
}
}