X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=36c86f542d398f43b58b6236d8095b8f1ddcf4a9;hp=b8e9653bc5db92ffa4cb15afd391c6a9ec05cd16;hb=b78e7cc266d3540634ccc5b996b67b68365edbd8;hpb=07ba9a998f0b3c3045ed8e31afda5c96de141b3b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b8e9653bc5..36c86f542d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -81,6 +80,8 @@ public abstract class RaftActor extends UntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); + private static final int SNAPSHOT_ENTRY_COUNT = 100000; + /** * The current state determines the current behavior of a RaftActor * A Raft Actor always starts off in the Follower State @@ -108,6 +109,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { + LOG.debug("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -116,6 +118,13 @@ public abstract class RaftActor extends UntypedPersistentActor { // when we need to install it on a peer replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + + LOG.debug("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", + replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, + replicatedLog.size()); + // Apply the snapshot to the actors state applySnapshot(snapshot.getState()); @@ -127,7 +136,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { LOG.debug( - "Last index in log : " + replicatedLog.lastIndex()); + "RecoveryCompleted - Switching actor to Follower - " + + "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); currentBehavior = switchBehavior(RaftState.Follower); } } @@ -191,6 +204,15 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + public java.util.Set getPeers() { + return context.getPeerAddresses().keySet(); + } + + protected String getReplicatedLogState() { + return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + context.getReplicatedLog().size(); + } /** @@ -343,85 +365,33 @@ public abstract class RaftActor extends UntypedPersistentActor { } private void trimPersistentData(long sequenceNumber) { - // Trim snapshots + // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - 100000, 43200000)); + sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000)); - // Trim journal + // Trim akka journal deleteMessages(sequenceNumber); } - private class ReplicatedLogImpl implements ReplicatedLog { - private final List journal; - private final Object snapshot; - private long snapshotIndex = -1; - private long snapshotTerm = -1; + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - this.snapshot = snapshot.getState(); - this.snapshotIndex = snapshot.getLastAppliedIndex(); - this.snapshotTerm = snapshot.getLastAppliedTerm(); - - this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + super(snapshot.getState(), + snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + snapshot.getUnAppliedEntries()); } public ReplicatedLogImpl() { - this.snapshot = null; - this.journal = new ArrayList<>(); - } - - @Override public ReplicatedLogEntry get(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return null; - } - - return journal.get(adjustedIndex); - } - - @Override public ReplicatedLogEntry last() { - if (journal.size() == 0) { - return null; - } - return get(journal.size() - 1); - } - - @Override public long lastIndex() { - if (journal.size() == 0) { - return -1; - } - - return last().getIndex(); - } - - @Override public long lastTerm() { - if (journal.size() == 0) { - return -1; - } - - return last().getTerm(); - } - - - @Override public void removeFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return; - } - - journal.subList(adjustedIndex , journal.size()).clear(); + super(); } + @Override public void removeFromAndPersist(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); - @Override public void removeFromAndPersist(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + if (adjustedIndex < 0) { return; } @@ -435,29 +405,6 @@ public abstract class RaftActor extends UntypedPersistentActor { //FIXME : Doing nothing for now } }); - - - } - - @Override public void append( - final ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - } - - @Override public List getFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - List entries = new ArrayList<>(100); - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return entries; - } - - - for (int i = adjustedIndex; - i < journal.size(); i++) { - entries.add(journal.get(i)); - } - return entries; } @Override public void appendAndPersist( @@ -482,20 +429,42 @@ public abstract class RaftActor extends UntypedPersistentActor { new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (size() > 100000) { - ReplicatedLogEntry lastAppliedEntry = - get(context.getLastApplied()); + if (journal.size() > SNAPSHOT_ENTRY_COUNT) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; + + ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); } - saveSnapshot(Snapshot.create(createSnapshot(), + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + Snapshot sn = Snapshot.create(createSnapshot(), getFrom(context.getLastApplied() + 1), lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm)); + lastAppliedTerm); + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + // TODO: damage-recovery to be done on failure + journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); + snapshotIndex = lastAppliedIndex; + snapshotTerm = lastAppliedTerm; + + LOG.info("Removed in-memory snapshotted entries, " + + "adjusted snaphsotIndex:{}" + + "and term:{}", snapshotIndex, lastAppliedTerm); } // Send message for replication if (clientActor != null) { @@ -509,46 +478,8 @@ public abstract class RaftActor extends UntypedPersistentActor { ); } - @Override public long size() { - return journal.size() + snapshotIndex + 1; - } - - @Override public boolean isPresent(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return false; - } - return true; - } - - @Override public boolean isInSnapshot(long index) { - return index <= snapshotIndex; - } - - @Override public Object getSnapshot() { - return snapshot; - } - - @Override public long getSnapshotIndex() { - return snapshotIndex; - } - - @Override public long getSnapshotTerm() { - return snapshotTerm; - } - - private int adjustedIndex(long index) { - if(snapshotIndex < 0){ - return (int) index; - } - return (int) (index - snapshotIndex); - } } - - - private static class DeleteEntries implements Serializable { private final int fromIndex; @@ -609,6 +540,17 @@ public abstract class RaftActor extends UntypedPersistentActor { public long getLastAppliedTerm() { return lastAppliedTerm; } + + public String getLogMessage() { + StringBuilder sb = new StringBuilder(); + return sb.append("Snapshot={") + .append("lastTerm:" + this.getLastTerm() + ", ") + .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") + .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") + .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") + .toString(); + + } } private class ElectionTermImpl implements ElectionTerm {