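When debugging, it's useful to see the shard name in the log output, so
the Raft actor, Raft behavior and Shard log messages are now prefixed
with the actor's id (persistenceId() or context.getId()).
Also changed ShardIdentifier to cache its toString() output: the class
is immutable, so we don't need to incur the overhead of rebuilding the
String for every log message.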
Change-Id: Ic7ea9878eeab04ea9b43a25b7d4b2b190f79e607
Signed-off-by: tpantelis <tpanteli@brocade.com>
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SnapshotOffer called..");
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
}
initRecoveryTimer();
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
replicatedLog.append(logEntry);
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getLastApplied() + 1, ale.getToIndex());
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
snapshot.getLastAppliedTerm()
);
}
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("SaveSnapshotSuccess received for snapshot");
+ LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
long sequenceNumber = success.metadata().sequenceNr();
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
- LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+ LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId());
context.getReplicatedLog().snapshotRollback();
- LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
- LOG.info("CaptureSnapshot received by actor");
+ LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message: {}", message.getClass());
+ LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
}
}
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
- LOG.debug("Persist data {}", replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
}
final RaftActorContext raftContext = getRaftActorContext();
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
- LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+ LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+ persistenceId(), getId());
}
} else if (clientActor != null) {
// Send message for replication
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
- leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+ LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
persistence().saveSnapshot(sn);
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
dataSizeSinceLastSnapshot = 0;
- LOG.info("Initiating Snapshot Capture..");
+ LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ",
- context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+ LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+ persistenceId(), context.getLastApplied());
+ LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+ lastAppliedIndex);
+ LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+ lastAppliedTerm);
}
// send a CaptureSnapshot to self to make the expensive operation async.
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;
leaderId = context.getId();
- LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+ LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
+ LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
}
}
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
return this;
}
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
+ LOG.debug("{}: InstallSnapshotReply received, " +
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
+ context.getId(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
- followerToLog.get(followerId).getNextIndex());
+ LOG.debug("{}: followerToLog.get(followerId).getNextIndex()={}",
+ context.getId(), followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
followerToSnapshot.markSendStatus(true);
}
} else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
+ LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ context.getId(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
} else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
+ LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
+ LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
// Create a tracker entry we will use this later to notify the
// then snapshot should be sent
if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ "follower-nextIndex: %s, leader-snapshot-index: %s, " +
+ "leader-last-index: %s", context.getId(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
}
actor().tell(new InitiateInstallSnapshot(), actor());
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", e.getKey());
+ LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
).toSerializable(),
actor()
);
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ context.getId(), followerActor.path(),
+ mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
+ LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
}
}
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
}
return nextChunk;
}
* snapshot chunks
*/
protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
+ private final ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
- private int totalChunks;
+ private final int totalChunks;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+ context.getId(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Cannot append entries because sender term {} is less than {}",
- appendEntries.getTerm(), currentTerm());
+ LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+ context.getId(), appendEntries.getTerm(), currentTerm());
}
sender.tell(
RequestVote requestVote) {
if(LOG.isDebugEnabled()) {
- LOG.debug(requestVote.toString());
+ LOG.debug("{}: Received {}", context.getId(), requestVote);
}
boolean grantVote = false;
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
LOG.warning(
- "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+ "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ context.getId(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Setting last applied to {}", newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
}
context.setLastApplied(newLastApplied);
try {
close();
} catch (Exception e) {
- LOG.error(e, "Failed to close behavior : {}", this.state());
+ LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
}
return behavior;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.Set;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.Set;
-
/**
* The behavior of a RaftActor when it is in the CandidateState
* <p/>
peers = context.getPeerAddresses().keySet();
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Candidate has following peers: {}", peers);
+ LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
}
votesRequired = getMajorityVoteCount(peers.size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
RaftRPC rpc = (RaftRPC) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+ context.getTermInformation().getCurrentTerm());
}
// If RPC request or response contains term T > currentTerm:
context.getId());
if(LOG.isDebugEnabled()) {
- LOG.debug("Starting new term {}", (currentTerm + 1));
+ LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
}
// Request for a vote
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
}
// it's log.
if(LOG.isDebugEnabled()) {
- LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
- LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , prevLogTerm
+ "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , context.getId(), prevLogTerm
, appendEntries.getPrevLogTerm());
}
} else {
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower ({}) is out-of-sync, " +
+ LOG.debug("{}: Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- context.getId(), lastIndex(), lastTerm()
+ context.getId(), context.getId(), lastIndex(), lastTerm()
);
}
sender.tell(
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Number of entries to be appended = {}", appendEntries.getEntries().size()
- );
+ LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+ appendEntries.getEntries().size());
}
// 3. If an existing entry conflicts with a new one (same index
}
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Removing entries from log starting at {}", matchEntry.getIndex()
- );
+ LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+ matchEntry.getIndex());
}
// Entries do not match so remove all subsequent entries
}
if(LOG.isDebugEnabled()) {
- LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
- );
+ LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+ (addEntriesFrom + lastIndex()));
}
// 4. Append any new entries not already in the log
i < appendEntries.getEntries().size(); i++) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ LOG.debug("{}: Append entry to log {}", context.getId(),
+ appendEntries.getEntries().get(i).getData());
}
context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to {}", context.getCommitIndex());
+ LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
}
}
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("applyLogToStateMachine, " +
+ LOG.debug("{}: applyLogToStateMachine, " +
"appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
+ "context.getLastApplied():{}, lastIndex():{}", context.getId(),
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
);
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ LOG.debug("{}: InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
);
}
snapshotTracker = null;
} catch (Exception e){
-
- LOG.error(e, "Exception in InstallSnapshot of follower:");
+ LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
- minIsolatedLeaderPeerCount, leaderId);
+ LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
return switchBehavior(new IsolatedLeader(context));
}
}
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
/// The name of this shard
private final ShardIdentifier name;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+ LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity());
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@Override
public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveRecover: Received message {} from {}",
- message.getClass().toString(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+ message.getClass().toString(), getSender());
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+ persistenceId());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+ LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
}
if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("Current transaction {} has timed out after {} ms - aborting",
- cohortEntry.getTransactionID(), transactionCommitTimeout);
+ LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
- LOG.debug("Committing transaction {}", transactionID);
+ LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
- String.format("Cannot commit transaction %s - it is not the current transaction",
- transactionID));
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
new ModificationPayload(cohortEntry.getModification()));
}
} catch (InterruptedException | ExecutionException | IOException e) {
- LOG.error(e, "An exception occurred while preCommitting transaction {}",
- cohortEntry.getTransactionID());
+ LOG.error(e, "{}: An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("Could not finish committing transaction %s - no CohortEntry found",
- transactionID));
+ String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
return;
}
- LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
// We block on the future here so we don't have to worry about possibly accessing our
} catch (InterruptedException | ExecutionException e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ LOG.error(e, "{}: An exception occurred while committing transaction {}", persistenceId(), transactionID);
shardMBean.incrementFailedTransactionsCount();
}
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
- ready.getTxnClientVersion());
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
- LOG.debug("Aborting transaction {}", transactionID);
+ LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "An exception happened during abort");
+ LOG.error(t, "{}: An exception happened during abort", persistenceId());
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
- "Could not find shard leader so transaction cannot be created. This typically happens" +
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
" when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.")), getSelf());
+ " later.", persistenceId()))), getSelf());
}
}
.build();
if(LOG.isDebugEnabled()) {
- LOG.debug("Creating transaction : {} ", transactionId);
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+ LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
- LOG.debug("Shard is not the leader - delaying registration");
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg =
new DelayedListenerRegistration(registerChangeListener);
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- listenerRegistration.path());
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
- LOG.debug("Registering for path {}", registerChangeListener.getPath());
+ LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
- LOG.error("Unknown state received {} during recovery", data);
+ LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
}
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ LOG.debug("{}: submitted recovery snapshot", persistenceId());
}
}
@Override
protected void applyCurrentLogRecoveryBatch() {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
currentLogRecoveryBatch.size());
}
}
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
}
for(DOMStoreWriteTransaction tx: txList) {
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
}
else if (data instanceof CompositeModificationPayload) {
applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- data, data.getClass().getClassLoader(),
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), clientActor != null ? clientActor.path().toString() : null, identifier);
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
- LOG.info("Applying snapshot");
+ LOG.info("{}: Applying snapshot", persistenceId());
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "An exception occurred when applying snapshot");
+ LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
} finally {
- LOG.info("Done applying snapshot");
+ LOG.info("{}: Done applying snapshot", persistenceId());
}
}
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
- "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- entry.getKey(), getId());
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());
}
entry.getValue().close();
}
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Processing canCommit for transaction {} for shard {}",
+ name, transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("No cohort entry found for transaction %s", transactionID));
- LOG.error(ex.getMessage());
+ String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
- LOG.debug("Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+ log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+ name, currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
- transactionID, queueCapacity));
- LOG.error(ex.getMessage());
+ name, transactionID, queueCapacity));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.debug("An exception occurred during canCommit", e);
+ log.debug("{}: An exception occurred during canCommit: {}", name, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private static final int TIME_OUT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
+ private final LoggingAdapter log;
+ private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
+ this.log = log;
+ this.name = name;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
- LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
private final String shardName;
private final String memberName;
private final String type;
+ private final String fullName;
public ShardIdentifier(String shardName, String memberName, String type) {
this.shardName = shardName;
this.memberName = memberName;
this.type = type;
+
+ fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+ .append(type).toString();
}
@Override
return result;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
//ensure the output of toString matches the pattern above
- return new StringBuilder(memberName)
- .append("-shard-")
- .append(shardName)
- .append("-")
- .append(type)
- .toString();
+ return fullName;
}
public static Builder builder(){