X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=57eb2647c9ebbc3ab345b93b16bcbfde44a65b1c;hp=47c8db6006544b13c11b42f7abdb370590fd3b8b;hb=b2cb02f62ab7c7599e8d94fe92d1ce63e17d599b;hpb=f276ae33b951d173b51c467bb7bb1a5f5cf9a1e6 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 47c8db6006..57eb2647c9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,13 +12,11 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Lists; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -35,7 +33,6 @@ import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; @@ -51,7 +48,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); @@ -170,13 +170,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); raftRecovery = null; - - if (context.getReplicatedLog().size() > 0) { - self().tell(new InitiateCaptureSnapshot(), self()); - LOG.info("{}: Snapshot capture initiated after recovery", persistenceId()); - } else { - LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId()); - } } } @@ -242,8 +235,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); + if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), + applyState.getReplicatedLogEntry().getData()); + } long elapsedTime = System.nanoTime() - startTime; if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ @@ -261,6 +256,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } + // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. + possiblyHandleBehaviorMessage(message); + } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; if(LOG.isDebugEnabled()) { @@ -279,30 +277,36 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if(message instanceof InitiateCaptureSnapshot) { captureSnapshot(); } else if(message instanceof SwitchBehavior) { - switchBehavior(((SwitchBehavior) message)); + switchBehavior((SwitchBehavior) message); } else if(message instanceof LeaderTransitioning) { onLeaderTransitioning(); } else if(message instanceof Shutdown) { onShutDown(); } else if(message instanceof Runnable) { ((Runnable)message).run(); - } else { - // Processing the message may affect the state, hence we need to capture it - final RaftActorBehavior currentBehavior = getCurrentBehavior(); - final BehaviorState state = behaviorStateTracker.capture(currentBehavior); - - // A behavior indicates that it processed the change by returning a reference to the next behavior - // to be used. A null return indicates it has not processed the message and we should be passing it to - // the subclass for handling. - final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); - if (nextBehavior != null) { - switchBehavior(state, nextBehavior); - } else { - handleNonRaftCommand(message); - } + } else if(message instanceof NoopPayload) { + persistData(null, null, (NoopPayload)message); + } else if (!possiblyHandleBehaviorMessage(message)) { + handleNonRaftCommand(message); } } + private boolean possiblyHandleBehaviorMessage(final Object message) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); + + // A behavior indicates that it processed the change by returning a reference to the next behavior + // to be used. A null return indicates it has not processed the message and we should be passing it to + // the subclass for handling. + final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); + if (nextBehavior != null) { + switchBehavior(state, nextBehavior); + return true; + } + + return false; + } + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { LOG.debug("{}: Initiating leader transfer", persistenceId()); @@ -409,8 +413,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Debugging message to retrieve raft stats. Map peerAddresses = new HashMap<>(); - for(String peerId: context.getPeerIds()) { - peerAddresses.put(peerId, context.getPeerAddress(peerId)); + Map peerVotingStates = new HashMap<>(); + for(PeerInfo info: context.getPeers()) { + peerVotingStates.put(info.getId(), info.isVoting()); + peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); @@ -429,7 +435,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .snapshotIndex(replicatedLog().getSnapshotIndex()) .snapshotTerm(replicatedLog().getSnapshotTerm()) .votedFor(context.getTermInformation().getVotedFor()) + .isVoting(context.isVotingMember()) .peerAddresses(peerAddresses) + .peerVotingStates(peerVotingStates) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); ReplicatedLogEntry lastLogEntry = replicatedLog().last(); @@ -445,7 +453,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { for(String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()))); + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), + context.getPeerInfo(info.getId()).isVoting())); } builder.followerInfoList(followerInfoList); @@ -462,12 +471,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onStateChanged(); } + String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId(); String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId(); String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) || + if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if(roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), @@ -484,7 +494,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if (roleChangeNotifier.isPresent() && - (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { + (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } @@ -512,8 +522,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(final ActorRef clientActor, final String identifier, - final Payload data) { + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, @@ -525,28 +534,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { - @Override - public void apply(ReplicatedLogEntry replicatedLogEntry) { - if (!hasFollowers()){ - // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry.getIndex()); - raftContext.setLastApplied(replicatedLogEntry.getIndex()); + replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + if (!hasFollowers()){ + // Increment the Commit Index and the Last Applied values + raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); + raftContext.setLastApplied(replicatedLogEntry1.getIndex()); - // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); + // Apply the state immediately. + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); - // Send a ApplyJournalEntries message so that we write the fact that we applied - // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); + // Send a ApplyJournalEntries message so that we write the fact that we applied + // the state to durable storage + self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + } else { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry)); - } + // Send message for replication + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(clientActor, identifier, replicatedLogEntry1)); } }); } @@ -579,8 +585,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected final boolean isLeaderActive() { - return getRaftState() != RaftState.IsolatedLeader && !shuttingDown && - !isLeadershipTransferInProgress(); + return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader && + !shuttingDown && !isLeadershipTransferInProgress(); } private boolean isLeadershipTransferInProgress() { @@ -594,7 +600,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return A reference to the leader if known, null otherwise */ - protected ActorSelection getLeader(){ + public ActorSelection getLeader(){ String leaderAddress = getLeaderAddress(); if(leaderAddress == null){ @@ -663,9 +669,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void setPersistence(boolean persistent) { - if(persistent) { + DataPersistenceProvider currentPersistence = persistence(); + if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); - } else { + + if(getCurrentBehavior() != null) { + LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId()); + captureSnapshot(); + } + } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { setPersistence(new NonPersistentDataProvider() { /** * The way snapshotting works is, @@ -724,8 +736,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. @@ -774,7 +785,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void onLeaderChanged(String oldLeader, String newLeader) { - }; + } private String getLeaderAddress(){ if(isLeader()){ @@ -839,57 +850,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - /** - * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries} - * whose type for fromIndex is long instead of int. This class was kept for backwards - * compatibility with Helium. - */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class DeleteEntries implements Serializable { - private final int fromIndex; - - public DeleteEntries(int fromIndex) { - this.fromIndex = fromIndex; - } - - public int getFromIndex() { - return fromIndex; - } - } - - /** - * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm} - * which has serialVersionUID set. This class was kept for backwards compatibility with Helium. - */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class UpdateElectionTerm implements Serializable { - private final long currentTerm; - private final String votedFor; - - public UpdateElectionTerm(long currentTerm, String votedFor) { - this.currentTerm = currentTerm; - this.votedFor = votedFor; - } - - public long getCurrentTerm() { - return currentTerm; - } - - public String getVotedFor() { - return votedFor; - } - } - /** * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors. */ private static abstract class BehaviorState implements Immutable { @Nullable abstract RaftActorBehavior getBehavior(); @Nullable abstract String getLastValidLeaderId(); + @Nullable abstract String getLastLeaderId(); @Nullable abstract short getLeaderPayloadVersion(); } @@ -899,10 +866,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final class SimpleBehaviorState extends BehaviorState { private final RaftActorBehavior behavior; private final String lastValidLeaderId; + private final String lastLeaderId; private final short leaderPayloadVersion; - SimpleBehaviorState(final String lastValidLeaderId, final RaftActorBehavior behavior) { + SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId, + final RaftActorBehavior behavior) { this.lastValidLeaderId = lastValidLeaderId; + this.lastLeaderId = lastLeaderId; this.behavior = Preconditions.checkNotNull(behavior); this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @@ -921,6 +891,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { short getLeaderPayloadVersion() { return leaderPayloadVersion; } + + @Override + String getLastLeaderId() { + return lastLeaderId; + } } /** @@ -949,9 +924,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { short getLeaderPayloadVersion() { return -1; } + + @Override + String getLastLeaderId() { + return null; + } }; private String lastValidLeaderId; + private String lastLeaderId; BehaviorState capture(final RaftActorBehavior behavior) { if (behavior == null) { @@ -959,12 +940,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return NULL_BEHAVIOR_STATE; } - final String leaderId = behavior.getLeaderId(); - if (leaderId != null) { - lastValidLeaderId = leaderId; + lastLeaderId = behavior.getLeaderId(); + if (lastLeaderId != null) { + lastValidLeaderId = lastLeaderId; } - return new SimpleBehaviorState(lastValidLeaderId, behavior); + return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior); } }