* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.Lists;
-import java.util.Collection;
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
-
- private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+ private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50);
/**
* This context should NOT be passed directly to any other actor it is
private boolean shuttingDown;
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
protected RaftActor(final String id, final Map<String, String> peerAddresses,
final Optional<ConfigParams> configParams, final short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
- context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
- -1, -1, peerAddresses,
- configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
- delegatingPersistenceProvider, this::handleApplyState, LOG);
+ context = new RaftActorContextImpl(getSelf(), getContext(), id,
+ new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
+ configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(),
+ delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
context.setPayloadVersion(payloadVersion);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
@Override
- public void postStop() {
+ public void postStop() throws Exception {
context.close();
super.postStop();
}
* Handles a message.
*
* @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
- * {@link #handleNonRaftCommand(Object)} instead.
+ * {@link #handleNonRaftCommand(Object)} instead.
*/
@Deprecated
@Override
if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
return;
}
- if (message instanceof ApplyState) {
- ApplyState applyState = (ApplyState) message;
-
+ if (message instanceof ApplyState applyState) {
if (!hasFollowers()) {
// for single node, the capture should happen after the apply state
// as we delete messages from the persistent journal which have made it to the snapshot
}
possiblyHandleBehaviorMessage(message);
- } else if (message instanceof ApplyJournalEntries) {
- ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
+ } else if (message instanceof ApplyJournalEntries applyEntries) {
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
} else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
+ getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
- } else if (message instanceof SwitchBehavior) {
- switchBehavior((SwitchBehavior) message);
- } else if (message instanceof LeaderTransitioning) {
- onLeaderTransitioning((LeaderTransitioning)message);
+ } else if (message instanceof SwitchBehavior switchBehavior) {
+ switchBehavior(switchBehavior);
+ } else if (message instanceof LeaderTransitioning leaderTransitioning) {
+ onLeaderTransitioning(leaderTransitioning);
} else if (message instanceof Shutdown) {
onShutDown();
- } else if (message instanceof Runnable) {
- ((Runnable)message).run();
- } else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload) message, false);
- } else if (message instanceof RequestLeadership) {
- onRequestLeadership((RequestLeadership) message);
+ } else if (message instanceof Runnable runnable) {
+ runnable.run();
+ } else if (message instanceof NoopPayload noopPayload) {
+ persistData(null, null, noopPayload, false);
+ } else if (message instanceof RequestLeadership requestLeadership) {
+ onRequestLeadership(requestLeadership);
} else if (!possiblyHandleBehaviorMessage(message)) {
- handleNonRaftCommand(message);
+ if (message instanceof JournalProtocol.Response response
+ && delegatingPersistenceProvider.handleJournalResponse(response)) {
+ LOG.debug("{}: handled a journal response", persistenceId());
+ } else if (message instanceof SnapshotProtocol.Response response
+ && delegatingPersistenceProvider.handleSnapshotResponse(response)) {
+ LOG.debug("{}: handled a snapshot response", persistenceId());
+ } else {
+ handleNonRaftCommand(message);
+ }
}
}
// non-leader cannot satisfy leadership request
LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ " Current behavior: {}. Sending failure response",
- persistenceId(), getCurrentBehavior().state());
+ persistenceId(), message, getCurrentBehavior().state());
message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ message.getRequestedFollowerId()
+ ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
}
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
- @Nullable final String followerId, final long newLeaderTimeoutInMillis) {
+ final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
&& leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
- roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+ roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
+ final var builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
builder.lastLogTerm(lastLogEntry.getTerm());
}
- if (getCurrentBehavior() instanceof AbstractLeader) {
- AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
- Collection<String> followerIds = leader.getFollowerIds();
- List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
- for (String id: followerIds) {
- final FollowerLogInformation info = leader.getFollower(id);
- followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
- info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
- context.getPeerInfo(info.getId()).isVoting()));
- }
-
- builder.followerInfoList(followerInfoList);
+ if (getCurrentBehavior() instanceof AbstractLeader leader) {
+ builder.followerInfoList(leader.getFollowerIds().stream()
+ .map(leader::getFollower)
+ .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
+ context.getPeerInfo(info.getId()).isVoting()))
+ .collect(ImmutableList.toImmutableList()));
}
sender().tell(builder.build(), self());
if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
|| oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
if (roleChangeNotifier.isPresent()) {
- roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
+ roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
currentBehavior.getLeaderPayloadVersion()), getSelf());
}
if (roleChangeNotifier.isPresent()
&& (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
- roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
}
if (wasAppended && hasFollowers()) {
// Send log entry for replication.
- getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
- !batchHint));
+ getCurrentBehavior().handleMessage(getSelf(),
+ new Replicate(replicatedLogEntry.getIndex(), !batchHint, clientActor, identifier));
}
}
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- @Nonnull
- protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
+ protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
/**
* Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
- @Nonnull
- protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
+ protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
replicatedLog().last(), idx);
- snapshotManager.capture(replicatedLog().last(), idx);
+ snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
}
}
@Nullable abstract String getLastLeaderId();
- @Nullable abstract short getLeaderPayloadVersion();
+ abstract short getLeaderPayloadVersion();
}
/**
final RaftActorBehavior behavior) {
this.lastValidLeaderId = lastValidLeaderId;
this.lastLeaderId = lastLeaderId;
- this.behavior = Preconditions.checkNotNull(behavior);
- this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+ this.behavior = requireNonNull(behavior);
+ leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
@Override
BehaviorState capture(final RaftActorBehavior behavior) {
if (behavior == null) {
- Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
return NULL_BEHAVIOR_STATE;
}