import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import com.google.protobuf.GeneratedMessage;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.MockPayloadMessages;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class MockRaftActorContext implements RaftActorContext {
private long lastApplied = 0;
private final ElectionTerm electionTerm;
private ReplicatedLog replicatedLog;
+ private Map<String, String> peerAddresses = new HashMap();
public MockRaftActorContext(){
electionTerm = null;
this.system = system;
this.actor = actor;
- electionTerm = new ElectionTermImpl(id);
+ final String id1 = id;
+ electionTerm = new ElectionTerm() {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private final String id = id1;
+ private long currentTerm = 0;
+ private String votedFor = "";
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public void update(long currentTerm, String votedFor){
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+
+ // TODO : Write to some persistent state
+ }
+
+ @Override public void updateAndPersist(long currentTerm,
+ String votedFor) {
+ update(currentTerm, votedFor);
+ }
+ };
initReplicatedLog();
}
public void initReplicatedLog(){
- MockReplicatedLog mockReplicatedLog = new MockReplicatedLog();
- this.replicatedLog = mockReplicatedLog;
- mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,""));
- mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, ""));
+ this.replicatedLog = new SimpleReplicatedLog();
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
}
@Override public ActorRef actorOf(Props props) {
return this.system;
}
+ @Override public LoggingAdapter getLogger() {
+ return Logging.getLogger(system, this);
+ }
- public static class MockReplicatedLog implements ReplicatedLog {
- private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
- private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
+
+ @Override public void addToPeers(String name, String address) {
+ peerAddresses.put(name, address);
+ }
+
+ @Override public void removePeer(String name) {
+ peerAddresses.remove(name);
+ }
+
+ @Override public ActorSelection getPeerActorSelection(String peerId) {
+ String peerAddress = getPeerAddress(peerId);
+ if(peerAddress != null){
+ return actorSelection(peerAddress);
+ }
+ return null;
+ }
+
+ @Override public void setPeerAddress(String peerId, String peerAddress) {
+ Preconditions.checkState(peerAddresses.containsKey(peerId));
+ peerAddresses.put(peerId, peerAddress);
+ }
+
+ public void setPeerAddresses(Map<String, String> peerAddresses) {
+ this.peerAddresses = peerAddresses;
+ }
+
+ @Override
+ public ConfigParams getConfigParams() {
+ return new DefaultConfigParamsImpl();
+ }
+
+ public static class SimpleReplicatedLog implements ReplicatedLog {
+ private final List<ReplicatedLogEntry> log = new ArrayList<>();
@Override public ReplicatedLogEntry get(long index) {
- return replicatedLogEntry;
+ if(index >= log.size() || index < 0){
+ return null;
+ }
+ return log.get((int) index);
}
@Override public ReplicatedLogEntry last() {
- return last;
+ if(log.size() == 0){
+ return null;
+ }
+ return log.get(log.size()-1);
+ }
+
+ @Override public long lastIndex() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getTerm();
}
@Override public void removeFrom(long index) {
+ if(index >= log.size() || index < 0){
+ return;
+ }
+
+ log.subList((int) index, log.size()).clear();
+ //log.remove((int) index);
+ }
+
+ @Override public void removeFromAndPersist(long index) {
+ removeFrom(index);
}
@Override public void append(ReplicatedLogEntry replicatedLogEntry) {
+ log.add(replicatedLogEntry);
}
- public void setReplicatedLogEntry(
+ @Override public void appendAndPersist(
ReplicatedLogEntry replicatedLogEntry) {
- this.replicatedLogEntry = replicatedLogEntry;
+ append(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ if(index >= log.size() || index < 0){
+ return Collections.EMPTY_LIST;
+ }
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ for(int i=(int) index ; i < log.size() ; i++) {
+ entries.add(get(i));
+ }
+ return entries;
+ }
+
+ @Override public long size() {
+ return log.size();
+ }
+
+ @Override public boolean isPresent(long index) {
+ if(index >= log.size() || index < 0){
+ return false;
+ }
+
+ return true;
}
- public void setLast(ReplicatedLogEntry last) {
- this.last = last;
+ @Override public boolean isInSnapshot(long index) {
+ return false;
+ }
+
+ @Override public Object getSnapshot() {
+ return null;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return -1;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return -1;
}
}
- public static class SimpleReplicatedLog implements ReplicatedLog {
- private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
+ public static class MockPayload extends Payload implements Serializable {
+ private String value = "";
- @Override public ReplicatedLogEntry get(long index) {
- return log.get((int) index);
+ public MockPayload(String s) {
+ this.value = s;
}
- @Override public ReplicatedLogEntry last() {
- return log.get(log.size()-1);
+ @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
+ Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
+ map.put(MockPayloadMessages.value, value);
+ return map;
}
- @Override public void removeFrom(long index) {
- for(int i=(int) index ; i < log.size() ; i++) {
- log.remove(i);
- }
+ @Override public Payload decode(
+ AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
+ String value = payloadProtoBuff.getExtension(MockPayloadMessages.value);
+ this.value = value;
+ return this;
}
- @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
- log.add(replicatedLogEntry);
+ @Override public String getClientPayloadClassName() {
+ return MockPayload.class.getName();
+ }
+
+ public String toString() {
+ return value;
}
}
- public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
+ public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
private final long term;
private final long index;
- private final Object data;
+ private final Payload data;
- public MockReplicatedLogEntry(long term, long index, Object data){
+ public MockReplicatedLogEntry(long term, long index, Payload data){
this.term = term;
this.index = index;
this.data = data;
}
- @Override public Object getData() {
+ @Override public Payload getData() {
return data;
}