X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=8270f2949a67cc9fc00f5180dce41872ca6a8a47;hp=a0bb0046073f08716e95d8959122099d8ad21df5;hb=4c975594eb74127ca4c0018984d569ae52be77e5;hpb=26d0e25159bf3b73ca38a88e17268cf17f876d8d diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index a0bb004607..8270f2949a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import com.google.common.base.Optional; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import java.io.Serializable; -import java.util.List; import java.util.Map; /** @@ -91,13 +96,16 @@ public abstract class RaftActor extends UntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private RaftActorContext context; + protected RaftActorContext context; /** * The in-memory journal */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + private CaptureSnapshot captureSnapshot = null; + + private volatile boolean hasSnapshotCaptureInitiated = false; public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); @@ -115,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -125,25 +133,39 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog = new ReplicatedLogImpl(snapshot); context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state - applySnapshot(snapshot.getState()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { - replicatedLog.append((ReplicatedLogEntry) message); + ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; + + // Apply State immediately + replicatedLog.append(logEntry); + applyState(null, "recovery", logEntry.getData()); + context.setLastApplied(logEntry.getIndex()); + context.setCommitIndex(logEntry.getIndex()); + } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { - LOG.debug( + LOG.info( "RecoveryCompleted - Switching actor to Follower - " + - "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); @@ -156,38 +178,59 @@ public abstract class RaftActor extends UntypedPersistentActor { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); + if(LOG.isDebugEnabled()) { + LOG.debug("Applying state for log index {} data {}", + applyState.getReplicatedLogEntry().getIndex(), + applyState.getReplicatedLogEntry().getData()); + } applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); } else if(message instanceof ApplySnapshot ) { - applySnapshot(((ApplySnapshot) message).getSnapshot()); + Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); + + if(LOG.isDebugEnabled()) { + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm() + ); + } + applySnapshot(ByteString.copyFrom(snapshot.getState())); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); } else if (message instanceof FindLeader) { getSender().tell( - new FindLeaderReply( - context.getPeerAddress(currentBehavior.getLeaderId())), + new FindLeaderReply(getLeaderAddress()), getSelf() ); } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + LOG.info("SaveSnapshotSuccess received for snapshot"); + + context.getReplicatedLog().snapshotCommit(); // TODO: Not sure if we want to be this aggressive with trimming stuff trimPersistentData(success.metadata().sequenceNr()); } else if (message instanceof SaveSnapshotFailure) { + SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - // TODO: Handle failure in saving the snapshot + LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); + LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); - } else if (message instanceof FindLeader){ + context.getReplicatedLog().snapshotRollback(); - getSender().tell(new FindLeaderReply( - context.getPeerAddress(currentBehavior.getLeaderId())), - getSelf()); + LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); } else if (message instanceof AddRaftPeer){ @@ -203,7 +246,27 @@ public abstract class RaftActor extends UntypedPersistentActor { RemoveRaftPeer rrp = (RemoveRaftPeer)message; context.removePeer(rrp.getName()); + } else if (message instanceof CaptureSnapshot) { + LOG.info("CaptureSnapshot received by actor"); + CaptureSnapshot cs = (CaptureSnapshot)message; + captureSnapshot = cs; + createSnapshot(); + + } else if (message instanceof CaptureSnapshotReply){ + LOG.info("CaptureSnapshotReply received by actor"); + CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + + ByteString stateInBytes = csr.getSnapshot(); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + handleCaptureSnapshotReply(stateInBytes); + } else { + if (!(message instanceof AppendEntriesMessages.AppendEntries) + && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } + } RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -212,6 +275,8 @@ public abstract class RaftActor extends UntypedPersistentActor { if(oldBehavior != currentBehavior){ onStateChanged(); } + + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); } } @@ -241,7 +306,9 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - LOG.debug("Persist data {}", replicatedLogEntry); + if(LOG.isDebugEnabled()) { + LOG.debug("Persist data {}", replicatedLogEntry); + } replicatedLog .appendAndPersist(clientActor, identifier, replicatedLogEntry); @@ -269,18 +336,13 @@ public abstract class RaftActor extends UntypedPersistentActor { * @return A reference to the leader if known, null otherwise */ protected ActorSelection getLeader(){ - String leaderId = currentBehavior.getLeaderId(); - if (leaderId == null) { - return null; - } - String peerAddress = context.getPeerAddress(leaderId); - LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " - + peerAddress); + String leaderAddress = getLeaderAddress(); - if(peerAddress == null){ + if(leaderAddress == null){ return null; } - return context.actorSelection(peerAddress); + + return context.actorSelection(leaderAddress); } /** @@ -360,7 +422,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @return The current state of the actor */ - protected abstract Object createSnapshot(); + protected abstract void createSnapshot(); /** * This method will be called by the RaftActor during recovery to @@ -372,7 +434,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @param snapshot A snapshot of the state of the actor */ - protected abstract void applySnapshot(Object snapshot); + protected abstract void applySnapshot(ByteString snapshot); /** * This method will be called by the RaftActor when the state of the @@ -381,6 +443,8 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; + private RaftActorBehavior switchBehavior(RaftState state) { if (currentBehavior != null) { if (currentBehavior.state() == state) { @@ -424,11 +488,56 @@ public abstract class RaftActor extends UntypedPersistentActor { deleteMessages(sequenceNumber); } + private String getLeaderAddress(){ + if(isLeader()){ + return getSelf().path().toString(); + } + String leaderId = currentBehavior.getLeaderId(); + if (leaderId == null) { + return null; + } + String peerAddress = context.getPeerAddress(leaderId); + if(LOG.isDebugEnabled()) { + LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " + + peerAddress); + } + + return peerAddress; + } + + private void handleCaptureSnapshotReply(ByteString stateInBytes) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + + context.getReplicatedLog().snapshotPreCommit(stateInBytes, + captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + captureSnapshot = null; + hasSnapshotCaptureInitiated = false; + } + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - super(snapshot.getState(), + super(ByteString.copyFrom(snapshot.getState()), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -477,8 +586,10 @@ public abstract class RaftActor extends UntypedPersistentActor { persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { - // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { + // when a snaphsot is being taken, captureSnapshot != null + if (hasSnapshotCaptureInitiated == false && + journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -489,31 +600,19 @@ public abstract class RaftActor extends UntypedPersistentActor { lastAppliedTerm = lastAppliedEntry.getTerm(); } - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); - - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(createSnapshot(), - getFrom(context.getLastApplied() + 1), - lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm); - saveSnapshot(sn); - - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); - - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, - // TODO: damage-recovery to be done on failure - journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); - snapshotIndex = lastAppliedIndex; - snapshotTerm = lastAppliedTerm; - - LOG.info("Removed in-memory snapshotted entries, " + - "adjusted snaphsotIndex:{}" + - "and term:{}", snapshotIndex, lastAppliedTerm); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", + context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + } + + // send a CaptureSnapshot to self to make the expensive operation async. + getSelf().tell(new CaptureSnapshot( + lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + null); + hasSnapshotCaptureInitiated = true; } // Send message for replication if (clientActor != null) { @@ -543,65 +642,6 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class Snapshot implements Serializable { - private final Object state; - private final List unAppliedEntries; - private final long lastIndex; - private final long lastTerm; - private final long lastAppliedIndex; - private final long lastAppliedTerm; - - private Snapshot(Object state, - List unAppliedEntries, long lastIndex, - long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { - this.state = state; - this.unAppliedEntries = unAppliedEntries; - this.lastIndex = lastIndex; - this.lastTerm = lastTerm; - this.lastAppliedIndex = lastAppliedIndex; - this.lastAppliedTerm = lastAppliedTerm; - } - - - public static Snapshot create(Object state, - List entries, long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - return new Snapshot(state, entries, lastIndex, lastTerm, - lastAppliedIndex, lastAppliedTerm); - } - - public Object getState() { - return state; - } - - public List getUnAppliedEntries() { - return unAppliedEntries; - } - - public long getLastTerm() { - return lastTerm; - } - - public long getLastAppliedIndex() { - return lastAppliedIndex; - } - - public long getLastAppliedTerm() { - return lastAppliedTerm; - } - - public String getLogMessage() { - StringBuilder sb = new StringBuilder(); - return sb.append("Snapshot={") - .append("lastTerm:" + this.getLastTerm() + ", ") - .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") - .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") - .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") - .toString(); - - } - } - private class ElectionTermImpl implements ElectionTerm { /** * Identifier of the actor whose election term information this is @@ -618,8 +658,9 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public void update(long currentTerm, String votedFor) { - LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor); - + if(LOG.isDebugEnabled()) { + LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + } this.currentTerm = currentTerm; this.votedFor = votedFor; }