import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- LOG.debug("SnapshotOffer called..");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("SnapshotOffer called..");
+ }
initRecoveryTimer();
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
- currentBehavior = switchBehavior(RaftState.Follower);
+ currentBehavior = new Follower(context);
onStateChanged();
}
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ LOG.debug("onReceiveCommand: message: {}", message.getClass());
}
}
- RaftState state =
- currentBehavior.handleMessage(getSender(), message);
RaftActorBehavior oldBehavior = currentBehavior;
- currentBehavior = switchBehavior(state);
+ currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
if(oldBehavior != currentBehavior){
onStateChanged();
}
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private RaftActorBehavior switchBehavior(RaftState state) {
- if (currentBehavior != null) {
- if (currentBehavior.state() == state) {
- return currentBehavior;
- }
- LOG.info("Switching from state " + currentBehavior.state() + " to "
- + state);
-
- try {
- currentBehavior.close();
- } catch (Exception e) {
- LOG.error(e,
- "Failed to close behavior : " + currentBehavior.state());
- }
-
- } else {
- LOG.info("Switching behavior to " + state);
- }
- RaftActorBehavior behavior = null;
- if (state == RaftState.Candidate) {
- behavior = new Candidate(context);
- } else if (state == RaftState.Follower) {
- behavior = new Follower(context);
- } else {
- behavior = new Leader(context);
- }
-
-
-
- return behavior;
- }
-
private void trimPersistentData(long sequenceNumber) {
// Trim akka snapshots
// FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+ leaderId, peerAddress);
}
return peerAddress;
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
- context.getLogger().debug(
- "Append log entry and persist {} ", replicatedLogEntry);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ }
+
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
journal.add(replicatedLogEntry);