import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
private boolean shuttingDown;
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
protected RaftActor(final String id, final Map<String, String> peerAddresses,
final Optional<ConfigParams> configParams, final short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
- context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
- -1, -1, peerAddresses,
+ context = new RaftActorContextImpl(getSelf(), getContext(), id,
+ new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
* Handles a message.
*
* @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
- * {@link #handleNonRaftCommand(Object)} instead.
+ * {@link #handleNonRaftCommand(Object)} instead.
*/
@Deprecated
@Override
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
} else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
+ getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
} else if (message instanceof RequestLeadership) {
onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
- handleNonRaftCommand(message);
+ if (message instanceof JournalProtocol.Response
+ && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+ LOG.debug("{}: handled a journal response", persistenceId());
+ } else if (message instanceof SnapshotProtocol.Response
+ && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+ LOG.debug("{}: handled a snapshot response", persistenceId());
+ } else {
+ handleNonRaftCommand(message);
+ }
}
}
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
replicatedLog().last(), idx);
- snapshotManager.capture(replicatedLog().last(), idx);
+ snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
}
}
this.lastValidLeaderId = lastValidLeaderId;
this.lastLeaderId = lastLeaderId;
this.behavior = requireNonNull(behavior);
- this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+ leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
@Override