import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* The current state determines the current behavior of a RaftActor
private CaptureSnapshot captureSnapshot = null;
- private volatile boolean hasSnapshotCaptureInitiated = false;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
-
-
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SnapshotOffer called..");
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
}
initRecoveryTimer();
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
replicatedLog.append(logEntry);
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getLastApplied() + 1, ale.getToIndex());
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
snapshot.getLastAppliedTerm()
);
}
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("SaveSnapshotSuccess received for snapshot");
+ LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
long sequenceNumber = success.metadata().sequenceNr();
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
- LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+ LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId(), saveSnapshotFailure.cause());
context.getReplicatedLog().snapshotRollback();
- LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
- LOG.info("CaptureSnapshot received by actor");
+ LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message: {}", message.getClass());
+ LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
}
}
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
- LOG.debug("Persist data {}", replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
}
final RaftActorContext raftContext = getRaftActorContext();
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!hasSnapshotCaptureInitiated){
+ if(!context.isSnapshotCaptureInitiated()){
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
- LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+ LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+ persistenceId(), getId());
}
} else if (clientActor != null) {
// Send message for replication
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
- leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+ LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
persistence().saveSnapshot(sn);
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
}
captureSnapshot = null;
- hasSnapshotCaptureInitiated = false;
+ context.setSnapshotCaptureInitiated(false);
}
protected boolean hasFollowers(){
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
- if (hasSnapshotCaptureInitiated == false &&
+ if (!context.isSnapshotCaptureInitiated() &&
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
dataSizeSinceLastSnapshot = 0;
- LOG.info("Initiating Snapshot Capture..");
+ LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ",
- context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+ LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+ persistenceId(), context.getLastApplied());
+ LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+ lastAppliedIndex);
+ LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+ lastAppliedTerm);
}
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
- hasSnapshotCaptureInitiated = true;
+ context.setSnapshotCaptureInitiated(true);
}
if(callback != null){
callback.apply(replicatedLogEntry);
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;