import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.GeneratedMessage;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MockRaftActorContext implements RaftActorContext {
+public class MockRaftActorContext extends RaftActorContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
- private String id;
private ActorSystem system;
- private ActorRef actor;
- private long index = 0;
- private long lastApplied = 0;
- private final ElectionTerm electionTerm;
- private ReplicatedLog replicatedLog;
- private Map<String, String> peerAddresses = new HashMap<>();
- private ConfigParams configParams;
- private boolean snapshotCaptureInitiated;
- private SnapshotManager snapshotManager;
-
- public MockRaftActorContext(){
- electionTerm = new ElectionTerm() {
+ private RaftPolicy raftPolicy;
+ private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
+
+ private static ElectionTerm newElectionTerm() {
+ return new ElectionTerm() {
private long currentTerm = 1;
private String votedFor = "";
}
@Override
- public void update(long currentTerm, String votedFor){
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ public void update(final long newTerm, final String newVotedFor) {
+ this.currentTerm = newTerm;
+ this.votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long currentTerm,
- String votedFor) {
- update(currentTerm, votedFor);
+ @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
+ update(newTerm, newVotedFor);
}
};
+ }
+
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
- configParams = new DefaultConfigParamsImpl();
+ public MockRaftActorContext() {
+ super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+ MoreExecutors.directExecutor());
+ setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
- this();
- this.id = id;
+ public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
+ super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
+ MoreExecutors.directExecutor());
+
this.system = system;
- this.actor = actor;
initReplicatedLog();
}
- public void initReplicatedLog(){
- this.replicatedLog = new SimpleReplicatedLog();
+ public void initReplicatedLog() {
+ SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
long term = getTermInformation().getCurrentTerm();
- this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
- this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
+ setReplicatedLog(replicatedLog);
+ setCommitIndex(replicatedLog.lastIndex());
+ setLastApplied(replicatedLog.lastIndex());
}
- @Override public ActorRef actorOf(Props props) {
+ @Override public ActorRef actorOf(final Props props) {
return system.actorOf(props);
}
- @Override public ActorSelection actorSelection(String path) {
+ @Override public ActorSelection actorSelection(final String path) {
return system.actorSelection(path);
}
- @Override public String getId() {
- return id;
- }
-
- @Override public ActorRef getActor() {
- return actor;
- }
-
- @Override public ElectionTerm getTermInformation() {
- return electionTerm;
- }
-
- public void setIndex(long index){
- this.index = index;
- }
-
- @Override public long getCommitIndex() {
- return index;
+ @Override public ActorSystem getActorSystem() {
+ return this.system;
}
- @Override public void setCommitIndex(long commitIndex) {
- this.index = commitIndex;
+ @Override public ActorSelection getPeerActorSelection(final String peerId) {
+ String peerAddress = getPeerAddress(peerId);
+ if (peerAddress != null) {
+ return actorSelection(peerAddress);
+ }
+ return null;
}
- @Override public void setLastApplied(long lastApplied){
- this.lastApplied = lastApplied;
- }
+ public void setPeerAddresses(final Map<String, String> peerAddresses) {
+ for (String id: getPeerIds()) {
+ removePeer(id);
+ }
- @Override public long getLastApplied() {
- return lastApplied;
+ for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
+ }
}
@Override
- // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method
- // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not
- // used this way to manipulate the log because the RaftActor actually has a field replicatedLog
- // which it creates internally and sets on the RaftActorContext
- // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor
- // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter
- // Simple assertion that will fail if you do so
- // ReplicatedLog log = new ReplicatedLogImpl();
- // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log);
- // assertEquals(log, raftActor.underlyingActor().getReplicatedLog())
- public void setReplicatedLog(ReplicatedLog replicatedLog) {
- this.replicatedLog = replicatedLog;
- }
+ public SnapshotManager getSnapshotManager() {
+ SnapshotManager snapshotManager = super.getSnapshotManager();
+ snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
- @Override public ReplicatedLog getReplicatedLog() {
- return replicatedLog;
- }
+ snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+ @Override
+ public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
+ return ByteState.of(snapshotBytes.read());
+ }
- @Override public ActorSystem getActorSystem() {
- return this.system;
- }
+ @Override
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
+ }
- @Override public Logger getLogger() {
- return LoggerFactory.getLogger(getClass());
- }
+ @Override
+ public void applySnapshot(final State snapshotState) {
+ }
+ });
- @Override public Map<String, String> getPeerAddresses() {
- return peerAddresses;
+ return snapshotManager;
}
- @Override public String getPeerAddress(String peerId) {
- return peerAddresses.get(peerId);
+ public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
}
- @Override public void addToPeers(String name, String address) {
- peerAddresses.put(name, address);
+ @Override
+ public RaftPolicy getRaftPolicy() {
+ return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
}
- @Override public void removePeer(String name) {
- peerAddresses.remove(name);
+ public void setRaftPolicy(final RaftPolicy raftPolicy) {
+ this.raftPolicy = raftPolicy;
}
- @Override public ActorSelection getPeerActorSelection(String peerId) {
- String peerAddress = getPeerAddress(peerId);
- if(peerAddress != null){
- return actorSelection(peerAddress);
+ public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
+ @Override
+ public int dataSize() {
+ return -1;
}
- return null;
- }
-
- @Override public void setPeerAddress(String peerId, String peerAddress) {
- Preconditions.checkState(peerAddresses.containsKey(peerId));
- peerAddresses.put(peerId, peerAddress);
- }
-
- public void setPeerAddresses(Map<String, String> peerAddresses) {
- this.peerAddresses = peerAddresses;
- }
- @Override
- public ConfigParams getConfigParams() {
- return configParams;
- }
-
- @Override
- public SnapshotManager getSnapshotManager() {
- if(this.snapshotManager == null){
- this.snapshotManager = new SnapshotManager(this, getLogger());
+ @Override
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
}
- return this.snapshotManager;
- }
- public void setConfigParams(ConfigParams configParams) {
- this.configParams = configParams;
- }
-
- public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
- @Override public void appendAndPersist(
- ReplicatedLogEntry replicatedLogEntry) {
- append(replicatedLogEntry);
+ @Override
+ public boolean shouldCaptureSnapshot(final long logIndex) {
+ return false;
}
@Override
- public int dataSize() {
- return -1;
+ public boolean removeFromAndPersist(final long index) {
+ return removeFrom(index) >= 0;
}
- @Override public void removeFromAndPersist(long index) {
- removeFrom(index);
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+ final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
+ append(replicatedLogEntry);
+
+ if (callback != null) {
+ callback.accept(replicatedLogEntry);
+ }
+
+ return true;
}
}
public MockPayload() {
}
- public MockPayload(String s) {
- this.value = s;
+ public MockPayload(final String data) {
+ this.value = data;
size = value.length();
}
- public MockPayload(String s, int size) {
- this(s);
+ public MockPayload(final String data, final int size) {
+ this(data);
this.size = size;
}
- @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
- Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
- map.put(MockPayloadMessages.value, value);
- return map;
- }
-
- @Override public Payload decode(
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
- String value = payloadProtoBuff.getExtension(MockPayloadMessages.value);
- this.value = value;
- return this;
- }
-
@Override
public int size() {
return size;
}
- @Override public String getClientPayloadClassName() {
- return MockPayload.class.getName();
- }
-
@Override
public String toString() {
return value;
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((value == null) ? 0 : value.hashCode());
+ result = prime * result + (value == null ? 0 : value.hashCode());
return result;
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
}
}
- public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
- private static final long serialVersionUID = 1L;
-
- private final long term;
- private final long index;
- private final Payload data;
-
- public MockReplicatedLogEntry(long term, long index, Payload data){
-
- this.term = term;
- this.index = index;
- this.data = data;
- }
-
- @Override public Payload getData() {
- return data;
- }
-
- @Override public long getTerm() {
- return term;
- }
-
- @Override public long getIndex() {
- return index;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((data == null) ? 0 : data.hashCode());
- result = prime * result + (int) (index ^ (index >>> 32));
- result = prime * result + (int) (term ^ (term >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
- if (data == null) {
- if (other.data != null) {
- return false;
- }
- } else if (!data.equals(other.data)) {
- return false;
- }
- if (index != other.index) {
- return false;
- }
- if (term != other.term) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
- .append(", data=").append(data).append("]");
- return builder.toString();
- }
- }
-
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
- public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
- for (int i=start; i<end; i++) {
- this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
+ public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
+ for (int i = start; i < end; i++) {
+ this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
+ new MockRaftActorContext.MockPayload(Integer.toString(i))));
}
return this;
}
- public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
- this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+ public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
+ this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
return this.mockLog;
}
}
+
+ @Override
+ public void setCurrentBehavior(final RaftActorBehavior behavior) {
+ super.setCurrentBehavior(behavior);
+ }
}