import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.GeneratedMessage;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MockRaftActorContext implements RaftActorContext {
private String id;
private ActorSystem system;
private ActorRef actor;
- private AtomicLong index = new AtomicLong(0);
- private AtomicLong lastApplied = new AtomicLong(0);
+ private long index = 0;
+ private long lastApplied = 0;
private final ElectionTerm electionTerm;
+ private ReplicatedLog replicatedLog;
+ private Map<String, String> peerAddresses = new HashMap<>();
+ private ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
public MockRaftActorContext(){
electionTerm = null;
+
+ initReplicatedLog();
}
public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
this.system = system;
this.actor = actor;
- electionTerm = new ElectionTermImpl(id);
+ final String id1 = id;
+ electionTerm = new ElectionTerm() {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private final String id = id1;
+ private long currentTerm = 1;
+ private String votedFor = "";
+
+ @Override
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ @Override
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override
+ public void update(long currentTerm, String votedFor){
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+
+ // TODO : Write to some persistent state
+ }
+
+ @Override public void updateAndPersist(long currentTerm,
+ String votedFor) {
+ update(currentTerm, votedFor);
+ }
+ };
+
+ configParams = new DefaultConfigParamsImpl();
+
+ initReplicatedLog();
+ }
+
+
+ public void initReplicatedLog(){
+ this.replicatedLog = new SimpleReplicatedLog();
+ long term = getTermInformation().getCurrentTerm();
+ this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
+ this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
}
@Override public ActorRef actorOf(Props props) {
return electionTerm;
}
- public void setIndex(AtomicLong index){
+ public void setIndex(long index){
this.index = index;
}
- @Override public AtomicLong getCommitIndex() {
+ @Override public long getCommitIndex() {
return index;
}
- public void setLastApplied(AtomicLong lastApplied){
+ @Override public void setCommitIndex(long commitIndex) {
+ this.index = commitIndex;
+ }
+
+ @Override public void setLastApplied(long lastApplied){
this.lastApplied = lastApplied;
}
- @Override public AtomicLong getLastApplied() {
+ @Override public long getLastApplied() {
return lastApplied;
}
+ @Override
+ public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ this.replicatedLog = replicatedLog;
+ }
+
@Override public ReplicatedLog getReplicatedLog() {
- return new ReplicatedLog(){
+ return replicatedLog;
+ }
+
+ @Override public ActorSystem getActorSystem() {
+ return this.system;
+ }
+
+ @Override public Logger getLogger() {
+ return LoggerFactory.getLogger(getClass());
+ }
+
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
+
+ @Override public void addToPeers(String name, String address) {
+ peerAddresses.put(name, address);
+ }
+
+ @Override public void removePeer(String name) {
+ peerAddresses.remove(name);
+ }
+
+ @Override public ActorSelection getPeerActorSelection(String peerId) {
+ String peerAddress = getPeerAddress(peerId);
+ if(peerAddress != null){
+ return actorSelection(peerAddress);
+ }
+ return null;
+ }
+
+ @Override public void setPeerAddress(String peerId, String peerAddress) {
+ Preconditions.checkState(peerAddresses.containsKey(peerId));
+ peerAddresses.put(peerId, peerAddress);
+ }
+
+ public void setPeerAddresses(Map<String, String> peerAddresses) {
+ this.peerAddresses = peerAddresses;
+ }
+
+ @Override
+ public ConfigParams getConfigParams() {
+ return configParams;
+ }
- @Override public ReplicatedLogEntry getReplicatedLogEntry(
- long index) {
- throw new UnsupportedOperationException(
- "getReplicatedLogEntry");
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
+ public void setConfigParams(ConfigParams configParams) {
+ this.configParams = configParams;
+ }
+
+ public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
+ @Override public void appendAndPersist(
+ ReplicatedLogEntry replicatedLogEntry) {
+ append(replicatedLogEntry);
+ }
+
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
+ @Override public void removeFromAndPersist(long index) {
+ removeFrom(index);
+ }
+ }
+
+ public static class MockPayload extends Payload implements Serializable {
+ private static final long serialVersionUID = 3121380393130864247L;
+ private String value = "";
+
+ public MockPayload(){
+
+ }
+
+ public MockPayload(String s) {
+ this.value = s;
+ }
+
+ @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
+ Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
+ map.put(MockPayloadMessages.value, value);
+ return map;
+ }
+
+ @Override public Payload decode(
+ AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
+ String value = payloadProtoBuff.getExtension(MockPayloadMessages.value);
+ this.value = value;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return value.length();
+ }
+
+ @Override public String getClientPayloadClassName() {
+ return MockPayload.class.getName();
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((value == null) ? 0 : value.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
}
+ MockPayload other = (MockPayload) obj;
+ if (value == null) {
+ if (other.value != null) {
+ return false;
+ }
+ } else if (!value.equals(other.value)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long term;
+ private final long index;
+ private final Payload data;
+
+ public MockReplicatedLogEntry(long term, long index, Payload data){
+
+ this.term = term;
+ this.index = index;
+ this.data = data;
+ }
- @Override public ReplicatedLogEntry last() {
- return new ReplicatedLogEntry() {
- @Override public Object getData() {
- return null;
- }
+ @Override public Payload getData() {
+ return data;
+ }
- @Override public long getTerm() {
- return 1;
- }
+ @Override public long getTerm() {
+ return term;
+ }
- @Override public long getIndex() {
- return 1;
- }
- };
+ @Override public long getIndex() {
+ return index;
+ }
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((data == null) ? 0 : data.hashCode());
+ result = prime * result + (int) (index ^ (index >>> 32));
+ result = prime * result + (int) (term ^ (term >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
}
- };
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
+ if (data == null) {
+ if (other.data != null) {
+ return false;
+ }
+ } else if (!data.equals(other.data)) {
+ return false;
+ }
+ if (index != other.index) {
+ return false;
+ }
+ if (term != other.term) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
+ .append(", data=").append(data).append("]");
+ return builder.toString();
+ }
}
- @Override public ActorSystem getActorSystem() {
- return this.system;
+ public static class MockReplicatedLogBuilder {
+ private final ReplicatedLog mockLog = new SimpleReplicatedLog();
+
+ public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+ for (int i=start; i<end; i++) {
+ this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
+ }
+ return this;
+ }
+
+ public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+ this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+ return this;
+ }
+
+ public ReplicatedLog build() {
+ return this.mockLog;
+ }
}
}