This commit is mostly about conditional logging: debug statements are
wrapped in LOG.isDebugEnabled() guards so their arguments are not
evaluated when debug logging is disabled.
It also sneaks in a couple of other things:
1. Cleanup of ClientRequestTracker (trackers are now removed once the
   corresponding log entry has been applied)
2. Removal of some dead code
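
A minimal sketch of the logging pattern being applied (state here is
illustrative, standing in for whatever a given call site logs):

    // Without the guard, argument expressions and string concatenation
    // run even when debug logging is disabled.
    LOG.debug("Snapshot applied to state :" + state.size());

    // With the guard, the arguments are not evaluated unless
    // debug logging is actually enabled.
    if(LOG.isDebugEnabled()) {
        LOG.debug("Snapshot applied to state :" + state.size());
    }

On hot paths (per-message actor handlers) the avoided toString() and
concatenation costs add up.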
Change-Id: I0862f1273e94856e19107d3a4beec4d66452787d
Signed-off-by: Moiz Raja <moraja@cisco.com>
Signed-off-by: Harman Singh <harmasin@cisco.com>
}
} else if (message instanceof PrintState) {
- LOG.debug("State of the node:{} has entries={}, {}",
- getId(), state.size(), getReplicatedLogState());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("State of the node:{} has entries={}, {}",
+ getId(), state.size(), getReplicatedLogState());
+ }
} else if (message instanceof PrintRole) {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ }
} else {
super.onReceiveCommand(message);
} catch (Exception e) {
LOG.error("Exception in applying snapshot", e);
}
- LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ }
}
private ByteString fromObject(Object snapshot) throws Exception {
} else if (message instanceof UpdateElectionTerm) {
context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "RecoveryCompleted - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size()
+ );
+ }
currentBehavior = switchBehavior(RaftState.Follower);
onStateChanged();
}
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Applying state for log index {} data {}",
+ applyState.getReplicatedLogEntry().getIndex(),
+ applyState.getReplicatedLogEntry().getData());
+ }
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm()
+ );
+ }
applySnapshot(ByteString.copyFrom(snapshot.getState()));
//clears the follower's log, sets the snapshot index to ensure adjusted-index works
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: message:" + message.getClass());
+ }
}
RaftState state =
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- LOG.debug("Persist data {}", replicatedLogEntry);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Persist data {}", replicatedLogEntry);
+ }
replicatedLog
.appendAndPersist(clientActor, identifier, replicatedLogEntry);
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ }
return peerAddress;
}
lastAppliedTerm = lastAppliedEntry.getTerm();
}
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ",
+ context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ }
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
}
@Override public void update(long currentTerm, String votedFor) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ }
this.currentTerm = currentTerm;
this.votedFor = votedFor;
}
return null;
}
+ /**
+ * Find and remove the client request tracker for a specific logIndex
+ *
+ * @param logIndex the index of the log entry whose tracker is to be removed
+ * @return the removed ClientRequestTracker, or null if no tracker matches
+ */
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
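+ // Base implementation tracks nothing; Leader overrides this to remove and return the tracker.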
+ return null;
+ }
+
+
/**
* Find the log index from the previous to last entry in the log
*
i < index + 1; i++) {
ActorRef clientActor = null;
String identifier = null;
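+ // Remove, rather than just find, the tracker: the entry is being applied, so its request is complete.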
- ClientRequestTracker tracker = findClientRequestTracker(i);
+ ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
clientActor = tracker.getClientActor();
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.event.LoggingAdapter;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
public class Follower extends AbstractRaftActorBehavior {
private ByteString snapshotChunksCollected = ByteString.EMPTY;
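+ // Cache the LoggingAdapter from the RaftActorContext; call sites previously fetched it via context.getLogger() on every call.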
+ private final LoggingAdapter LOG;
+
public Follower(RaftActorContext context) {
super(context);
+ LOG = context.getLogger();
+
scheduleElection(electionDuration());
}
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
- context.getLogger()
- .debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
}
// TODO : Refactor this method into a bunch of smaller methods
// an entry at prevLogIndex and this follower has no entries in
// its log.
- context.getLogger().debug(
- "The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& appendEntries.getPrevLogIndex() != -1
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in its log
- context.getLogger().debug(
- "The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& previousEntry != null
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- context.getLogger().debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , previousEntry.getTerm()
+ , appendEntries.getPrevLogTerm());
+ }
} else {
outOfSync = false;
}
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
- context.getLogger().debug("Follower is out-of-sync, " +
- "so sending negative reply, lastIndex():{}, lastTerm():{}",
- lastIndex(), lastTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm()
+ );
+ }
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
- context.getLogger().debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Number of entries to be appended = " + appendEntries
+ .getEntries().size()
+ );
+ }
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
continue;
}
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex()
+ );
+ }
// Entries do not match so remove all subsequent entries
context.getReplicatedLog()
}
}
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "After cleanup entries to be added from = " + (addEntriesFrom
+ + lastIndex())
+ );
+ }
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
.appendAndPersist(appendEntries.getEntries().get(i));
}
- context.getLogger().debug(
- "Log size is now " + context.getReplicatedLog().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Log size is now " + context.getReplicatedLog().size());
+ }
}
context.getReplicatedLog().lastIndex()));
if (prevCommitIndex != context.getCommitIndex()) {
- context.getLogger()
- .debug("Commit index set to " + context.getCommitIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Commit index set to " + context.getCommitIndex());
+ }
}
// If commitIndex > lastApplied: increment lastApplied, apply
// check if there are any entries to be applied. last-applied can be equal to last-index
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
- context.getLogger().debug("applyLogToStateMachine, " +
- "appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
- appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+ );
+ }
+
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
- context.getLogger().debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+ );
+ }
try {
if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
} else {
// we have more to go
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
}
sender.tell(new InstallSnapshotReply(
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
private final int minReplicationCount;
+ private final LoggingAdapter LOG;
+
public Leader(RaftActorContext context) {
super(context);
+ LOG = context.getLogger();
+
if (lastIndex() >= 0) {
context.setCommitIndex(lastIndex());
}
followerToLog.put(followerId, followerLogInformation);
}
- context.getLogger().debug("Election:Leader has following peers:"+ followers);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Election:Leader has following peers:" + followers);
+ }
if (followers.size() > 0) {
minReplicationCount = (followers.size() + 1) / 2 + 1;
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- context.getLogger().debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
return state();
}
AppendEntriesReply appendEntriesReply) {
if(! appendEntriesReply.isSuccess()) {
- context.getLogger()
- .debug(appendEntriesReply.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntriesReply.toString());
+ }
}
// Update the FollowerLogInformation
followerToLog.get(followerId);
if(followerLogInformation == null){
- context.getLogger().error("Unknown follower {}", followerId);
+ LOG.error("Unknown follower {}", followerId);
return state();
}
return state();
}
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
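+ // Drop the tracker from trackerList once found so completed requests do not accumulate.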
+ ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+ if(toRemove != null) {
+ trackerList.remove(toRemove);
+ }
+
+ return toRemove;
+ }
+
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- context.getLogger().debug("InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshotReply received, " +
+ "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+ reply.getChunkIndex(), followerId,
+ context.getReplicatedLog().getSnapshotIndex() + 1
+ );
+ }
FollowerLogInformation followerLogInformation =
followerToLog.get(followerId);
followerLogInformation.setNextIndex(
context.getReplicatedLog().getSnapshotIndex() + 1);
mapFollowerToSnapshot.remove(followerId);
- context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+ followerToLog.get(followerId).getNextIndex().get());
+ }
} else {
followerToSnapshot.markSendStatus(true);
}
} else {
- context.getLogger().info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex());
+ LOG.info("InstallSnapshotReply received, " +
+ "sending snapshot chunk failed, Will retry, Chunk:{}",
+ reply.getChunkIndex()
+ );
followerToSnapshot.markSendStatus(false);
}
} else {
- context.getLogger().error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+ LOG.error("ERROR!!" +
+ "FollowerId in InstallSnapshotReply not known to Leader" +
+ " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+ followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ );
}
}
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- context.getLogger().debug("Replicate message " + logIndex);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Replicate message " + logIndex);
+ }
// Create a tracker entry we will use this later to notify the
// client actor
if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
// if the follower is just not starting and leader's index
// is more than followers index
- context.getLogger().debug("SendInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("SendInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
+ }
actor().tell(new SendInstallSnapshot(), actor());
} else {
).toSerializable(),
actor()
);
- context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
mapFollowerToSnapshot.get(followerId).getTotalChunks());
} catch (IOException e) {
- context.getLogger().error("InstallSnapshot failed for Leader.", e);
+ LOG.error("InstallSnapshot failed for Leader.", e);
}
}
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
- context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ }
return nextChunk;
}
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+ size, totalChunks);
+ }
}
public ByteString getSnapshotBytes() {
}
}
- context.getLogger().debug("length={}, offset={},size={}",
- snapshotLength, start, size);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("length={}, offset={},size={}",
+ snapshotLength, start, size);
+ }
return getSnapshotBytes().substring(start, start + size);
}
Logging.getLogger(getContext().system(), this);
public AbstractUntypedActor() {
- LOG.debug("Actor created {}", getSelf());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor created {}", getSelf());
+ }
getContext().
system().
actorSelection("user/termination-monitor").
@Override public void onReceive(Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
- LOG.debug("Received message {}", messageType);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received message {}", messageType);
+ }
handleReceive(message);
-
- LOG.debug("Done handling message {}", messageType);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Done handling message {}", messageType);
+ }
}
protected abstract void handleReceive(Object message) throws Exception;
}
protected void unknownMessage(Object message) throws Exception {
- LOG.debug("Received unhandled message {}", message);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received unhandled message {}", message);
+ }
unhandled(message);
}
}
for (Entry<URI, String> e: prefixes.getPrefixes()) {
writer.writeNamespace(e.getValue(), e.getKey().toString());
}
- LOG.debug("Instance identifier with Random prefix is now {}", str);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Instance identifier with Random prefix is now {}", str);
+ }
writer.writeCharacters(str);
}
DataSchemaNode childSchema = null;
if (schema instanceof DataNodeContainer) {
childSchema = SchemaUtils.findFirstSchema(child.getNodeType(), ((DataNodeContainer) schema).getChildNodes()).orNull();
- if (childSchema == null) {
+ if (childSchema == null && LOG.isDebugEnabled()) {
LOG.debug("Probably the data node \"{}\" does not conform to schema", child == null ? "" : child.getNodeType().getLocalName());
}
}
*/
public void writeValue(final @Nonnull XMLStreamWriter writer, final @Nonnull TypeDefinition<?> type, final Object value) throws XMLStreamException {
if (value == null) {
- LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+ }
return;
}
writer.writeNamespace(prefix, qname.getNamespace().toString());
writer.writeCharacters(prefix + ':' + qname.getLocalName());
} else {
- LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ }
writer.writeCharacters(String.valueOf(value));
}
}
private static void write(final @Nonnull XMLStreamWriter writer, final @Nonnull InstanceIdentifierTypeDefinition type, final @Nonnull Object value) throws XMLStreamException {
if (value instanceof YangInstanceIdentifier) {
- LOG.debug("Writing InstanceIdentifier object {}", value);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Writing InstanceIdentifier object {}", value);
+ }
write(writer, (YangInstanceIdentifier)value);
} else {
- LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
- writer.writeCharacters(String.valueOf(value));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ }
+ writer.writeCharacters(String.valueOf(value));
}
}
}
* @return xml String
*/
public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
- LOG.debug("Converting input composite node to xml {}", cNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting input composite node to xml {}", cNode);
+ }
if (cNode == null) {
return BLANK;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpc : rpcs) {
if(rpc.getQName().equals(cNode.getNodeType())){
- LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName());
+ }
CompositeNode inputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "input"));
domTree = XmlDocumentUtils.toDocument(inputContainer, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
-
- LOG.debug("input composite node to document conversion complete, document is {}", domTree);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("input composite node to document conversion complete, document is {}", domTree);
+ }
break;
}
}
* @return xml string
*/
public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
- LOG.debug("Converting output composite node to xml {}", cNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting output composite node to xml {}", cNode);
+ }
if (cNode == null) {
return BLANK;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpc : rpcs) {
if(rpc.getQName().equals(cNode.getNodeType())){
- LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName());
+ }
CompositeNode outputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "output"));
domTree = XmlDocumentUtils.toDocument(outputContainer, rpc.getOutput(), XmlDocumentUtils.defaultValueCodecProvider());
-
- LOG.debug("output composite node to document conversion complete, document is {}", domTree);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("output composite node to document conversion complete, document is {}", domTree);
+ }
break;
}
}
LOG.error("Error during translation of Document to OutputStream", e);
}
- LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString());
+ }
return writer.toString();
}
* @return CompositeNode object based on the input, if any of the input parameter is null, a null object is returned
*/
public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){
- LOG.debug("Converting input xml to composite node {}", xml);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting input xml to composite node {}", xml);
+ }
if (xml==null || xml.length()==0) {
return null;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpcDef : rpcs) {
if(rpcDef.getQName().equals(rpc)){
- LOG.debug("found the rpc definition from schema context matching rpc {}", rpc);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("found the rpc definition from schema context matching rpc {}", rpc);
+ }
if(rpcDef.getInput() == null) {
LOG.warn("found rpc definition's input is null");
return null;
List<Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
Optional.of(rpcDef.getInput().getChildNodes()), schemaContext);
-
- LOG.debug("Converted xml input to list of nodes {}", dataNodes);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converted xml input to list of nodes {}", dataNodes);
+ }
final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
it.setQName(rpc);
it.add(ImmutableCompositeNode.create(input, dataNodes));
} catch (IOException e) {
LOG.error("Error during building data tree from XML", e);
}
-
- LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+ }
return compositeNode;
}
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
-
- LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+ }
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataChangeListener.props(listener ));
}, actorContext.getActorSystem().dispatcher());
return listenerRegistrationProxy;
}
-
- LOG.debug(
- "No local shard for shardName {} was found so returning a noop registration",
- shardName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "No local shard for shardName {} was found so returning a noop registration",
+ shardName);
+ }
return new NoOpDataChangeListenerRegistration(listener);
}
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
}
@Override public void onReceiveRecover(Object message) {
- LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
- getSender());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveRecover: Received message {} from {}",
+ message.getClass().toString(),
+ getSender());
+ }
if (message instanceof RecoveryFailure){
LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
}
@Override public void onReceiveCommand(Object message) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
- getSender());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: Received message {} from {}",
+ message.getClass().toString(),
+ getSender());
+ }
if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
// This must be for install snapshot. Don't want to open this up and trigger
ShardTransactionIdentifier.builder()
.remoteTransactionId(remoteTransactionId)
.build();
- LOG.debug("Creating transaction : {} ", transactionId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Creating transaction : {} ", transactionId);
+ }
ActorRef transactionActor =
createTypedTransactionActor(transactionType, transactionId, transactionChainId);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
- LOG.debug(
- "Could not find cohort for modification : {}. Writing modification using a new transaction",
- modification);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Could not find cohort for modification : {}. Writing modification using a new transaction",
+ modification);
+ }
+
DOMStoreWriteTransaction transaction =
store.newWriteOnlyTransaction();
- LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+ }
modification.apply(transaction);
try {
return;
}
- final ListenableFuture<Void> future = cohort.commit();
- final ActorRef self = getSelf();
+ ListenableFuture<Void> future = cohort.commit();
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- sender.tell(new CommitTransactionReply().toSerializable(), self);
+ sender.tell(new CommitTransactionReply().toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
}
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during commit");
shardMBean.incrementFailedTransactionsCount();
- sender.tell(new akka.actor.Status.Failure(t), self);
+ sender.tell(new akka.actor.Status.Failure(t), getSelf());
}
});
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener
- .getPath());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("registerDataChangeListener for {}", registerChangeListener
+ .getPath());
+ }
ActorSelection dataChangeListenerPath = getContext()
getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug(
- "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+ , listenerRegistration.path().toString());
+ }
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());
}
- private void createTransactionChain() {
- DOMStoreTransactionChain chain = store.createTransactionChain();
- ActorRef transactionChain = getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
- getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
- getSelf());
- }
-
private boolean isMetricsCaptureEnabled(){
CommonConfig config = new CommonConfig(getContext().system().settings().config());
return config.isMetricCaptureEnabled();
.tell(new EnableNotification(isLeader()), getSelf());
}
-
shardMBean.setRaftState(getRaftState().name());
shardMBean.setCurrentTerm(getCurrentTerm());
// If this actor is no longer the leader close all the transaction chains
if(!isLeader()){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ entry.getKey(), getId());
+ }
entry.getValue().close();
}
peerAddress);
if(peerAddresses.containsKey(peerId)){
peerAddresses.put(peerId, peerAddress);
-
- LOG.debug(
- "Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+ }
actor
.tell(new PeerAddressResolved(peerId, peerAddress),
getSelf());
getSender().tell(new GetCompositeModificationReply(
new ImmutableCompositeModification(modification)), getSelf());
} else if (message instanceof ReceiveTimeout) {
- LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ }
closeTransaction(false);
} else {
throw new UnknownMessageException(message);
protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
modification.addModification(
new WriteModification(message.getPath(), message.getData(),schemaContext));
- LOG.debug("writeData at path : " + message.getPath().toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("writeData at path : " + message.getPath().toString());
+ }
try {
transaction.write(message.getPath(), message.getData());
getSender().tell(new WriteDataReply().toSerializable(), getSelf());
protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
modification.addModification(
new MergeModification(message.getPath(), message.getData(), schemaContext));
- LOG.debug("mergeData at path : " + message.getPath().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("mergeData at path : " + message.getPath().toString());
+ }
try {
transaction.merge(message.getPath(), message.getData());
getSender().tell(new MergeDataReply().toSerializable(), getSelf());
}
protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
- LOG.debug("deleteData at path : " + message.getPath().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("deleteData at path : " + message.getPath().toString());
+ }
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
@Override public void onReceive(Object message) throws Exception {
if(message instanceof Terminated){
Terminated terminated = (Terminated) message;
- LOG.debug("Actor terminated : {}", terminated.actor());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ }
} else if(message instanceof Monitor){
Monitor monitor = (Monitor) message;
getContext().watch(monitor.getActorRef());
private void commit(CommitTransaction message) {
// Forward the commit to the shard
- log.debug("Forward commit transaction to Shard {} ", shardActor);
+ if(log.isDebugEnabled()) {
+ log.debug("Forward commit transaction to Shard {} ", shardActor);
+ }
shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
getContext());
@Override
public Void apply(Iterable<ActorPath> paths) {
cohortPaths = Lists.newArrayList(paths);
-
- LOG.debug("Tx {} successfully built cohort path list: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} successfully built cohort path list: {}",
transactionId, cohortPaths);
+ }
return null;
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Boolean> canCommit() {
- LOG.debug("Tx {} canCommit", transactionId);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} canCommit", transactionId);
+ }
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// The first phase of canCommit is to gather the list of cohort actor paths that will
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ }
returnFuture.setException(failure);
} else {
finishCanCommit(returnFuture);
}
private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} finishCanCommit", transactionId);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishCanCommit", transactionId);
+ }
// The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
// their canCommit processing. If any one fails then we'll fail canCommit.
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+ }
returnFuture.setException(failure);
return;
}
return;
}
}
-
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
returnFuture.set(Boolean.valueOf(result));
}
}, actorContext.getActorSystem().dispatcher());
private Future<Iterable<Object>> invokeCohorts(Object message) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
for(ActorPath actorPath : cohortPaths) {
-
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+ }
ActorSelection cohort = actorContext.actorSelection(actorPath);
futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- LOG.debug("Tx {} {}", transactionId, operationName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} {}", transactionId, operationName);
+ }
final SettableFuture<Void> returnFuture = SettableFuture.create();
// The cohort actor list should already be built at this point by the canCommit phase but,
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
operationName, failure);
-
+ }
if(propagateException) {
returnFuture.setException(failure);
} else {
private void finishVoidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException,
final SettableFuture<Void> returnFuture) {
-
- LOG.debug("Tx {} finish {}", transactionId, operationName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+ }
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
- LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
-
+ }
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
// Since the caller doesn't want us to propagate the exception we'll also
// not log it normally. But it's usually not good to totally silence
// exceptions so we'll log it to debug level.
- LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
exceptionToPropagate);
+ }
returnFuture.set(null);
}
} else {
- LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+ }
returnFuture.set(null);
}
}
new TransactionProxyCleanupPhantomReference(this);
phantomReferenceCache.put(cleanup, cleanup);
}
-
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ }
}
@Override
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("Tx {} read {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("Tx {} exists {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} exists {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
return transactionContext(path).dataExists(path);
checkModificationState();
- LOG.debug("Tx {} write {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} write {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
checkModificationState();
- LOG.debug("Tx {} merge {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} merge {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
public void delete(YangInstanceIdentifier path) {
checkModificationState();
-
- LOG.debug("Tx {} delete {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} delete {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
inReadyState = true;
- LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
-
+ }
List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("Tx {} Readying transaction for shard {}", identifier,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
-
+ }
cohortPathFutures.add(transactionContext.readyTransaction());
}
String transactionPath = reply.getTransactionPath();
- LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
+ }
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
if (transactionType == TransactionType.READ_ONLY) {
"Invalid reply type {} for CreateTransaction", response.getClass()));
}
} catch (Exception e) {
- LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ }
remoteTransactionPaths
.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
}
@Override
public void closeTransaction() {
- LOG.debug("Tx {} closeTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} closeTransaction called", identifier);
+ }
actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
public Future<ActorPath> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
-
+ }
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
@Override
public ActorPath apply(Iterable<Object> notUsed) {
-
- LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
-
+ }
// At this point all the Futures succeeded and we need to extract the cohort
// actor path from the ReadyTransactionReply. For the recorded operations, they
// don't return any data so we're only interested that they completed
String resolvedCohortPath = getResolvedCohortPath(
reply.getCohortPath().toString());
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
identifier, resolvedCohortPath);
-
+ }
return actorContext.actorFor(resolvedCohortPath);
} else {
// Throwing an exception here will fail the Future.
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new DeleteData(path).toSerializable() ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new MergeData(path, data, schemaContext).toSerializable()));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new WriteData(path, data, schemaContext).toSerializable()));
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+ }
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishReadData(path, returnFuture);
} else {
- LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
-
+ }
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
identifier, failure);
-
+ }
returnFuture.setException(new ReadFailedException(
"The read could not be performed because a previous put, merge,"
+ "or delete operation failed", failure));
private void finishReadData(final YangInstanceIdentifier path,
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
- LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+ }
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} read operation failed: {}", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+ }
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- LOG.debug("Tx {} read operation succeeded", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+ }
if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
path, readResponse);
public CheckedFuture<Boolean, ReadFailedException> dataExists(
final YangInstanceIdentifier path) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ }
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishDataExists(path, returnFuture);
} else {
- LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
-
+ }
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
identifier, failure);
-
+ }
returnFuture.setException(new ReadFailedException(
"The data exists could not be performed because a previous "
+ "put, merge, or delete operation failed", failure));
private void finishDataExists(final YangInstanceIdentifier path,
final SettableFuture<Boolean> returnFuture) {
- LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+ }
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+ }
returnFuture.setException(new ReadFailedException(
"Error checking data exists for path " + path, failure));
} else {
- LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+ }
if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.
fromSerializable(response).exists()));
@Override
public void closeTransaction() {
- LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+ }
}
@Override
public Future<ActorPath> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction called", identifier);
+ }
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ }
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ }
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ }
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+ }
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
@Override
public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ }
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
- LOG.debug("Local shard found {}", found.getPath());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Local shard found {}", found.getPath());
+ }
return found.getPath();
}
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
- LOG.debug("Primary found {}", found.getPrimaryPath());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Primary found {}", found.getPrimaryPath());
+ }
return found.getPrimaryPath();
}
throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
*/
public Object executeRemoteOperation(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
- actor.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+ actor.toString());
+ }
Future<Object> future = ask(actor, message, operationTimeout);
try {
*/
public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+ }
return ask(actor, message, operationTimeout);
}
Thread.currentThread().getContextClassLoader());
Config actorSystemConfig = config.get();
- LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+ }
if (config.isMetricCaptureEnabled()) {
LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
config.getActorSystemName());
* @param announcements
*/
private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
- LOG.debug("Announcing [{}]", announcements);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Announcing [{}]", announcements);
+ }
RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
}
* @param removals
*/
private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
- LOG.debug("Removing [{}]", removals);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing [{}]", removals);
+ }
RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
}
}
private void invokeRemoteRpc(final InvokeRpc msg) {
- LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
null, msg.getRpc(), msg.getIdentifier());
RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
}
private void executeRpc(final ExecuteRpc msg) {
- LOG.debug("Executing rpc {}", msg.getRpc());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ }
Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
schemaContext));
@Override
public void onRpcImplementationAdded(QName rpc) {
- LOG.debug("Adding registration for [{}]", rpc);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Adding registration for [{}]", rpc);
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
routeIds.add(routeId);
@Override
public void onRpcImplementationRemoved(QName rpc) {
- LOG.debug("Removing registration for [{}]", rpc);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing registration for [{}]", rpc);
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
routeIds.add(routeId);
@Override public void onReceive(Object message) throws Exception {
if(message instanceof Terminated){
Terminated terminated = (Terminated) message;
- LOG.debug("Actor terminated : {}", terminated.actor());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ }
}else if(message instanceof Monitor){
Monitor monitor = (Monitor) message;
getContext().watch(monitor.getActorRef());
receiveUpdateRemoteBuckets(
((UpdateRemoteBuckets) message).getBuckets());
} else {
- log.debug("Unhandled message [{}]", message);
+ if(log.isDebugEnabled()) {
+ log.debug("Unhandled message [{}]", message);
+ }
unhandled(message);
}
}
versions.put(entry.getKey(), remoteVersion);
}
}
-
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ if(log.isDebugEnabled()) {
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ }
}
///
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
if (!clusterMembers.contains(member.address()))
clusterMembers.add(member.address());
-
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
-
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ if(log.isDebugEnabled()) {
+ log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ }
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if(log.isDebugEnabled()) {
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ }
return;
}
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if(log.isDebugEnabled()) {
+ log.debug("Sending bucket versions to [{}]", remoteRef);
+ }
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ if(log.isDebugEnabled()) {
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ }
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}