* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
private boolean shuttingDown;
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
protected RaftActor(final String id, final Map<String, String> peerAddresses,
final Optional<ConfigParams> configParams, final short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
- context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
- -1, -1, peerAddresses,
+ context = new RaftActorContextImpl(getSelf(), getContext(), id,
+ new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
- delegatingPersistenceProvider, this::handleApplyState, LOG);
+ delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
context.setPayloadVersion(payloadVersion);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
@Override
- public void postStop() {
+ public void postStop() throws Exception {
context.close();
super.postStop();
}
* Handles a message.
*
* @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
- * {@link #handleNonRaftCommand(Object)} instead.
+ * {@link #handleNonRaftCommand(Object)} instead.
*/
@Deprecated
@Override
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
} else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
+ getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
} else if (message instanceof RequestLeadership) {
onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
- handleNonRaftCommand(message);
+ if (message instanceof JournalProtocol.Response
+ && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+ LOG.debug("{}: handled a journal response", persistenceId());
+ } else if (message instanceof SnapshotProtocol.Response
+ && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+ LOG.debug("{}: handled a snapshot response", persistenceId());
+ } else {
+ handleNonRaftCommand(message);
+ }
}
}
// non-leader cannot satisfy leadership request
LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ " Current behavior: {}. Sending failure response",
- persistenceId(), getCurrentBehavior().state());
+ persistenceId(), message, getCurrentBehavior().state());
message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ message.getRequestedFollowerId()
+ ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
}
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
- @Nullable final String followerId, final long newLeaderTimeoutInMillis) {
+ final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
if (newState == RaftState.Leader || newState == RaftState.Follower) {
+ getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
- getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
} else {
LOG.warn("Switching to behavior : {} - not supported", newState);
}
if (getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
Collection<String> followerIds = leader.getFollowerIds();
- List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+ List<FollowerInfo> followerInfoList = new ArrayList<>(followerIds.size());
for (String id: followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
- info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
context.getPeerInfo(info.getId()).isVoting()));
}
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- @Nonnull
- protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
+ protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
/**
* Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
- @Nonnull
- protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
+ protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
+ /**
+ * This method is called on the leader when a voting change operation completes.
+ */
+ protected void onVotingStateChangeComplete() {
+ }
+
/**
* This method is called prior to operations such as leadership transfer and actor shutdown when the leader
* must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
replicatedLog().last(), idx);
- snapshotManager.capture(replicatedLog().last(), idx);
+ snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
}
}
@Nullable abstract String getLastLeaderId();
- @Nullable abstract short getLeaderPayloadVersion();
+ abstract short getLeaderPayloadVersion();
}
/**
final RaftActorBehavior behavior) {
this.lastValidLeaderId = lastValidLeaderId;
this.lastLeaderId = lastLeaderId;
- this.behavior = Preconditions.checkNotNull(behavior);
- this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+ this.behavior = requireNonNull(behavior);
+ leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
@Override
BehaviorState capture(final RaftActorBehavior behavior) {
if (behavior == null) {
- Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
return NULL_BEHAVIOR_STATE;
}
return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
-
}