+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft;
-
-public class ElectionTermImpl implements ElectionTerm{
- /**
- * Identifier of the actor whose election term information this is
- */
- private final String id;
-
- private long currentTerm;
-
- private String votedFor;
-
- public ElectionTermImpl(String id) {
- this.id = id;
-
- // TODO: Read currentTerm from some persistent state
- currentTerm = 0;
-
- // TODO: Read votedFor from some file
- votedFor = "";
- }
-
- public long getCurrentTerm() {
- return currentTerm;
- }
-
- public String getVotedFor() {
- return votedFor;
- }
-
- public void update(long currentTerm, String votedFor){
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
-
- // TODO : Write to some persistent state
- }
-}
public RaftActor(String id, Map<String, String> peerAddresses) {
+ final String id1 = getSelf().path().toString();
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
- id, new ElectionTermImpl(getSelf().path().toString()),
+ id, new ElectionTermImpl(),
-1, -1, replicatedLog, peerAddresses, LOG);
}
} else if (message instanceof ReplicatedLogEntry) {
replicatedLog.append((ReplicatedLogEntry) message);
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
LOG.debug(
"Last index in log : " + replicatedLog.lastIndex());
deleteMessages(sequenceNumber);
}
+
private class ReplicatedLogImpl implements ReplicatedLog {
private final List<ReplicatedLogEntry> journal;
private final Object snapshot;
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
return;
}
- for (int i = adjustedIndex;
- i < journal.size(); i++) {
- deleteMessage(i);
- journal.remove(i);
+
+ journal.subList(adjustedIndex , journal.size()).clear();
+ }
+
+
+ @Override public void removeFromAndPersist(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
}
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+
+
}
@Override public void append(
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
return entries;
}
+
+
for (int i = adjustedIndex;
i < journal.size(); i++) {
entries.add(journal.get(i));
}
}
+ private static class DeleteEntries implements Serializable {
+ private final int fromIndex;
+
+
+ public DeleteEntries(int fromIndex) {
+ this.fromIndex = fromIndex;
+ }
+
+ public int getFromIndex() {
+ return fromIndex;
+ }
+ }
+
private static class Snapshot implements Serializable {
private final Object state;
}
}
+ private class ElectionTermImpl implements ElectionTerm {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private long currentTerm = 0;
+ private String votedFor = null;
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override public void update(long currentTerm, String votedFor) {
+ LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ @Override
+ public void updateAndPersist(long currentTerm, String votedFor){
+ update(currentTerm, votedFor);
+ // FIXME : Maybe first persist then update the state
+ persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+ @Override public void apply(UpdateElectionTerm param)
+ throws Exception {
+
+ }
+ });
+ }
+ }
+
+ private static class UpdateElectionTerm implements Serializable {
+ private final long currentTerm;
+ private final String votedFor;
+
+ public UpdateElectionTerm(long currentTerm, String votedFor) {
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+ }
}
this.system = system;
this.actor = actor;
- electionTerm = new ElectionTermImpl(id);
+ final String id1 = id;
+ electionTerm = new ElectionTerm() {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private final String id = id1;
+ private long currentTerm = 0;
+ private String votedFor = "";
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public void update(long currentTerm, String votedFor){
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+
+ // TODO : Write to some persistent state
+ }
+
+ @Override public void updateAndPersist(long currentTerm,
+ String votedFor) {
+ update(currentTerm, votedFor);
+ }
+ };
initReplicatedLog();
}
}
+
public static class SimpleReplicatedLog implements ReplicatedLog {
- private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
+ private final List<ReplicatedLogEntry> log = new ArrayList<>();
@Override public ReplicatedLogEntry get(long index) {
if(index >= log.size() || index < 0){
if(index >= log.size() || index < 0){
return;
}
- for(int i=(int) index ; i < log.size() ; i++) {
- log.remove(i);
- }
+
+ log.subList((int) index, log.size()).clear();
+ //log.remove((int) index);
+ }
+
+ @Override public void removeFromAndPersist(long index) {
+ removeFrom(index);
}
@Override public void append(ReplicatedLogEntry replicatedLogEntry) {