When debugging, it's useful to see the shard name in the log output.
Also changed ShardIdentifier to cache the toString() output: since the
class is immutable, this avoids the overhead of rebuilding the
String for every log message.
Change-Id: Ic7ea9878eeab04ea9b43a25b7d4b2b190f79e607
Signed-off-by: tpantelis <tpanteli@brocade.com>
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SnapshotOffer called..");
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
replicatedLog.append(logEntry);
}
replicatedLog.append(logEntry);
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getLastApplied() + 1, ale.getToIndex());
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
applyState.getReplicatedLogEntry().getData());
}
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
snapshot.getLastAppliedTerm()
);
}
snapshot.getLastAppliedTerm()
);
}
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("SaveSnapshotSuccess received for snapshot");
+ LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
long sequenceNumber = success.metadata().sequenceNr();
long sequenceNumber = success.metadata().sequenceNr();
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
- LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+ LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId());
context.getReplicatedLog().snapshotRollback();
context.getReplicatedLog().snapshotRollback();
- LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle. " +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
- LOG.info("CaptureSnapshot received by actor");
+ LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message: {}", message.getClass());
+ LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
- LOG.debug("Persist data {}", replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
}
final RaftActorContext raftContext = getRaftActorContext();
}
final RaftActorContext raftContext = getRaftActorContext();
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
- LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+ LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+ persistenceId(), getId());
}
} else if (clientActor != null) {
// Send message for replication
}
} else if (clientActor != null) {
// Send message for replication
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
- leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+ LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
persistence().saveSnapshot(sn);
persistence().saveSnapshot(sn);
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snapshotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
dataSizeSinceLastSnapshot = 0;
dataSizeSinceLastSnapshot = 0;
- LOG.info("Initiating Snapshot Capture..");
+ LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
}
if(LOG.isDebugEnabled()) {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ",
- context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+ LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+ persistenceId(), context.getLastApplied());
+ LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+ lastAppliedIndex);
+ LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+ lastAppliedTerm);
}
// send a CaptureSnapshot to self to make the expensive operation async.
}
// send a CaptureSnapshot to self to make the expensive operation async.
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;
leaderId = context.getId();
leaderId = context.getId();
- LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+ LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
+ LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
followerToLog.get(followerId);
if(followerLogInformation == null){
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
+ LOG.debug("{}: InstallSnapshotReply received, " +
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
+ context.getId(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
- followerToLog.get(followerId).getNextIndex());
+ LOG.debug("{}: followerToLog.get(followerId).getNextIndex()={}",
+ context.getId(), followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
}
if (mapFollowerToSnapshot.isEmpty()) {
followerToSnapshot.markSendStatus(true);
}
} else {
followerToSnapshot.markSendStatus(true);
}
} else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
+ LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ context.getId(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
} else {
followerToSnapshot.markSendStatus(false);
}
} else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
+ LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
);
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
+ LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
// Create a tracker entry we will use this later to notify the
}
// Create a tracker entry we will use this later to notify the
// then snapshot should be sent
if(LOG.isDebugEnabled()) {
// then snapshot should be sent
if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ "follower-nextIndex: %s, leader-snapshot-index: %s, " +
+ "leader-last-index: %s", context.getId(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
}
actor().tell(new InitiateInstallSnapshot(), actor());
}
actor().tell(new InitiateInstallSnapshot(), actor());
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", e.getKey());
+ LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
).toSerializable(),
actor()
);
).toSerializable(),
actor()
);
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ context.getId(), followerActor.path(),
+ mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
}
} catch (IOException e) {
}
} catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
+ LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
* snapshot chunks
*/
protected class FollowerToSnapshot {
* snapshot chunks
*/
protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
+ private final ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
- private int totalChunks;
+ private final int totalChunks;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+ context.getId(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
if(LOG.isDebugEnabled()) {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Cannot append entries because sender term {} is less than {}",
- appendEntries.getTerm(), currentTerm());
+ LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+ context.getId(), appendEntries.getTerm(), currentTerm());
RequestVote requestVote) {
if(LOG.isDebugEnabled()) {
RequestVote requestVote) {
if(LOG.isDebugEnabled()) {
- LOG.debug(requestVote.toString());
+ LOG.debug("{}: Received {}", context.getId(), requestVote);
}
boolean grantVote = false;
}
boolean grantVote = false;
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
LOG.warning(
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
LOG.warning(
- "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+ "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ context.getId(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Setting last applied to {}", newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
}
context.setLastApplied(newLastApplied);
}
context.setLastApplied(newLastApplied);
try {
close();
} catch (Exception e) {
try {
close();
} catch (Exception e) {
- LOG.error(e, "Failed to close behavior : {}", this.state());
+ LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
* The behavior of a RaftActor when it is in the CandidateState
* <p/>
/**
* The behavior of a RaftActor when it is in the CandidateState
* <p/>
peers = context.getPeerAddresses().keySet();
if(LOG.isDebugEnabled()) {
peers = context.getPeerAddresses().keySet();
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Candidate has following peers: {}", peers);
+ LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
}
votesRequired = getMajorityVoteCount(peers.size());
}
votesRequired = getMajorityVoteCount(peers.size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
RaftRPC rpc = (RaftRPC) message;
if(LOG.isDebugEnabled()) {
RaftRPC rpc = (RaftRPC) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+ context.getTermInformation().getCurrentTerm());
}
// If RPC request or response contains term T > currentTerm:
}
// If RPC request or response contains term T > currentTerm:
context.getId());
if(LOG.isDebugEnabled()) {
context.getId());
if(LOG.isDebugEnabled()) {
- LOG.debug("Starting new term {}", (currentTerm + 1));
+ LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
// it's log.
if(LOG.isDebugEnabled()) {
// it's log.
if(LOG.isDebugEnabled()) {
- LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
}
} else if (lastIndex() > -1
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
- LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
}
} else if (lastIndex() > -1
if (LOG.isDebugEnabled()) {
LOG.debug(
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , prevLogTerm
+ "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , context.getId(), prevLogTerm
, appendEntries.getPrevLogTerm());
}
} else {
, appendEntries.getPrevLogTerm());
}
} else {
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower ({}) is out-of-sync, " +
+ LOG.debug("{}: Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- context.getId(), lastIndex(), lastTerm()
+ context.getId(), context.getId(), lastIndex(), lastTerm()
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Number of entries to be appended = {}", appendEntries.getEntries().size()
- );
+ LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+ appendEntries.getEntries().size());
}
// 3. If an existing entry conflicts with a new one (same index
}
// 3. If an existing entry conflicts with a new one (same index
}
if(LOG.isDebugEnabled()) {
}
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Removing entries from log starting at {}", matchEntry.getIndex()
- );
+ LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+ matchEntry.getIndex());
}
// Entries do not match so remove all subsequent entries
}
// Entries do not match so remove all subsequent entries
}
if(LOG.isDebugEnabled()) {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
- );
+ LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+ (addEntriesFrom + lastIndex()));
}
// 4. Append any new entries not already in the log
}
// 4. Append any new entries not already in the log
i < appendEntries.getEntries().size(); i++) {
if(LOG.isDebugEnabled()) {
i < appendEntries.getEntries().size(); i++) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ LOG.debug("{}: Append entry to log {}", context.getId(),
+ appendEntries.getEntries().get(i).getData());
}
context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
}
context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to {}", context.getCommitIndex());
+ LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
if(LOG.isDebugEnabled()) {
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("applyLogToStateMachine, " +
+ LOG.debug("{}: applyLogToStateMachine, " +
"appendEntries.getLeaderCommit():{}," +
"appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
+ "context.getLastApplied():{}, lastIndex():{}", context.getId(),
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
);
}
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
);
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
if(LOG.isDebugEnabled()) {
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ LOG.debug("{}: InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
);
}
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
);
}
snapshotTracker = null;
} catch (Exception e){
snapshotTracker = null;
} catch (Exception e){
-
- LOG.error(e, "Exception in InstallSnapshot of follower:");
+ LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
- minIsolatedLeaderPeerCount, leaderId);
+ LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
return switchBehavior(new IsolatedLeader(context));
}
}
return switchBehavior(new IsolatedLeader(context));
}
}
// The state of this Shard
private final InMemoryDOMDataStore store;
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
/// The name of this shard
private final ShardIdentifier name;
/// The name of this shard
private final ShardIdentifier name;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+ LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
}
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity());
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@Override
public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
@Override
public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveRecover: Received message {} from {}",
- message.getClass().toString(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+ message.getClass().toString(), getSender());
}
if (message instanceof RecoveryFailure){
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+ persistenceId());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+ LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
}
if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
}
if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("Current transaction {} has timed out after {} ms - aborting",
- cohortEntry.getTransactionID(), transactionCommitTimeout);
+ LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
- LOG.debug("Committing transaction {}", transactionID);
+ LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
- String.format("Cannot commit transaction %s - it is not the current transaction",
- transactionID));
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
new ModificationPayload(cohortEntry.getModification()));
}
} catch (InterruptedException | ExecutionException | IOException e) {
new ModificationPayload(cohortEntry.getModification()));
}
} catch (InterruptedException | ExecutionException | IOException e) {
- LOG.error(e, "An exception occurred while preCommitting transaction {}",
- cohortEntry.getTransactionID());
+ LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("Could not finish committing transaction %s - no CohortEntry found",
- transactionID));
+ String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
- LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
// We block on the future here so we don't have to worry about possibly accessing our
try {
// We block on the future here so we don't have to worry about possibly accessing our
} catch (InterruptedException | ExecutionException e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
} catch (InterruptedException | ExecutionException e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
shardMBean.incrementFailedTransactionsCount();
}
shardMBean.incrementFailedTransactionsCount();
}
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
- ready.getTxnClientVersion());
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
- LOG.debug("Aborting transaction {}", transactionID);
+ LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
@Override
public void onFailure(final Throwable t) {
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "An exception happened during abort");
+ LOG.error(t, "{}: An exception happened during abort", persistenceId());
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
- "Could not find shard leader so transaction cannot be created. This typically happens" +
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
" when the system is coming up or recovering and a leader is being elected. Try again" +
" when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.")), getSelf());
+ " later.", persistenceId()))), getSelf());
.build();
if(LOG.isDebugEnabled()) {
.build();
if(LOG.isDebugEnabled()) {
- LOG.debug("Creating transaction : {} ", transactionId);
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+ LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
- LOG.debug("Shard is not the leader - delaying registration");
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg =
new DelayedListenerRegistration(registerChangeListener);
DelayedListenerRegistration delayedReg =
new DelayedListenerRegistration(registerChangeListener);
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- listenerRegistration.path());
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
- LOG.debug("Registering for path {}", registerChangeListener.getPath());
+ LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
- LOG.error("Unknown state received {} during recovery", data);
+ LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
}
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
}
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ LOG.debug("{}: submitted recovery sbapshot", persistenceId());
}
}
@Override
protected void applyCurrentLogRecoveryBatch() {
if(recoveryCoordinator == null) {
}
}
@Override
protected void applyCurrentLogRecoveryBatch() {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
currentLogRecoveryBatch.size());
}
}
currentLogRecoveryBatch.size());
}
}
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
}
for(DOMStoreWriteTransaction tx: txList) {
}
for(DOMStoreWriteTransaction tx: txList) {
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
}
else if (data instanceof CompositeModificationPayload) {
}
}
else if (data instanceof CompositeModificationPayload) {
applyModificationToState(clientActor, identifier, modification);
} else {
applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- data, data.getClass().getClassLoader(),
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
CompositeModificationPayload.class.getClassLoader());
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
- LOG.info("Applying snapshot");
+ LOG.info("{}: Applying snapshot", persistenceId());
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "An exception occurred when applying snapshot");
+ LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
- LOG.info("Done applying snapshot");
+ LOG.info("{}: Done applying snapshot", persistenceId());
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
- "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- entry.getKey(), getId());
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());
}
entry.getValue().close();
}
}
entry.getValue().close();
}
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
*/
public class ShardCommitCoordinator {
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final int queueCapacity;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Processing canCommit for transaction {} for shard {}",
+ name, transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
}
// Lookup the cohort entry that was cached previously (or should have been) by
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("No cohort entry found for transaction %s", transactionID));
- LOG.error(ex.getMessage());
+ String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
sender.tell(new Status.Failure(ex), shard);
return;
}
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
- LOG.debug("Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+ log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+ name, currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
" capacity %d has been reached.",
- transactionID, queueCapacity));
- LOG.error(ex.getMessage());
+ name, transactionID, queueCapacity));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
sender.tell(new Status.Failure(ex), shard);
}
} else {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.debug("An exception occurred during canCommit", e);
+ log.debug("{}: An exception occurred during canCommit: {}", name, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private static final int TIME_OUT = 10;
private static final int TIME_OUT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
+ private final LoggingAdapter log;
+ private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
this.schemaContext = schemaContext;
this.shardName = shardName;
+ this.log = log;
+ this.name = name;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
- LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
private final String shardName;
private final String memberName;
private final String type;
private final String shardName;
private final String memberName;
private final String type;
+ private final String fullName;
public ShardIdentifier(String shardName, String memberName, String type) {
public ShardIdentifier(String shardName, String memberName, String type) {
this.shardName = shardName;
this.memberName = memberName;
this.type = type;
this.shardName = shardName;
this.memberName = memberName;
this.type = type;
+
+ fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+ .append(type).toString();
- @Override public String toString() {
+ @Override
+ public String toString() {
//ensure the output of toString matches the pattern above
//ensure the output of toString matches the pattern above
- return new StringBuilder(memberName)
- .append("-shard-")
- .append(shardName)
- .append("-")
- .append(type)
- .toString();
}
public static Builder builder(){
}
public static Builder builder(){