import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
-import com.google.protobuf.GeneratedMessage;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public void update(long currentTerm, String votedFor){
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ public void update(long newTerm, String newVotedFor) {
+ this.currentTerm = newTerm;
+ this.votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long currentTerm,
- String votedFor) {
- update(currentTerm, votedFor);
+ @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+ update(newTerm, newVotedFor);
}
};
}
- public MockRaftActorContext(){
- super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<String, String>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ public MockRaftActorContext() {
+ super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
- super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<String, String>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+ super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
+ applyState -> actor.tell(applyState, actor), LOG);
this.system = system;
}
- public void initReplicatedLog(){
+ public void initReplicatedLog() {
SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
long term = getTermInformation().getCurrentTerm();
- replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
- replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
setReplicatedLog(replicatedLog);
+ setCommitIndex(replicatedLog.lastIndex());
+ setLastApplied(replicatedLog.lastIndex());
}
@Override public ActorRef actorOf(Props props) {
@Override public ActorSelection getPeerActorSelection(String peerId) {
String peerAddress = getPeerAddress(peerId);
- if(peerAddress != null){
+ if (peerAddress != null) {
return actorSelection(peerAddress);
}
return null;
}
public void setPeerAddresses(Map<String, String> peerAddresses) {
- for(String id: getPeerIds()) {
+ for (String id: getPeerIds()) {
removePeer(id);
}
- for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
}
}
@Override
public SnapshotManager getSnapshotManager() {
SnapshotManager snapshotManager = super.getSnapshotManager();
- snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
+ snapshotManager.setCreateSnapshotConsumer(out -> { });
+
+ snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+ @Override
+ public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+ return ByteState.of(snapshotBytes.read());
+ }
+
+ @Override
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ }
+
+ @Override
+ public void applySnapshot(State snapshotState) {
+ }
+ });
+
return snapshotManager;
}
}
public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
- @Override
- public void appendAndPersist(
- ReplicatedLogEntry replicatedLogEntry) {
- append(replicatedLogEntry);
- }
-
@Override
public int dataSize() {
return -1;
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
- @Override public void removeFromAndPersist(long index) {
- removeFrom(index);
+ @Override
+ public boolean shouldCaptureSnapshot(long logIndex) {
+ return false;
+ }
+
+ @Override
+ public boolean removeFromAndPersist(long index) {
+ return removeFrom(index) >= 0;
}
@Override
- public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+ boolean doAsync) {
append(replicatedLogEntry);
- if(callback != null) {
+ if (callback != null) {
try {
callback.apply(replicatedLogEntry);
} catch (Exception e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
+
+ return true;
}
}
public MockPayload() {
}
- public MockPayload(String s) {
- this.value = s;
+ public MockPayload(String data) {
+ this.value = data;
size = value.length();
}
- public MockPayload(String s, int size) {
- this(s);
+ public MockPayload(String data, int size) {
+ this(data);
this.size = size;
}
- @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
- return null;
- }
-
- @Override public Payload decode(
- AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
- return null;
- }
-
@Override
public int size() {
return size;
}
- @Override public String getClientPayloadClassName() {
- return MockPayload.class.getName();
- }
-
@Override
public String toString() {
return value;
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((value == null) ? 0 : value.hashCode());
+ result = prime * result + (value == null ? 0 : value.hashCode());
return result;
}
}
}
- public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
- private static final long serialVersionUID = 1L;
-
- private final long term;
- private final long index;
- private final Payload data;
-
- public MockReplicatedLogEntry(long term, long index, Payload data){
-
- this.term = term;
- this.index = index;
- this.data = data;
- }
-
- @Override public Payload getData() {
- return data;
- }
-
- @Override public long getTerm() {
- return term;
- }
-
- @Override public long getIndex() {
- return index;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((data == null) ? 0 : data.hashCode());
- result = prime * result + (int) (index ^ (index >>> 32));
- result = prime * result + (int) (term ^ (term >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
- if (data == null) {
- if (other.data != null) {
- return false;
- }
- } else if (!data.equals(other.data)) {
- return false;
- }
- if (index != other.index) {
- return false;
- }
- if (term != other.term) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
- .append(", data=").append(data).append("]");
- return builder.toString();
- }
- }
-
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
- for (int i=start; i<end; i++) {
- this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload(Integer.toString(i))));
+ for (int i = start; i < end; i++) {
+ this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
+ new MockRaftActorContext.MockPayload(Integer.toString(i))));
}
return this;
}
public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
- this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+ this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
return this.mockLog;
}
}
+
+ @Override
+ public void setCurrentBehavior(final RaftActorBehavior behavior) {
+ super.setCurrentBehavior(behavior);
+ }
}