X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=65254f2d6277c34c9773570e5938d8a005ab3015;hp=a7c3db4fc246bcce77d23e8d324a4301d930568e;hb=9a24e798d7f9855be2518db54adcac60f7f9ba54;hpb=3cd841f641ebd8e4c3002ad3a61d06d4c276a656 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index a7c3db4fc2..65254f2d62 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; @@ -33,16 +31,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -85,8 +82,7 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** * The current state determines the current behavior of a RaftActor @@ -107,14 +103,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private CaptureSnapshot captureSnapshot = null; - private volatile boolean hasSnapshotCaptureInitiated = false; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; - - public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -131,8 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void initRecoveryTimer() { if(recoveryTimer == null) { - recoveryTimer = new Stopwatch(); - recoveryTimer.start(); + recoveryTimer = Stopwatch.createStarted(); } } @@ -172,16 +163,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onRecoveryComplete(); - RaftActorBehavior oldBehavior = currentBehavior; - currentBehavior = new Follower(context); - handleBehaviorChange(oldBehavior, currentBehavior); + initializeBehavior(); } } } private void onRecoveredSnapshot(SnapshotOffer offer) { if(LOG.isDebugEnabled()) { - LOG.debug("SnapshotOffer called.."); + LOG.debug("{}: SnapshotOffer called..", persistenceId()); } initRecoveryTimer(); @@ -197,21 +186,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); - Stopwatch timer = new Stopwatch(); - timer.start(); + Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state - applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState())); + applyRecoverySnapshot(snapshot.getState()); timer.stop(); LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + replicatedLog.size(), persistenceId(), timer.toString(), - replicatedLog.snapshotIndex, replicatedLog.snapshotTerm); + replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm()); } private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex()); + LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex()); } replicatedLog.append(logEntry); @@ -219,8 +207,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredApplyLogEntries(ApplyLogEntries ale) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getLastApplied() + 1, ale.getToIndex()); + LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", + persistenceId(), context.getLastApplied() + 1, ale.getToIndex()); } for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { @@ -270,11 +258,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { "Persistence Id = " + persistenceId() + " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", - replicatedLog.lastIndex(), replicatedLog.snapshotIndex, - replicatedLog.snapshotTerm, replicatedLog.size()); + replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(), + replicatedLog.getSnapshotTerm(), replicatedLog.size()); + + initializeBehavior(); + } + + protected void initializeBehavior(){ + changeCurrentBehavior(new Follower(context)); + } + protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ RaftActorBehavior oldBehavior = currentBehavior; - currentBehavior = new Follower(context); + currentBehavior = newBehavior; handleBehaviorChange(oldBehavior, currentBehavior); } @@ -283,8 +279,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { ApplyState applyState = (ApplyState) message; if(LOG.isDebugEnabled()) { - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), applyState.getReplicatedLogEntry().getData()); } @@ -294,7 +290,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof ApplyLogEntries){ ApplyLogEntries ale = (ApplyLogEntries) message; if(LOG.isDebugEnabled()) { - LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex()); } persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override @@ -306,12 +302,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); if(LOG.isDebugEnabled()) { - LOG.debug("ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + LOG.debug("{}: ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm() ); } - applySnapshot(ByteString.copyFrom(snapshot.getState())); + + applySnapshot(snapshot.getState()); //clears the followers log, sets the snapshot index to ensure adjusted-index works replicatedLog = new ReplicatedLogImpl(snapshot); @@ -326,7 +323,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("SaveSnapshotSuccess received for snapshot"); + LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); long sequenceNumber = success.metadata().sequenceNr(); @@ -335,39 +332,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); - LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); + LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", + persistenceId(), saveSnapshotFailure.cause()); context.getReplicatedLog().snapshotRollback(); - LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), context.getReplicatedLog().size()); } else if (message instanceof CaptureSnapshot) { - LOG.info("CaptureSnapshot received by actor"); - CaptureSnapshot cs = (CaptureSnapshot)message; - captureSnapshot = cs; - createSnapshot(); + LOG.info("{}: CaptureSnapshot received by actor", persistenceId()); - } else if (message instanceof CaptureSnapshotReply){ - LOG.info("CaptureSnapshotReply received by actor"); - CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + if(captureSnapshot == null) { + captureSnapshot = (CaptureSnapshot)message; + createSnapshot(); + } - ByteString stateInBytes = csr.getSnapshot(); - LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); - handleCaptureSnapshotReply(stateInBytes); + } else if (message instanceof CaptureSnapshotReply){ + handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else { - if (!(message instanceof AppendEntriesMessages.AppendEntries) - && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { - if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: message: {}", message.getClass()); - } - } - RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); @@ -379,15 +366,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (oldBehavior != currentBehavior){ onStateChanged(); } - if (oldBehavior != null) { - // it can happen that the state has not changed but the leader has changed. - onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); - if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) { - // we do not want to notify when the behavior/role is set for the first time (i.e follower) - getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(), - currentBehavior.state().name()), getSelf()); - } + String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId(); + String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name(); + + // it can happen that the state has not changed but the leader has changed. + onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + + if (getRoleChangeNotifier().isPresent() && + (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { + getRoleChangeNotifier().get().tell( + new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()), + getSelf()); } } @@ -407,7 +397,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getTermInformation().getCurrentTerm(), data); if(LOG.isDebugEnabled()) { - LOG.debug("Persist data {}", replicatedLogEntry); + LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); } final RaftActorContext raftContext = getRaftActorContext(); @@ -429,12 +419,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self()); // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!hasSnapshotCaptureInitiated){ + if(!context.isSnapshotCaptureInitiated()){ raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), raftContext.getTermInformation().getCurrentTerm()); raftContext.getReplicatedLog().snapshotCommit(); } else { - LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId()); + LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", + persistenceId(), getId()); } } else if (clientActor != null) { // Send message for replication @@ -572,9 +563,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ - protected abstract void applyRecoverySnapshot(ByteString snapshot); + protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); /** * This method is called during recovery at the end of a batch to apply the current batched @@ -603,9 +594,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * operations when the derived actor is out of sync with it's peers * and the only way to bring it in sync is by applying a snapshot * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ - protected abstract void applySnapshot(ByteString snapshot); + protected abstract void applySnapshot(byte[] snapshotBytes); /** * This method will be called by the RaftActor when the state of the @@ -645,44 +636,56 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } String peerAddress = context.getPeerAddress(leaderId); if(LOG.isDebugEnabled()) { - LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}", - leaderId, peerAddress); + LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", + persistenceId(), leaderId, peerAddress); } return peerAddress; } - private void handleCaptureSnapshotReply(ByteString stateInBytes) { + private void handleCaptureSnapshotReply(byte[] snapshotBytes) { + LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); + // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + Snapshot sn = Snapshot.create(snapshotBytes, context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); persistence().saveSnapshot(sn); - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, + long dataThreshold = Runtime.getRuntime().totalMemory() * + getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); - context.getReplicatedLog().snapshotPreCommit( - captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + } else { + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + } + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", captureSnapshot.getLastAppliedIndex(), + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes)); + currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot( + ByteString.copyFrom(snapshotBytes))); } captureSnapshot = null; - hasSnapshotCaptureInitiated = false; + context.setSnapshotCaptureInitiated(false); } protected boolean hasFollowers(){ @@ -713,13 +716,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() { - @Override public void apply(DeleteEntries param) - throws Exception { + @Override + public void apply(DeleteEntries param) + throws Exception { //FIXME : Doing nothing for now dataSize = 0; - for(ReplicatedLogEntry entry : journal){ + for (ReplicatedLogEntry entry : journal) { dataSize += entry.size(); } } @@ -731,17 +735,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { appendAndPersist(replicatedLogEntry, null); } - @Override - public int dataSize() { - return dataSize; - } - public void appendAndPersist( final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { if(LOG.isDebugEnabled()) { - LOG.debug("Append log entry and persist {} ", replicatedLogEntry); + LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry); } // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs @@ -762,7 +761,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex()+1; + long journalSize = lastIndex() + 1; if(!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log @@ -783,13 +782,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; // when a snaphsot is being taken, captureSnapshot != null - if (hasSnapshotCaptureInitiated == false && + if (!context.isSnapshotCaptureInitiated() && ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || dataSizeForCheck > dataThreshold)) { dataSizeSinceLastSnapshot = 0; - LOG.info("Initiating Snapshot Capture.."); + LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," + + " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold); + long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -803,20 +804,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", - context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); + LOG.debug("{}: Snapshot Capture lastApplied:{} ", + persistenceId(), context.getLastApplied()); + LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), + lastAppliedIndex); + LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), + lastAppliedTerm); } // send a CaptureSnapshot to self to make the expensive operation async. - getSelf().tell(new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), null); - hasSnapshotCaptureInitiated = true; + context.setSnapshotCaptureInitiated(true); } - if(callback != null){ + if (callback != null){ callback.apply(replicatedLogEntry); } } @@ -859,7 +865,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void update(long currentTerm, String votedFor) { if(LOG.isDebugEnabled()) { - LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor); } this.currentTerm = currentTerm; this.votedFor = votedFor;