import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
-import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
initializeBehavior();
raftRecovery = null;
-
- if (context.getReplicatedLog().size() > 0) {
- self().tell(new InitiateCaptureSnapshot(), self());
- LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
- } else {
- LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
- }
}
}
// Debugging message to retrieve raft stats.
Map<String, String> peerAddresses = new HashMap<>();
- for(String peerId: context.getPeerIds()) {
- peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ Map<String, Boolean> peerVotingStates = new HashMap<>();
+ for(PeerInfo info: context.getPeers()) {
+ peerVotingStates.put(info.getId(), info.isVoting());
+ peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
+ .isVoting(context.isVotingMember())
.peerAddresses(peerAddresses)
+ .peerVotingStates(peerVotingStates)
.customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
ReplicatedLogEntry lastLogEntry = replicatedLog().last();
for(String id: followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
- info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+ context.getPeerInfo(info.getId()).isVoting()));
}
builder.followerInfoList(followerInfoList);
* @param identifier
* @param data
*/
- protected void persistData(final ActorRef clientActor, final String identifier,
- final Payload data) {
+ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
context.getReplicatedLog().lastIndex() + 1,
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry replicatedLogEntry) {
- if (!hasFollowers()){
- // Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry.getIndex());
- raftContext.setLastApplied(replicatedLogEntry.getIndex());
+ replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+ if (!hasFollowers()){
+ // Increment the Commit Index and the Last Applied values
+ raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
+ raftContext.setLastApplied(replicatedLogEntry1.getIndex());
- // Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+ // Apply the state immediately.
+ self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
- // Send a ApplyJournalEntries message so that we write the fact that we applied
- // the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
+ // the state to durable storage
+ self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
- } else if (clientActor != null) {
- context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+ } else if (clientActor != null) {
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
- // Send message for replication
- getCurrentBehavior().handleMessage(getSelf(),
- new Replicate(clientActor, identifier, replicatedLogEntry));
- }
+ // Send message for replication
+ getCurrentBehavior().handleMessage(getSelf(),
+ new Replicate(clientActor, identifier, replicatedLogEntry1));
}
});
}
* @param data A piece of data that was persisted by the persistData call.
* This should NEVER be null.
*/
- protected abstract void applyState(ActorRef clientActor, String identifier,
- Object data);
+ protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.