Improve Shard logging output 60/14660/6
author    tpantelis <tpanteli@brocade.com>
Fri, 30 Jan 2015 23:29:15 +0000 (18:29 -0500)
committer tpantelis <tpanteli@brocade.com>
Mon, 2 Feb 2015 19:02:24 +0000 (14:02 -0500)
When debugging, it's useful to see the shard name in the log output.

Also changed ShardIdentifier to cache its toString() output; since the
class is immutable, this avoids the overhead of rebuilding the String
for every log message.

Change-Id: Ic7ea9878eeab04ea9b43a25b7d4b2b190f79e607
Signed-off-by: tpantelis <tpanteli@brocade.com>
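
For illustration, a minimal sketch of the toString() caching described above.
This is not the actual ShardIdentifier source; the field names and the formatted
layout are assumptions — the point is only that an immutable class can build the
String once up front and return it from toString():

    // Hypothetical sketch - field names and format are illustrative, not the real ShardIdentifier.
    public final class ShardIdentifier {
        private final String memberName;
        private final String shardName;
        private final String type;
        private final String toStringValue;  // built once; safe because every field is final

        public ShardIdentifier(String shardName, String memberName, String type) {
            this.shardName = shardName;
            this.memberName = memberName;
            this.type = type;
            // Pre-compute the value so log statements don't rebuild it on every call.
            this.toStringValue = memberName + "-shard-" + shardName + "-" + type;
        }

        @Override
        public String toString() {
            return toStringValue;
        }
    }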
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java

index c256c822a420e3a22b5a351778d58a88e73a9e8d..aa7b4533b7758f7a3f457ea48b1ae72cc6d94162 100644 (file)
@@ -179,7 +179,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.debug("{}: SnapshotOffer called..", persistenceId());
         }
 
         initRecoveryTimer();
@@ -209,7 +209,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
         replicatedLog.append(logEntry);
@@ -217,8 +217,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getLastApplied() + 1, ale.getToIndex());
+            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
         }
 
         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
@@ -289,8 +289,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyState applyState = (ApplyState) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Applying state for log index {} data {}",
-                    applyState.getReplicatedLogEntry().getIndex(),
+                LOG.debug("{}: Applying state for log index {} data {}",
+                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
             }
 
@@ -300,7 +300,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
             }
             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
@@ -312,8 +312,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                     snapshot.getLastAppliedTerm()
                 );
             }
@@ -333,7 +333,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("SaveSnapshotSuccess received for snapshot");
+            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
 
             long sequenceNumber = success.metadata().sequenceNr();
 
@@ -342,19 +342,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
-            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+            LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId());
 
             context.getReplicatedLog().snapshotRollback();
 
-            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                 context.getReplicatedLog().getSnapshotIndex(),
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.info("CaptureSnapshot received by actor");
+            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
 
             if(captureSnapshot == null) {
                 captureSnapshot = (CaptureSnapshot)message;
@@ -368,7 +368,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
+                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
                 }
             }
 
@@ -414,7 +414,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getTermInformation().getCurrentTerm(), data);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Persist data {}", replicatedLogEntry);
+            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
         }
 
         final RaftActorContext raftContext = getRaftActorContext();
@@ -441,7 +441,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
                             } else {
-                                LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+                                        persistenceId(), getId());
                             }
                         } else if (clientActor != null) {
                             // Send message for replication
@@ -652,15 +653,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
-                    leaderId, peerAddress);
+            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+                    persistenceId(), leaderId, peerAddress);
         }
 
         return peerAddress;
     }
 
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
@@ -672,7 +673,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         persistence().saveSnapshot(sn);
 
-        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
@@ -681,8 +682,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
-        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
-            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snapshotIndex:{} " +
+            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
@@ -751,7 +752,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
             }
 
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
@@ -799,7 +800,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("Initiating Snapshot Capture..");
+                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -813,11 +814,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             if(LOG.isDebugEnabled()) {
-                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                                LOG.debug("Snapshot Capture lastApplied:{} ",
-                                    context.getLastApplied());
-                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+                                        persistenceId(), context.getLastApplied());
+                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+                                        lastAppliedIndex);
+                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+                                        lastAppliedTerm);
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
@@ -869,7 +872,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void update(long currentTerm, String votedFor) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
             }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;
index 462c94ec8a40736cc005c994b74520d9111c3430..da1627b98e7e4e8204385914795300ce073a71ad 100644 (file)
@@ -109,7 +109,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         leaderId = context.getId();
 
-        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+        LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
 
         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
@@ -153,7 +153,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
+            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
         }
 
         return this;
@@ -165,7 +165,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if(! appendEntriesReply.isSuccess()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntriesReply.toString());
+                LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
             }
         }
 
@@ -175,7 +175,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerToLog.get(followerId);
 
         if(followerLogInformation == null){
-            LOG.error("Unknown follower {}", followerId);
+            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
             return this;
         }
 
@@ -322,9 +322,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("InstallSnapshotReply received, " +
+                        LOG.debug("{}: InstallSnapshotReply received, " +
                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                            reply.getChunkIndex(), followerId,
+                                context.getId(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1
                         );
                     }
@@ -336,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
-                            followerToLog.get(followerId).getNextIndex());
+                        LOG.debug("{}: followerToLog.get(followerId).getNextIndex()={}",
+                                context.getId(), followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -350,19 +350,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     followerToSnapshot.markSendStatus(true);
                 }
             } else {
-                LOG.info("InstallSnapshotReply received, " +
-                        "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex()
-                );
+                LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+                        context.getId(), reply.getChunkIndex());
 
                 followerToSnapshot.markSendStatus(false);
             }
 
         } else {
-            LOG.error("ERROR!!" +
-                    "FollowerId in InstallSnapshotReply not known to Leader" +
+            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
@@ -377,7 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message {}", logIndex);
+            LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -434,11 +431,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         // then snapshot should be sent
 
                         if(LOG.isDebugEnabled()) {
-                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                    "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                            );
+                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
+                                    "leader-last-index: %s", context.getId(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
                         }
                         actor().tell(new InitiateInstallSnapshot(), actor());
 
@@ -494,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", e.getKey());
+                    LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
@@ -516,7 +512,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // on every install snapshot, we try to capture the snapshot.
     // Once a capture is going on, another one issued will get ignored by RaftActor.
     private void initiateCaptureSnapshot() {
-        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
         ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
         long lastAppliedIndex = -1;
         long lastAppliedTerm = -1;
@@ -569,12 +565,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     ).toSerializable(),
                     actor()
                 );
-                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                        context.getId(), followerActor.path(),
+                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
             }
         } catch (IOException e) {
-            LOG.error(e, "InstallSnapshot failed for Leader.");
+            LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
         }
     }
 
@@ -590,7 +587,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+            LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
         }
         return nextChunk;
     }
@@ -654,14 +651,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * snapshot chunks
      */
     protected class FollowerToSnapshot {
-        private ByteString snapshotBytes;
+        private final ByteString snapshotBytes;
         private int offset = 0;
         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
         private int replyReceivedForOffset;
         // if replyStatus is false, the previous chunk is attempted
         private boolean replyStatus = false;
         private int chunkIndex;
-        private int totalChunks;
+        private final int totalChunks;
         private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
         private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
@@ -671,8 +668,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
-                    size, totalChunks);
+                LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+                        context.getId(), size, totalChunks);
             }
             replyReceivedForOffset = -1;
             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
@@ -741,7 +738,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("length={}, offset={},size={}",
+                LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
                     snapshotLength, start, size);
             }
             ByteString substring = getSnapshotBytes().substring(start, start + size);
index 04462be0420eaa3f0504be0523b7c9371128273d..dbeafe9eb8b2fce467451eb4594c6c1be913797e 100644 (file)
@@ -94,8 +94,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // 1. Reply false if term < currentTerm (§5.1)
         if (appendEntries.getTerm() < currentTerm()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Cannot append entries because sender term {} is less than {}",
-                        appendEntries.getTerm(), currentTerm());
+                LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+                        context.getId(), appendEntries.getTerm(), currentTerm());
             }
 
             sender.tell(
@@ -136,7 +136,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         RequestVote requestVote) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(requestVote.toString());
+            LOG.debug("{}: Received {}", context.getId(), requestVote);
         }
 
         boolean grantVote = false;
@@ -350,12 +350,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
                 LOG.warning(
-                        "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+                        "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+                        context.getId(), i, i, index);
                 break;
             }
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Setting last applied to {}", newLastApplied);
+            LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
         }
         context.setLastApplied(newLastApplied);
 
@@ -393,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         try {
             close();
         } catch (Exception e) {
-            LOG.error(e, "Failed to close behavior : {}", this.state());
+            LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
         }
 
         return behavior;
index 702417273ff586fc635ebcb65e15fa4c8bd64885..09ffe056c3e94fcd4592f8f44f4e29123967f918 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.Set;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.Set;
-
 /**
  * The behavior of a RaftActor when it is in the CandidateState
  * <p/>
@@ -53,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         peers = context.getPeerAddresses().keySet();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Candidate has following peers: {}", peers);
+            LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
         }
 
         votesRequired = getMajorityVoteCount(peers.size());
@@ -66,7 +65,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
+            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
         }
 
         return this;
@@ -106,7 +105,8 @@ public class Candidate extends AbstractRaftActorBehavior {
             RaftRPC rpc = (RaftRPC) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+                LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+                        context.getTermInformation().getCurrentTerm());
             }
 
             // If RPC request or response contains term T > currentTerm:
@@ -150,7 +150,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             context.getId());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Starting new term {}", (currentTerm + 1));
+            LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
         }
 
         // Request for a vote
index cc2e55d51b115cc124890c3e643ed810244a7c24..31b5efbe3878df73f74d46d90d018751dca67bbf 100644 (file)
@@ -77,7 +77,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntries.toString());
+                LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
             }
         }
 
@@ -109,8 +109,8 @@ public class Follower extends AbstractRaftActorBehavior {
             // it's log.
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
-                        appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
@@ -121,8 +121,8 @@ public class Follower extends AbstractRaftActorBehavior {
             // prevLogIndex entry was not found in it's log
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
-                        appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
@@ -135,8 +135,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                        "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                        , prevLogTerm
+                        "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                        , context.getId(), prevLogTerm
                         , appendEntries.getPrevLogTerm());
             }
         } else {
@@ -147,9 +147,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // We found that the log was out of sync so just send a negative
             // reply and return
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Follower ({}) is out-of-sync, " +
+                LOG.debug("{}: Follower ({}) is out-of-sync, " +
                         "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                        context.getId(), lastIndex(), lastTerm()
+                        context.getId(), context.getId(), lastIndex(), lastTerm()
                 );
             }
             sender.tell(
@@ -162,9 +162,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Number of entries to be appended = {}", appendEntries.getEntries().size()
-                );
+                LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+                        appendEntries.getEntries().size());
             }
 
             // 3. If an existing entry conflicts with a new one (same index
@@ -189,9 +188,8 @@ public class Follower extends AbstractRaftActorBehavior {
                     }
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug(
-                            "Removing entries from log starting at {}", matchEntry.getIndex()
-                        );
+                        LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+                                matchEntry.getIndex());
                     }
 
                     // Entries do not match so remove all subsequent entries
@@ -202,8 +200,8 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
-                );
+                LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+                        (addEntriesFrom + lastIndex()));
             }
 
             // 4. Append any new entries not already in the log
@@ -211,13 +209,14 @@ public class Follower extends AbstractRaftActorBehavior {
                  i < appendEntries.getEntries().size(); i++) {
 
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+                    LOG.debug("{}: Append entry to log {}", context.getId(),
+                            appendEntries.getEntries().get(i).getData());
                 }
                 context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+                LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
             }
         }
 
@@ -232,7 +231,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (prevCommitIndex != context.getCommitIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Commit index set to {}", context.getCommitIndex());
+                LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
             }
         }
 
@@ -242,9 +241,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
             context.getLastApplied() < lastIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("applyLogToStateMachine, " +
+                LOG.debug("{}: applyLogToStateMachine, " +
                         "appendEntries.getLeaderCommit():{}," +
-                        "context.getLastApplied():{}, lastIndex():{}",
+                        "context.getLastApplied():{}, lastIndex():{}", context.getId(),
                     appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
                 );
             }
@@ -302,8 +301,8 @@ public class Follower extends AbstractRaftActorBehavior {
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("InstallSnapshot received by follower " +
-                    "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            LOG.debug("{}: InstallSnapshot received by follower " +
+                    "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
                 installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
             );
         }
@@ -339,8 +338,7 @@ public class Follower extends AbstractRaftActorBehavior {
             snapshotTracker = null;
 
         } catch (Exception e){
-
-            LOG.error(e, "Exception in InstallSnapshot of follower:");
+            LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     installSnapshot.getChunkIndex(), false), actor());
index ee3cc65dddb90815aecd50771c834d3ab461ecd4..fcfaee36033f3eba278f1f351d3c8cb3e974feb1 100644 (file)
@@ -57,8 +57,8 @@ public class Leader extends AbstractLeader {
 
         if (originalMessage instanceof IsolatedLeaderCheck) {
             if (isLeaderIsolated()) {
-                LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
-                    minIsolatedLeaderPeerCount, leaderId);
+                LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                        context.getId(), minIsolatedLeaderPeerCount, leaderId);
                 return switchBehavior(new IsolatedLeader(context));
             }
         }
index 9cd758ba30fdb94e85cd1703d99a8e0c55a50a17..3fc9c142c5e8b40daec717b3292fe1d52afcbf94 100644 (file)
@@ -101,8 +101,7 @@ public class Shard extends RaftActor {
     // The state of this Shard
     private final InMemoryDOMDataStore store;
 
-    private final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
 
     /// The name of this shard
     private final ShardIdentifier name;
@@ -148,7 +147,7 @@ public class Shard extends RaftActor {
         this.schemaContext = schemaContext;
         this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
 
-        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+        LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
@@ -166,7 +165,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
-                datastoreContext.getShardTransactionCommitQueueCapacity());
+                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -216,13 +215,13 @@ public class Shard extends RaftActor {
     @Override
     public void onReceiveRecover(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("onReceiveRecover: Received message {} from {}",
-                message.getClass().toString(),
-                getSender());
+            LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+                message.getClass().toString(), getSender());
         }
 
         if (message instanceof RecoveryFailure){
-            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+            LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+                    persistenceId());
 
             // Even though recovery failed, we still need to finish our recovery, eg send the
             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@@ -235,7 +234,7 @@ public class Shard extends RaftActor {
     @Override
     public void onReceiveCommand(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+            LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
         }
 
         if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
@@ -275,8 +274,8 @@ public class Shard extends RaftActor {
         if(cohortEntry != null) {
             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
             if(elapsed > transactionCommitTimeout) {
-                LOG.warning("Current transaction {} has timed out after {} ms - aborting",
-                        cohortEntry.getTransactionID(), transactionCommitTimeout);
+                LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
@@ -286,7 +285,7 @@ public class Shard extends RaftActor {
     private void handleCommitTransaction(final CommitTransaction commit) {
         final String transactionID = commit.getTransactionID();
 
-        LOG.debug("Committing transaction {}", transactionID);
+        LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
 
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
@@ -295,8 +294,8 @@ public class Shard extends RaftActor {
             // We're not the current Tx - the Tx was likely expired b/c it took too long in
             // between the canCommit and commit messages.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("Cannot commit transaction %s - it is not the current transaction",
-                            transactionID));
+                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+                            persistenceId(), transactionID));
             LOG.error(ex.getMessage());
             shardMBean.incrementFailedTransactionsCount();
             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
@@ -323,8 +322,8 @@ public class Shard extends RaftActor {
                         new ModificationPayload(cohortEntry.getModification()));
             }
         } catch (InterruptedException | ExecutionException | IOException e) {
-            LOG.error(e, "An exception occurred while preCommitting transaction {}",
-                    cohortEntry.getTransactionID());
+            LOG.error(e, "{}: An exception occurred while preCommitting transaction {}",
+                    persistenceId(), cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -352,8 +351,8 @@ public class Shard extends RaftActor {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
                 IllegalStateException ex = new IllegalStateException(
-                        String.format("Could not finish committing transaction %s - no CohortEntry found",
-                                transactionID));
+                        String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+                                persistenceId(), transactionID));
                 LOG.error(ex.getMessage());
                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
             }
@@ -361,7 +360,7 @@ public class Shard extends RaftActor {
             return;
         }
 
-        LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
@@ -377,7 +376,7 @@ public class Shard extends RaftActor {
         } catch (InterruptedException | ExecutionException e) {
             sender.tell(new akka.actor.Status.Failure(e), getSelf());
 
-            LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+            LOG.error(e, "{}: An exception occurred while committing transaction {}", persistenceId(), transactionID);
             shardMBean.incrementFailedTransactionsCount();
         }
 
@@ -385,13 +384,13 @@ public class Shard extends RaftActor {
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
-                ready.getTxnClientVersion());
+        LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+                ready.getTransactionID(), ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -406,7 +405,7 @@ public class Shard extends RaftActor {
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
         }
@@ -424,7 +423,7 @@ public class Shard extends RaftActor {
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
-            LOG.debug("Aborting transaction {}", transactionID);
+            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
 
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
             // aborted during replication in which case we may still commit locally if replication
@@ -446,7 +445,7 @@ public class Shard extends RaftActor {
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.error(t, "An exception happened during abort");
+                    LOG.error(t, "{}: An exception happened during abort", persistenceId());
 
                     if(sender != null) {
                         sender.tell(new akka.actor.Status.Failure(t), self);
@@ -462,10 +461,10 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
-                "Could not find shard leader so transaction cannot be created. This typically happens" +
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
                 " when the system is coming up or recovering and a leader is being elected. Try again" +
-                " later.")), getSelf());
+                " later.", persistenceId()))), getSelf());
         }
     }
 
@@ -556,7 +555,7 @@ public class Shard extends RaftActor {
                 .build();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Creating transaction : {} ", transactionId);
+            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
         }
 
         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
@@ -581,7 +580,7 @@ public class Shard extends RaftActor {
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
             shardMBean.incrementFailedTransactionsCount();
-            LOG.error(e, "Failed to commit");
+            LOG.error(e, "{}: Failed to commit", persistenceId());
         }
     }
 
@@ -598,14 +597,14 @@ public class Shard extends RaftActor {
 
     private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
 
-        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+        LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
 
         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
         if(isLeader()) {
             registration = doChangeListenerRegistration(registerChangeListener);
         } else {
-            LOG.debug("Shard is not the leader - delaying registration");
+            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg =
                     new DelayedListenerRegistration(registerChangeListener);
@@ -616,8 +615,8 @@ public class Shard extends RaftActor {
         ActorRef listenerRegistration = getContext().actorOf(
                 DataChangeListenerRegistration.props(registration));
 
-        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
-                    listenerRegistration.path());
+        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+                persistenceId(), listenerRegistration.path());
 
         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
@@ -641,7 +640,7 @@ public class Shard extends RaftActor {
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                 new DataChangeListenerProxy(dataChangeListenerPath);
 
-        LOG.debug("Registering for path {}", registerChangeListener.getPath());
+        LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
 
         return store.registerChangeListener(registerChangeListener.getPath(), listener,
                 registerChangeListener.getScope());
@@ -658,7 +657,7 @@ public class Shard extends RaftActor {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
         }
     }
 
@@ -668,40 +667,42 @@ public class Shard extends RaftActor {
             try {
                 currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "Error extracting ModificationPayload");
+                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
             }
         } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
-            LOG.error("Unknown state received {} during recovery", data);
+            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
         }
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
         if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+                    LOG, name.toString());
         }
 
         recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+            LOG.debug("{}: submitted recovery snapshot", persistenceId());
         }
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
         if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+                    LOG, name.toString());
         }
 
         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
                     currentLogRecoveryBatch.size());
         }
     }
@@ -712,7 +713,7 @@ public class Shard extends RaftActor {
             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
             }
 
             for(DOMStoreWriteTransaction tx: txList) {
@@ -721,7 +722,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementCommittedTransactionCount();
                 } catch (InterruptedException | ExecutionException e) {
                     shardMBean.incrementFailedTransactionsCount();
-                    LOG.error(e, "Failed to commit");
+                    LOG.error(e, "{}: Failed to commit", persistenceId());
                 }
             }
         }
@@ -751,7 +752,7 @@ public class Shard extends RaftActor {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "Error extracting ModificationPayload");
+                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
             }
         }
         else if (data instanceof CompositeModificationPayload) {
@@ -763,8 +764,8 @@ public class Shard extends RaftActor {
 
             applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    data, data.getClass().getClassLoader(),
+            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    persistenceId(), data, data.getClass().getClassLoader(),
                     CompositeModificationPayload.class.getClassLoader());
         }
 
@@ -775,8 +776,8 @@ public class Shard extends RaftActor {
     private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
         if(modification == null) {
             LOG.error(
-                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor != null ? clientActor.path().toString() : null);
+                    "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
@@ -821,7 +822,7 @@ public class Shard extends RaftActor {
         // we can safely commit everything in here. We need not worry about event notifications
         // as they would have already been disabled on the follower
 
-        LOG.info("Applying snapshot");
+        LOG.info("{}: Applying snapshot", persistenceId());
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -834,9 +835,9 @@ public class Shard extends RaftActor {
             transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
-            LOG.error(e, "An exception occurred when applying snapshot");
+            LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
         } finally {
-            LOG.info("Done applying snapshot");
+            LOG.info("{}: Done applying snapshot", persistenceId());
         }
     }
 
@@ -865,8 +866,8 @@ public class Shard extends RaftActor {
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                 if(LOG.isDebugEnabled()) {
                     LOG.debug(
-                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
-                        entry.getKey(), getId());
+                        "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+                        persistenceId(), entry.getKey(), getId());
                 }
                 entry.getValue().close();
             }
index 19fa26682e2a4cea7b637fda85064a3aea0226e5..659acb745473af389480ff08e19cc68857add93a 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
+import akka.event.LoggingAdapter;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
@@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -29,8 +28,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ShardCommitCoordinator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
@@ -39,11 +36,18 @@ public class ShardCommitCoordinator {
 
     private final int queueCapacity;
 
-    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+    private final LoggingAdapter log;
+
+    private final String name;
+
+    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+            String name) {
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
 
         this.queueCapacity = queueCapacity;
+        this.log = log;
+        this.name = name;
 
         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
         // since this should only be accessed on the shard's dispatcher.
@@ -74,9 +78,9 @@ public class ShardCommitCoordinator {
     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
             final ActorRef shard) {
         String transactionID = canCommit.getTransactionID();
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Processing canCommit for transaction {} for shard {}",
-                    transactionID, shard.path());
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Processing canCommit for transaction {} for shard {}",
+                    name, transactionID, shard.path());
         }
 
         // Lookup the cohort entry that was cached previously (or should have been) by
@@ -86,8 +90,8 @@ public class ShardCommitCoordinator {
             // Either canCommit was invoked before ready (shouldn't happen) or a long time passed
             // between canCommit and ready and the entry was expired from the cache.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("No cohort entry found for transaction %s", transactionID));
-            LOG.error(ex.getMessage());
+                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+            log.error(ex.getMessage());
             sender.tell(new Status.Failure(ex), shard);
             return;
         }
@@ -98,8 +102,8 @@ public class ShardCommitCoordinator {
         if(currentCohortEntry != null) {
             // There's already a Tx commit in progress - attempt to queue this entry to be
             // committed after the current Tx completes.
-            LOG.debug("Transaction {} is already in progress - queueing transaction {}",
-                    currentCohortEntry.getTransactionID(), transactionID);
+            log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+                    name, currentCohortEntry.getTransactionID(), transactionID);
 
             if(queuedCohortEntries.size() < queueCapacity) {
                 queuedCohortEntries.offer(cohortEntry);
@@ -107,10 +111,10 @@ public class ShardCommitCoordinator {
                 removeCohortEntry(transactionID);
 
                 RuntimeException ex = new RuntimeException(
-                        String.format("Could not enqueue transaction %s - the maximum commit queue"+
+                        String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
                                       " capacity %d has been reached.",
-                                transactionID, queueCapacity));
-                LOG.error(ex.getMessage());
+                                      name, transactionID, queueCapacity));
+                log.error(ex.getMessage());
                 sender.tell(new Status.Failure(ex), shard);
             }
         } else {
@@ -140,7 +144,7 @@ public class ShardCommitCoordinator {
                 removeCohortEntry(cohortEntry.getTransactionID());
             }
         } catch (InterruptedException | ExecutionException e) {
-            LOG.debug("An exception occurred during canCommit", e);
+            log.debug("{}: An exception occurred during canCommit: {}", name, e);
 
             // Remove the entry from the cache now since the Tx will be aborted.
             removeCohortEntry(cohortEntry.getTransactionID());
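
The constructor changes above and below follow the same pattern: rather than each helper owning a static SLF4J logger, the shard hands its own LoggingAdapter and a name into the helper, and every message is prefixed with that name. A minimal sketch of the pattern, with an illustrative class name that is not part of the patch:

    import akka.event.LoggingAdapter;

    class ShardHelperSketch {

        private final LoggingAdapter log;
        private final String name;

        ShardHelperSketch(LoggingAdapter log, String name) {
            this.log = log;
            this.name = name;
        }

        void process(String transactionID) {
            if (log.isDebugEnabled()) {
                // The shard name supplied by the owner fills the first "{}".
                log.debug("{}: Processing transaction {}", name, transactionID);
            }
        }
    }
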
index 238b4e46dce041add47117503fcb68feb54e8e27..2a97036883b272d3f6757d9657417c0891567f7f 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.event.LoggingAdapter;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
@@ -21,8 +22,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
@@ -37,16 +36,19 @@ class ShardRecoveryCoordinator {
 
     private static final int TIME_OUT = 10;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
     private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
     private final SchemaContext schemaContext;
     private final String shardName;
     private final ExecutorService executor;
+    private final LoggingAdapter log;
+    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+            String name) {
         this.schemaContext = schemaContext;
         this.shardName = shardName;
+        this.log = log;
+        this.name = name;
 
         executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
                 new ThreadFactoryBuilder().setDaemon(true)
@@ -85,7 +87,7 @@ class ShardRecoveryCoordinator {
             if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
                 return resultingTxList;
             } else {
-                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
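
A minimal sketch of the pattern applied in the ShardIdentifier hunks that follow: all fields are final, so the identifier string can be assembled once in the constructor and returned directly from toString(). The class and accessor names here are illustrative only.

    final class CachedIdSketch {

        private final String memberName;
        private final String shardName;
        private final String fullName;

        CachedIdSketch(String memberName, String shardName) {
            this.memberName = memberName;
            this.shardName = shardName;
            // Assembled once here rather than on every toString() call.
            this.fullName = memberName + "-shard-" + shardName;
        }

        String getMemberName() {
            return memberName;
        }

        String getShardName() {
            return shardName;
        }

        @Override
        public String toString() {
            return fullName;
        }
    }
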
index 5053d47f84e6fff6f0ab8aa33521dc23fbf45e4d..03bae2d99dedec37619e63c8b2924c1c7acddc20 100644 (file)
@@ -20,6 +20,7 @@ public class ShardIdentifier {
     private final String shardName;
     private final String memberName;
     private final String type;
+    private final String fullName;
 
     public ShardIdentifier(String shardName, String memberName, String type) {
 
@@ -30,6 +31,9 @@ public class ShardIdentifier {
         this.shardName = shardName;
         this.memberName = memberName;
         this.type = type;
+
+        fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+                .append(type).toString();
     }
 
     @Override
@@ -64,14 +68,10 @@ public class ShardIdentifier {
         return result;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         //ensure the output of toString matches the pattern above
-        return new StringBuilder(memberName)
-                    .append("-shard-")
-                    .append(shardName)
-                    .append("-")
-                    .append(type)
-                    .toString();
+        return fullName;
     }
 
     public static Builder builder(){