From: Tony Tkacik Date: Tue, 16 Sep 2014 11:17:39 +0000 (+0000) Subject: Merge "Bug 1875 - Used variables for nexusproxy host, externalized versions" X-Git-Tag: release/helium~64 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=65292f0c7ee04222ccaaa9c9dcf9ece4438dc96e;hp=7ccaaa0a8a5739859b6834db2d6965b3680fbca0 Merge "Bug 1875 - Used variables for nexusproxy host, externalized versions" --- diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index da246b63e3..b6091ac161 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -35,6 +35,9 @@ odl-mdsal-broker war + + mvn:org.opendaylight.controller/sal-remote/${project.version} mvn:org.opendaylight.controller/sal-rest-connector/${project.version} mvn:com.google.code.gson/gson/${gson.version} mvn:org.opendaylight.yangtools/yang-data-codec-gson/${yangtools.version} @@ -47,7 +50,6 @@ mvn:io.netty/netty-common/${netty.version} mvn:io.netty/netty-handler/${netty.version} mvn:io.netty/netty-transport/${netty.version} - mvn:org.opendaylight.controller/sal-remote/${project.version} mvn:org.opendaylight.controller/sal-rest-connector-config/${mdsal.version}/xml/config diff --git a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java index e0c16a0806..9951bf7448 100644 --- a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java +++ b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java @@ -77,7 +77,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, removeDataObj)) { final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj); builder.setFlowRef(new FlowRef(identifier)); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); this.provider.getSalFlowService().removeFlow(builder.build()); @@ -93,7 +93,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, update)) { final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder(); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowRef(new FlowRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build()); @@ -112,7 +112,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, addDataObj)) { final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowRef(new FlowRef(identifier)); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); diff --git a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/GroupForwarder.java b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/GroupForwarder.java index 72e35ce8db..1b2c532323 100644 --- a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/GroupForwarder.java +++ b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/GroupForwarder.java @@ -78,7 +78,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final Group group = (removeDataObj); final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setGroupRef(new GroupRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); this.provider.getSalGroupService().removeGroup(builder.build()); @@ -93,7 +93,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final Group updatedGroup = (update); final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder(); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setGroupRef(new GroupRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); builder.setUpdatedGroup((new UpdatedGroupBuilder(updatedGroup)).build()); @@ -109,7 +109,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final Group group = (addDataObj); final AddGroupInputBuilder builder = new AddGroupInputBuilder(group); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setGroupRef(new GroupRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); this.provider.getSalGroupService().addGroup(builder.build()); diff --git a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/MeterForwarder.java b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/MeterForwarder.java index 8a805b0297..2f3de2a171 100644 --- a/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/MeterForwarder.java +++ b/opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/MeterForwarder.java @@ -77,7 +77,7 @@ public class MeterForwarder extends AbstractListeningCommiter { final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setMeterRef(new MeterRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); this.provider.getSalMeterService().removeMeter(builder.build()); @@ -90,7 +90,7 @@ public class MeterForwarder extends AbstractListeningCommiter { final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder(); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setMeterRef(new MeterRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); builder.setUpdatedMeter((new UpdatedMeterBuilder(update)).build()); @@ -105,7 +105,7 @@ public class MeterForwarder extends AbstractListeningCommiter { final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj); - builder.setNode(new NodeRef(nodeIdent)); + builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setMeterRef(new MeterRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); this.provider.getSalMeterService().addMeter(builder.build()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index c4ff108611..3bfdf732cf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -67,11 +67,15 @@ public class ExampleActor extends RaftActor { } } else if (message instanceof PrintState) { - LOG.debug("State of the node:{} has entries={}, {}", - getId(), state.size(), getReplicatedLogState()); + if(LOG.isDebugEnabled()) { + LOG.debug("State of the node:{} has entries={}, {}", + getId(), state.size(), getReplicatedLogState()); + } } else if (message instanceof PrintRole) { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers()); + if(LOG.isDebugEnabled()) { + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + } } else { super.onReceiveCommand(message); @@ -106,7 +110,9 @@ public class ExampleActor extends RaftActor { } catch (Exception e) { LOG.error("Exception in applying snapshot", e); } - LOG.debug("Snapshot applied to state :" + ((HashMap) state).size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state :" + ((HashMap) state).size()); + } } private ByteString fromObject(Object snapshot) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 75c237f503..9d06f63604 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit; */ public class DefaultConfigParamsImpl implements ConfigParams { - private static final int SNAPSHOT_BATCH_COUNT = 100000; + private static final int SNAPSHOT_BATCH_COUNT = 20000; /** * The maximum election time variance diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 778f5c68f6..91bbeeca50 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -135,10 +135,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state applySnapshot(ByteString.copyFrom(snapshot.getState())); @@ -156,13 +157,16 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof UpdateElectionTerm) { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { - LOG.debug( - "RecoveryCompleted - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog.lastIndex(), replicatedLog.snapshotIndex, - replicatedLog.snapshotTerm, replicatedLog.size()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "RecoveryCompleted - Switching actor to Follower - " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size() + ); + } currentBehavior = switchBehavior(RaftState.Follower); onStateChanged(); } @@ -172,9 +176,11 @@ public abstract class RaftActor extends UntypedPersistentActor { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); + if(LOG.isDebugEnabled()) { + LOG.debug("Applying state for log index {} data {}", + applyState.getReplicatedLogEntry().getIndex(), + applyState.getReplicatedLogEntry().getData()); + } applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); @@ -182,9 +188,12 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); - LOG.debug("ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm() + ); + } applySnapshot(ByteString.copyFrom(snapshot.getState())); //clears the followers log, sets the snapshot index to ensure adjusted-index works @@ -236,23 +245,25 @@ public abstract class RaftActor extends UntypedPersistentActor { context.removePeer(rrp.getName()); } else if (message instanceof CaptureSnapshot) { - LOG.debug("CaptureSnapshot received by actor"); + LOG.info("CaptureSnapshot received by actor"); CaptureSnapshot cs = (CaptureSnapshot)message; captureSnapshot = cs; createSnapshot(); } else if (message instanceof CaptureSnapshotReply){ - LOG.debug("CaptureSnapshotReply received by actor"); + LOG.info("CaptureSnapshotReply received by actor"); CaptureSnapshotReply csr = (CaptureSnapshotReply) message; ByteString stateInBytes = csr.getSnapshot(); - LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); handleCaptureSnapshotReply(stateInBytes); } else { if (!(message instanceof AppendEntriesMessages.AppendEntries) && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { - LOG.debug("onReceiveCommand: message:" + message.getClass()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } } RaftState state = @@ -293,7 +304,9 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - LOG.debug("Persist data {}", replicatedLogEntry); + if(LOG.isDebugEnabled()) { + LOG.debug("Persist data {}", replicatedLogEntry); + } replicatedLog .appendAndPersist(clientActor, identifier, replicatedLogEntry); @@ -482,8 +495,10 @@ public abstract class RaftActor extends UntypedPersistentActor { return null; } String peerAddress = context.getPeerAddress(leaderId); - LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " - + peerAddress); + if(LOG.isDebugEnabled()) { + LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " + + peerAddress); + } return peerAddress; } @@ -583,10 +598,13 @@ public abstract class RaftActor extends UntypedPersistentActor { lastAppliedTerm = lastAppliedEntry.getTerm(); } - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", + context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + } // send a CaptureSnapshot to self to make the expensive operation async. getSelf().tell(new CaptureSnapshot( @@ -638,8 +656,9 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public void update(long currentTerm, String votedFor) { - LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); - + if(LOG.isDebugEnabled()) { + LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + } this.currentTerm = currentTerm; this.votedFor = votedFor; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 7e896fed29..35d563b784 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -272,6 +272,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return null; } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + /** * Find the log index from the previous to last entry in the log * @@ -311,7 +322,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { i < index + 1; i++) { ActorRef clientActor = null; String identifier = null; - ClientRequestTracker tracker = findClientRequestTracker(i); + ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { clientActor = tracker.getClientActor(); @@ -321,19 +332,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); newLastApplied = i; } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - context.getLogger().error( + context.getLogger().warning( "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); break; } } - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) context.getLogger().debug("Setting last applied to {}", newLastApplied); context.setLastApplied(newLastApplied); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 610fdc987f..1cfdf9dba8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.event.LoggingAdapter; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -38,9 +39,13 @@ import java.util.ArrayList; public class Follower extends AbstractRaftActorBehavior { private ByteString snapshotChunksCollected = ByteString.EMPTY; + private final LoggingAdapter LOG; + public Follower(RaftActorContext context) { super(context); + LOG = context.getLogger(); + scheduleElection(electionDuration()); } @@ -48,8 +53,9 @@ public class Follower extends AbstractRaftActorBehavior { AppendEntries appendEntries) { if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { - context.getLogger() - .debug(appendEntries.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } } // TODO : Refactor this method into a bunch of smaller methods @@ -79,9 +85,10 @@ public class Follower extends AbstractRaftActorBehavior { // an entry at prevLogIndex and this follower has no entries in // it's log. - context.getLogger().debug( - "The followers log is empty and the senders prevLogIndex is {}", - appendEntries.getPrevLogIndex()); + if(LOG.isDebugEnabled()) { + LOG.debug("The followers log is empty and the senders prevLogIndex is {}", + appendEntries.getPrevLogIndex()); + } } else if (lastIndex() > -1 && appendEntries.getPrevLogIndex() != -1 @@ -90,9 +97,10 @@ public class Follower extends AbstractRaftActorBehavior { // The follower's log is out of sync because the Leader's // prevLogIndex entry was not found in it's log - context.getLogger().debug( - "The log is not empty but the prevLogIndex {} was not found in it", - appendEntries.getPrevLogIndex()); + if(LOG.isDebugEnabled()) { + LOG.debug("The log is not empty but the prevLogIndex {} was not found in it", + appendEntries.getPrevLogIndex()); + } } else if (lastIndex() > -1 && previousEntry != null @@ -102,10 +110,12 @@ public class Follower extends AbstractRaftActorBehavior { // prevLogIndex entry does exist in the follower's log but it has // a different term in it - context.getLogger().debug( - "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" - , previousEntry.getTerm() - , appendEntries.getPrevLogTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" + , previousEntry.getTerm() + , appendEntries.getPrevLogTerm()); + } } else { outOfSync = false; } @@ -113,9 +123,12 @@ public class Follower extends AbstractRaftActorBehavior { if (outOfSync) { // We found that the log was out of sync so just send a negative // reply and return - context.getLogger().debug("Follower is out-of-sync, " + - "so sending negative reply, lastIndex():{}, lastTerm():{}", - lastIndex(), lastTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("Follower is out-of-sync, " + + "so sending negative reply, lastIndex():{}, lastTerm():{}", + lastIndex(), lastTerm() + ); + } sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() @@ -125,10 +138,12 @@ public class Follower extends AbstractRaftActorBehavior { if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { - context.getLogger().debug( - "Number of entries to be appended = " + appendEntries - .getEntries().size() - ); + if(LOG.isDebugEnabled()) { + LOG.debug( + "Number of entries to be appended = " + appendEntries + .getEntries().size() + ); + } // 3. If an existing entry conflicts with a new one (same index // but different terms), delete the existing entry and all that @@ -151,10 +166,12 @@ public class Follower extends AbstractRaftActorBehavior { continue; } - context.getLogger().debug( - "Removing entries from log starting at " - + matchEntry.getIndex() - ); + if(LOG.isDebugEnabled()) { + LOG.debug( + "Removing entries from log starting at " + + matchEntry.getIndex() + ); + } // Entries do not match so remove all subsequent entries context.getReplicatedLog() @@ -163,10 +180,12 @@ public class Follower extends AbstractRaftActorBehavior { } } - context.getLogger().debug( - "After cleanup entries to be added from = " + (addEntriesFrom - + lastIndex()) - ); + if(LOG.isDebugEnabled()) { + context.getLogger().debug( + "After cleanup entries to be added from = " + (addEntriesFrom + + lastIndex()) + ); + } // 4. Append any new entries not already in the log for (int i = addEntriesFrom; @@ -181,8 +200,9 @@ public class Follower extends AbstractRaftActorBehavior { .appendAndPersist(appendEntries.getEntries().get(i)); } - context.getLogger().debug( - "Log size is now " + context.getReplicatedLog().size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Log size is now " + context.getReplicatedLog().size()); + } } @@ -195,8 +215,9 @@ public class Follower extends AbstractRaftActorBehavior { context.getReplicatedLog().lastIndex())); if (prevCommitIndex != context.getCommitIndex()) { - context.getLogger() - .debug("Commit index set to " + context.getCommitIndex()); + if(LOG.isDebugEnabled()) { + LOG.debug("Commit index set to " + context.getCommitIndex()); + } } // If commitIndex > lastApplied: increment lastApplied, apply @@ -204,10 +225,14 @@ public class Follower extends AbstractRaftActorBehavior { // check if there are any entries to be applied. last-applied can be equal to last-index if (appendEntries.getLeaderCommit() > context.getLastApplied() && context.getLastApplied() < lastIndex()) { - context.getLogger().debug("applyLogToStateMachine, " + - "appendEntries.getLeaderCommit():{}," + - "context.getLastApplied():{}, lastIndex():{}", - appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()); + if(LOG.isDebugEnabled()) { + LOG.debug("applyLogToStateMachine, " + + "appendEntries.getLeaderCommit():{}," + + "context.getLastApplied():{}, lastIndex():{}", + appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex() + ); + } + applyLogToStateMachine(appendEntries.getLeaderCommit()); } @@ -259,9 +284,13 @@ public class Follower extends AbstractRaftActorBehavior { } private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { - context.getLogger().debug("InstallSnapshot received by follower " + - "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(), - installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); + + if(LOG.isDebugEnabled()) { + LOG.debug("InstallSnapshot received by follower " + + "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(), + installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks() + ); + } try { if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { @@ -283,8 +312,11 @@ public class Follower extends AbstractRaftActorBehavior { } else { // we have more to go snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); - context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}", - installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); + + if(LOG.isDebugEnabled()) { + LOG.debug("Chunk={},snapshotChunksCollected.size:{}", + installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); + } } sender.tell(new InstallSnapshotReply( diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 90948ffef7..199d2d61cf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.event.LoggingAdapter; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -80,9 +81,13 @@ public class Leader extends AbstractRaftActorBehavior { private final int minReplicationCount; + private final LoggingAdapter LOG; + public Leader(RaftActorContext context) { super(context); + LOG = context.getLogger(); + if (lastIndex() >= 0) { context.setCommitIndex(lastIndex()); } @@ -98,7 +103,9 @@ public class Leader extends AbstractRaftActorBehavior { followerToLog.put(followerId, followerLogInformation); } - context.getLogger().debug("Election:Leader has following peers:"+ followers); + if(LOG.isDebugEnabled()) { + LOG.debug("Election:Leader has following peers:" + followers); + } if (followers.size() > 0) { minReplicationCount = (followers.size() + 1) / 2 + 1; @@ -123,7 +130,9 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().debug(appendEntries.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } return state(); } @@ -132,8 +141,9 @@ public class Leader extends AbstractRaftActorBehavior { AppendEntriesReply appendEntriesReply) { if(! appendEntriesReply.isSuccess()) { - context.getLogger() - .debug(appendEntriesReply.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntriesReply.toString()); + } } // Update the FollowerLogInformation @@ -142,7 +152,7 @@ public class Leader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - context.getLogger().error("Unknown follower {}", followerId); + LOG.error("Unknown follower {}", followerId); return state(); } @@ -196,6 +206,16 @@ public class Leader extends AbstractRaftActorBehavior { return state(); } + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + + ClientRequestTracker toRemove = findClientRequestTracker(logIndex); + if(toRemove != null) { + trackerList.remove(toRemove); + } + + return toRemove; + } + protected ClientRequestTracker findClientRequestTracker(long logIndex) { for (ClientRequestTracker tracker : trackerList) { if (tracker.getIndex() == logIndex) { @@ -260,10 +280,13 @@ public class Leader extends AbstractRaftActorBehavior { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - context.getLogger().debug("InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1); + if(LOG.isDebugEnabled()) { + LOG.debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1 + ); + } FollowerLogInformation followerLogInformation = followerToLog.get(followerId); @@ -272,31 +295,38 @@ public class Leader extends AbstractRaftActorBehavior { followerLogInformation.setNextIndex( context.getReplicatedLog().getSnapshotIndex() + 1); mapFollowerToSnapshot.remove(followerId); - context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); + + if(LOG.isDebugEnabled()) { + LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + } } else { followerToSnapshot.markSendStatus(true); } } else { - context.getLogger().info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex()); + LOG.info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex() + ); followerToSnapshot.markSendStatus(false); } } else { - context.getLogger().error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + LOG.error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() + ); } } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - context.getLogger().debug("Replicate message " + logIndex); + if(LOG.isDebugEnabled()) { + LOG.debug("Replicate message " + logIndex); + } // Create a tracker entry we will use this later to notify the // client actor @@ -350,10 +380,13 @@ public class Leader extends AbstractRaftActorBehavior { if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { // if the follower is just not starting and leader's index // is more than followers index - context.getLogger().debug("SendInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex); + if(LOG.isDebugEnabled()) { + LOG.debug("SendInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); + } actor().tell(new SendInstallSnapshot(), actor()); } else { @@ -412,11 +445,11 @@ public class Leader extends AbstractRaftActorBehavior { ).toSerializable(), actor() ); - context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), mapFollowerToSnapshot.get(followerId).getTotalChunks()); } catch (IOException e) { - context.getLogger().error("InstallSnapshot failed for Leader.", e); + LOG.error("InstallSnapshot failed for Leader.", e); } } @@ -431,7 +464,9 @@ public class Leader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + } return nextChunk; } @@ -526,8 +561,10 @@ public class Leader extends AbstractRaftActorBehavior { int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } } public ByteString getSnapshotBytes() { @@ -591,8 +628,10 @@ public class Leader extends AbstractRaftActorBehavior { } } - context.getLogger().debug("length={}, offset={},size={}", - snapshotLength, start, size); + if(LOG.isDebugEnabled()) { + LOG.debug("length={}, offset={},size={}", + snapshotLength, start, size); + } return getSnapshotBytes().substring(start, start + size); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 9d40fa3d9e..c084cba822 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.cluster.raft.messages; import com.google.protobuf.ByteString; -import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; +import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; public class InstallSnapshot extends AbstractRaftRPC { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 73c9f96b82..c4ef51d968 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapsho import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; -import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java index ef56d02a2e..cf37cbdd00 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java @@ -17,7 +17,9 @@ public abstract class AbstractUntypedActor extends UntypedActor { Logging.getLogger(getContext().system(), this); public AbstractUntypedActor() { - LOG.debug("Actor created {}", getSelf()); + if(LOG.isDebugEnabled()) { + LOG.debug("Actor created {}", getSelf()); + } getContext(). system(). actorSelection("user/termination-monitor"). @@ -27,11 +29,13 @@ public abstract class AbstractUntypedActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { final String messageType = message.getClass().getSimpleName(); - LOG.debug("Received message {}", messageType); - + if(LOG.isDebugEnabled()) { + LOG.debug("Received message {}", messageType); + } handleReceive(message); - - LOG.debug("Done handling message {}", messageType); + if(LOG.isDebugEnabled()) { + LOG.debug("Done handling message {}", messageType); + } } protected abstract void handleReceive(Object message) throws Exception; @@ -41,7 +45,9 @@ public abstract class AbstractUntypedActor extends UntypedActor { } protected void unknownMessage(Object message) throws Exception { - LOG.debug("Received unhandled message {}", message); + if(LOG.isDebugEnabled()) { + LOG.debug("Received unhandled message {}", message); + } unhandled(message); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java similarity index 87% rename from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java index e801ae1c10..b93be3e009 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // source: InstallSnapshot.proto -package org.opendaylight.controller.cluster.raft.protobuff.messages; +package org.opendaylight.controller.protobuff.messages.cluster.raft; public final class InstallSnapshotMessages { private InstallSnapshotMessages() {} @@ -186,14 +186,14 @@ public final class InstallSnapshotMessages { } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable .ensureFieldAccessorsInitialized( - org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class); } public static com.google.protobuf.Parser PARSER = @@ -245,7 +245,7 @@ public final class InstallSnapshotMessages { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -261,7 +261,7 @@ public final class InstallSnapshotMessages { getLeaderIdBytes() { java.lang.Object ref = leaderId_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); leaderId_ = b; @@ -442,53 +442,53 @@ public final class InstallSnapshotMessages { return super.writeReplace(); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data) + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input) + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input) + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseDelimitedFrom(input); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseDelimitedFrom(input, extensionRegistry); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } - public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -497,7 +497,7 @@ public final class InstallSnapshotMessages { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) { + public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } @@ -513,20 +513,20 @@ public final class InstallSnapshotMessages { */ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder - implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder { + implements org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshotOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable .ensureFieldAccessorsInitialized( - org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class); } - // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder() + // Construct using org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.newBuilder() private Builder() { maybeForceBuilderInitialization(); } @@ -569,23 +569,23 @@ public final class InstallSnapshotMessages { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; } - public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() { - return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance(); + public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() { + return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance(); } - public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() { - org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial(); + public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() { + org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } return result; } - public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() { - org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this); + public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() { + org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot(this); int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (((from_bitField0_ & 0x00000001) == 0x00000001)) { @@ -622,16 +622,16 @@ public final class InstallSnapshotMessages { } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) { - return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other); + if (other instanceof org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) { + return mergeFrom((org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) { - if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this; + public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot other) { + if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this; if (other.hasTerm()) { setTerm(other.getTerm()); } @@ -667,11 +667,11 @@ public final class InstallSnapshotMessages { com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { - org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null; + org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage(); + parsedMessage = (org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage(); throw e; } finally { if (parsedMessage != null) { @@ -744,7 +744,7 @@ public final class InstallSnapshotMessages { getLeaderIdBytes() { java.lang.Object ref = leaderId_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); leaderId_ = b; @@ -988,8 +988,8 @@ public final class InstallSnapshotMessages { "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" + "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" + " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" + - "light.controller.cluster.raft.protobuff." + - "messagesB\027InstallSnapshotMessagesH\001" + "light.controller.protobuff.messages.clus" + + "ter.raftB\027InstallSnapshotMessagesH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java index c9d5e89ae1..0f93f43c56 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java @@ -100,7 +100,9 @@ public class XmlStreamUtils { for (Entry e: prefixes.getPrefixes()) { writer.writeNamespace(e.getValue(), e.getKey().toString()); } - LOG.debug("Instance identifier with Random prefix is now {}", str); + if(LOG.isDebugEnabled()) { + LOG.debug("Instance identifier with Random prefix is now {}", str); + } writer.writeCharacters(str); } @@ -169,7 +171,7 @@ public class XmlStreamUtils { DataSchemaNode childSchema = null; if (schema instanceof DataNodeContainer) { childSchema = SchemaUtils.findFirstSchema(child.getNodeType(), ((DataNodeContainer) schema).getChildNodes()).orNull(); - if (childSchema == null) { + if (childSchema == null && LOG.isDebugEnabled()) { LOG.debug("Probably the data node \"{}\" does not conform to schema", child == null ? "" : child.getNodeType().getLocalName()); } } @@ -192,7 +194,9 @@ public class XmlStreamUtils { */ public void writeValue(final @Nonnull XMLStreamWriter writer, final @Nonnull TypeDefinition type, final Object value) throws XMLStreamException { if (value == null) { - LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName()); + if(LOG.isDebugEnabled()){ + LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName()); + } return; } @@ -232,18 +236,24 @@ public class XmlStreamUtils { writer.writeNamespace(prefix, qname.getNamespace().toString()); writer.writeCharacters(prefix + ':' + qname.getLocalName()); } else { - LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass()); + if(LOG.isDebugEnabled()) { + LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass()); + } writer.writeCharacters(String.valueOf(value)); } } private static void write(final @Nonnull XMLStreamWriter writer, final @Nonnull InstanceIdentifierTypeDefinition type, final @Nonnull Object value) throws XMLStreamException { if (value instanceof YangInstanceIdentifier) { - LOG.debug("Writing InstanceIdentifier object {}", value); + if(LOG.isDebugEnabled()) { + LOG.debug("Writing InstanceIdentifier object {}", value); + } write(writer, (YangInstanceIdentifier)value); } else { - LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass()); - writer.writeCharacters(String.valueOf(value)); + if(LOG.isDebugEnabled()) { + LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass()); + } + writer.writeCharacters(String.valueOf(value)); } } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java index ea8f4a3ef1..d0cc2adb5f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java @@ -74,7 +74,9 @@ public class XmlUtils { * @return xml String */ public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ - LOG.debug("Converting input composite node to xml {}", cNode); + if(LOG.isDebugEnabled()) { + LOG.debug("Converting input composite node to xml {}", cNode); + } if (cNode == null) { return BLANK; } @@ -88,12 +90,14 @@ public class XmlUtils { Set rpcs = schemaContext.getOperations(); for(RpcDefinition rpc : rpcs) { if(rpc.getQName().equals(cNode.getNodeType())){ - LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName()); + } CompositeNode inputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "input")); domTree = XmlDocumentUtils.toDocument(inputContainer, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider()); - - LOG.debug("input composite node to document conversion complete, document is {}", domTree); + if(LOG.isDebugEnabled()) { + LOG.debug("input composite node to document conversion complete, document is {}", domTree); + } break; } } @@ -111,7 +115,9 @@ public class XmlUtils { * @return xml string */ public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ - LOG.debug("Converting output composite node to xml {}", cNode); + if(LOG.isDebugEnabled()) { + LOG.debug("Converting output composite node to xml {}", cNode); + } if (cNode == null) { return BLANK; } @@ -125,12 +131,14 @@ public class XmlUtils { Set rpcs = schemaContext.getOperations(); for(RpcDefinition rpc : rpcs) { if(rpc.getQName().equals(cNode.getNodeType())){ - LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName()); + } CompositeNode outputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "output")); domTree = XmlDocumentUtils.toDocument(outputContainer, rpc.getOutput(), XmlDocumentUtils.defaultValueCodecProvider()); - - LOG.debug("output composite node to document conversion complete, document is {}", domTree); + if(LOG.isDebugEnabled()) { + LOG.debug("output composite node to document conversion complete, document is {}", domTree); + } break; } } @@ -152,8 +160,9 @@ public class XmlUtils { LOG.error("Error during translation of Document to OutputStream", e); } - LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString()); + } return writer.toString(); } @@ -188,7 +197,9 @@ public class XmlUtils { * @return CompositeNode object based on the input, if any of the input parameter is null, a null object is returned */ public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){ - LOG.debug("Converting input xml to composite node {}", xml); + if(LOG.isDebugEnabled()) { + LOG.debug("Converting input xml to composite node {}", xml); + } if (xml==null || xml.length()==0) { return null; } @@ -208,8 +219,9 @@ public class XmlUtils { Set rpcs = schemaContext.getOperations(); for(RpcDefinition rpcDef : rpcs) { if(rpcDef.getQName().equals(rpc)){ - LOG.debug("found the rpc definition from schema context matching rpc {}", rpc); - + if(LOG.isDebugEnabled()) { + LOG.debug("found the rpc definition from schema context matching rpc {}", rpc); + } if(rpcDef.getInput() == null) { LOG.warn("found rpc definition's input is null"); return null; @@ -225,9 +237,9 @@ public class XmlUtils { List> dataNodes = XmlDocumentUtils.toDomNodes(xmlData, Optional.of(rpcDef.getInput().getChildNodes()), schemaContext); - - LOG.debug("Converted xml input to list of nodes {}", dataNodes); - + if(LOG.isDebugEnabled()) { + LOG.debug("Converted xml input to list of nodes {}", dataNodes); + } final CompositeNodeBuilder it = ImmutableCompositeNode.builder(); it.setQName(rpc); it.add(ImmutableCompositeNode.create(input, dataNodes)); @@ -240,8 +252,9 @@ public class XmlUtils { } catch (IOException e) { LOG.error("Error during building data tree from XML", e); } - - LOG.debug("Xml to composite node conversion complete {} ", compositeNode); + if(LOG.isDebugEnabled()) { + LOG.debug("Xml to composite node conversion complete {} ", compositeNode); + } return compositeNode; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto similarity index 82% rename from opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto rename to opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto index 14f821b5e2..4198644b13 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto @@ -1,6 +1,6 @@ package org.opendaylight.controller.cluster.raft; -option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages"; +option java_package = "org.opendaylight.controller.protobuff.messages.cluster.raft"; option java_outer_classname = "InstallSnapshotMessages"; option optimize_for = SPEED; diff --git a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java index b67855d731..58677103c2 100644 --- a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java +++ b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java @@ -16,6 +16,8 @@ import java.util.concurrent.ThreadPoolExecutor; import javax.annotation.Nullable; import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler; import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics @@ -25,7 +27,7 @@ import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue; */ public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean implements ThreadExecutorStatsMXBean { - + private static final Logger LOG = LoggerFactory.getLogger(ThreadExecutorStatsMXBeanImpl.class); private final ThreadPoolExecutor executor; /** @@ -36,14 +38,31 @@ public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean * @param mBeanType Used as the type property in the bean's ObjectName. * @param mBeanCategory Used as the Category property in the bean's ObjectName. */ - public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName, - String mBeanType, @Nullable String mBeanCategory) { + public ThreadExecutorStatsMXBeanImpl(final ThreadPoolExecutor executor, final String mBeanName, + final String mBeanType, @Nullable final String mBeanCategory) { super(mBeanName, mBeanType, mBeanCategory); + this.executor = Preconditions.checkNotNull(executor); + } + + /** + * Create a new bean for the statistics, which is already registered. + * + * @param executor + * @param mBeanName + * @param mBeanType + * @param mBeanCategory + * @return + */ + public static ThreadExecutorStatsMXBeanImpl create(final Executor executor, final String mBeanName, + final String mBeanType, @Nullable final String mBeanCategory) { + if (executor instanceof ThreadPoolExecutor) { + final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl((ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory); + ret.registerMBean(); + return ret; + } - Preconditions.checkArgument(executor instanceof ThreadPoolExecutor, - "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor", - executor.getClass()); - this.executor = (ThreadPoolExecutor)executor; + LOG.info("Executor {} is not supported", executor); + return null; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index bf541d95de..c780881a2f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -76,9 +76,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); - - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); - + if(LOG.isDebugEnabled()) { + LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); + } ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( DataChangeListener.props(listener )); @@ -108,11 +108,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au }, actorContext.getActorSystem().dispatcher()); return listenerRegistrationProxy; } - - LOG.debug( - "No local shard for shardName {} was found so returning a noop registration", - shardName); - + if(LOG.isDebugEnabled()) { + LOG.debug( + "No local shard for shardName {} was found so returning a noop registration", + shardName); + } return new NoOpDataChangeListenerRegistration(listener); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 713996b13b..0fa27706e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; @@ -172,8 +171,11 @@ public class Shard extends RaftActor { } @Override public void onReceiveRecover(Object message) { - LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), - getSender()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveRecover: Received message {} from {}", + message.getClass().toString(), + getSender()); + } if (message instanceof RecoveryFailure){ LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause"); @@ -183,8 +185,11 @@ public class Shard extends RaftActor { } @Override public void onReceiveCommand(Object message) { - LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(), - getSender()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveCommand: Received message {} from {}", + message.getClass().toString(), + getSender()); + } if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { // This must be for install snapshot. Don't want to open this up and trigger @@ -193,6 +198,7 @@ public class Shard extends RaftActor { .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + createSnapshotTransaction = null; // Send a PoisonPill instead of sending close transaction because we do not really need // a response getSender().tell(PoisonPill.getInstance(), self()); @@ -298,7 +304,9 @@ public class Shard extends RaftActor { ShardTransactionIdentifier.builder() .remoteTransactionId(remoteTransactionId) .build(); - LOG.debug("Creating transaction : {} ", transactionId); + if(LOG.isDebugEnabled()) { + LOG.debug("Creating transaction : {} ", transactionId); + } ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, transactionChainId); @@ -325,13 +333,19 @@ public class Shard extends RaftActor { DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(serialized); if (cohort == null) { - LOG.debug( - "Could not find cohort for modification : {}. Writing modification using a new transaction", - modification); + + if(LOG.isDebugEnabled()) { + LOG.debug( + "Could not find cohort for modification : {}. Writing modification using a new transaction", + modification); + } + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); + if(LOG.isDebugEnabled()) { + LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); + } modification.apply(transaction); try { @@ -352,13 +366,12 @@ public class Shard extends RaftActor { return; } - final ListenableFuture future = cohort.commit(); - final ActorRef self = getSelf(); + ListenableFuture future = cohort.commit(); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Void v) { - sender.tell(new CommitTransactionReply().toSerializable(), self); + sender.tell(new CommitTransactionReply().toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } @@ -367,7 +380,7 @@ public class Shard extends RaftActor { public void onFailure(Throwable t) { LOG.error(t, "An exception happened during commit"); shardMBean.incrementFailedTransactionsCount(); - sender.tell(new akka.actor.Status.Failure(t), self); + sender.tell(new akka.actor.Status.Failure(t), getSelf()); } }); @@ -401,8 +414,10 @@ public class Shard extends RaftActor { private void registerChangeListener( RegisterChangeListener registerChangeListener) { - LOG.debug("registerDataChangeListener for {}", registerChangeListener - .getPath()); + if(LOG.isDebugEnabled()) { + LOG.debug("registerDataChangeListener for {}", registerChangeListener + .getPath()); + } ActorSelection dataChangeListenerPath = getContext() @@ -430,23 +445,17 @@ public class Shard extends RaftActor { getContext().actorOf( DataChangeListenerRegistration.props(registration)); - LOG.debug( - "registerDataChangeListener sending reply, listenerRegistrationPath = {} " - , listenerRegistration.path().toString()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "registerDataChangeListener sending reply, listenerRegistrationPath = {} " + , listenerRegistration.path().toString()); + } getSender() .tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } - private void createTransactionChain() { - DOMStoreTransactionChain chain = store.createTransactionChain(); - ActorRef transactionChain = getContext().actorOf( - ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean)); - getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(), - getSelf()); - } - private boolean isMetricsCaptureEnabled(){ CommonConfig config = new CommonConfig(getContext().system().settings().config()); return config.isMetricCaptureEnabled(); @@ -503,6 +512,8 @@ public class Shard extends RaftActor { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower + + LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); @@ -517,6 +528,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); + } finally { + LOG.info("Done applying snapshot"); } } @@ -526,14 +539,17 @@ public class Shard extends RaftActor { .tell(new EnableNotification(isLeader()), getSelf()); } - shardMBean.setRaftState(getRaftState().name()); shardMBean.setCurrentTerm(getCurrentTerm()); // If this actor is no longer the leader close all the transaction chains if(!isLeader()){ for(Map.Entry entry : transactionChains.entrySet()){ - LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", + entry.getKey(), getId()); + } entry.getValue().close(); } @@ -542,10 +558,6 @@ public class Shard extends RaftActor { } @Override protected void onLeaderChanged(String oldLeader, String newLeader) { - if((oldLeader == null && newLeader == null) || (newLeader != null && newLeader.equals(oldLeader)) ){ - return; - } - LOG.info("Current state = {}, Leader = {}", getRaftState().name(), newLeader); shardMBean.setLeader(newLeader); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 13ecaa5619..a97c00f1d8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -337,11 +337,11 @@ public class ShardManager extends AbstractUntypedActorWithMetering { peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - - LOG.debug( - "Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - + if(LOG.isDebugEnabled()) { + LOG.debug( + "Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); + } actor .tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index b810ed9575..f5ca6e3c5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -105,7 +105,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor { getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); } else if (message instanceof ReceiveTimeout) { - LOG.debug("Got ReceiveTimeout for inactivity - closing Tx"); + if(LOG.isDebugEnabled()) { + LOG.debug("Got ReceiveTimeout for inactivity - closing Tx"); + } closeTransaction(false); } else { throw new UnknownMessageException(message); @@ -163,8 +165,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor { protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) { modification.addModification( new WriteModification(message.getPath(), message.getData(),schemaContext)); - LOG.debug("writeData at path : " + message.getPath().toString()); - + if(LOG.isDebugEnabled()) { + LOG.debug("writeData at path : " + message.getPath().toString()); + } try { transaction.write(message.getPath(), message.getData()); getSender().tell(new WriteDataReply().toSerializable(), getSelf()); @@ -176,7 +179,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor { protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) { modification.addModification( new MergeModification(message.getPath(), message.getData(), schemaContext)); - LOG.debug("mergeData at path : " + message.getPath().toString()); + if(LOG.isDebugEnabled()) { + LOG.debug("mergeData at path : " + message.getPath().toString()); + } try { transaction.merge(message.getPath(), message.getData()); getSender().tell(new MergeDataReply().toSerializable(), getSelf()); @@ -186,7 +191,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor { } protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) { - LOG.debug("deleteData at path : " + message.getPath().toString()); + if(LOG.isDebugEnabled()) { + LOG.debug("deleteData at path : " + message.getPath().toString()); + } modification.addModification(new DeleteModification(message.getPath())); try { transaction.delete(message.getPath()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java index e6ac7f8dbc..0c3d33a78c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java @@ -25,7 +25,9 @@ public class TerminationMonitor extends UntypedActor{ @Override public void onReceive(Object message) throws Exception { if(message instanceof Terminated){ Terminated terminated = (Terminated) message; - LOG.debug("Actor terminated : {}", terminated.actor()); + if(LOG.isDebugEnabled()) { + LOG.debug("Actor terminated : {}", terminated.actor()); + } } else if(message instanceof Monitor){ Monitor monitor = (Monitor) message; getContext().watch(monitor.getActorRef()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index e3ae5dac7b..df85bb136a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -101,7 +101,9 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { private void commit(CommitTransaction message) { // Forward the commit to the shard - log.debug("Forward commit transaction to Shard {} ", shardActor); + if(log.isDebugEnabled()) { + log.debug("Forward commit transaction to Shard {} ", shardActor); + } shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index a5be69531d..a7a5b31b17 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -65,9 +65,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public Void apply(Iterable paths) { cohortPaths = Lists.newArrayList(paths); - - LOG.debug("Tx {} successfully built cohort path list: {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} successfully built cohort path list: {}", transactionId, cohortPaths); + } return null; } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); @@ -75,8 +76,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public ListenableFuture canCommit() { - LOG.debug("Tx {} canCommit", transactionId); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} canCommit", transactionId); + } final SettableFuture returnFuture = SettableFuture.create(); // The first phase of canCommit is to gather the list of cohort actor paths that will @@ -89,7 +91,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public void onComplete(Throwable failure, Void notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure); + } returnFuture.setException(failure); } else { finishCanCommit(returnFuture); @@ -101,9 +105,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } private void finishCanCommit(final SettableFuture returnFuture) { - - LOG.debug("Tx {} finishCanCommit", transactionId); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finishCanCommit", transactionId); + } // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform // their canCommit processing. If any one fails then we'll fail canCommit. @@ -114,7 +118,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public void onComplete(Throwable failure, Iterable responses) throws Throwable { if(failure != null) { - LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); + } returnFuture.setException(failure); return; } @@ -135,9 +141,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho return; } } - - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + } returnFuture.set(Boolean.valueOf(result)); } }, actorContext.getActorSystem().dispatcher()); @@ -146,9 +152,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private Future> invokeCohorts(Object message) { List> futureList = Lists.newArrayListWithCapacity(cohortPaths.size()); for(ActorPath actorPath : cohortPaths) { - - LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath); + } ActorSelection cohort = actorContext.actorSelection(actorPath); futureList.add(actorContext.executeRemoteOperationAsync(cohort, message)); @@ -184,8 +190,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private ListenableFuture voidOperation(final String operationName, final Object message, final Class expectedResponseClass, final boolean propagateException) { - LOG.debug("Tx {} {}", transactionId, operationName); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} {}", transactionId, operationName); + } final SettableFuture returnFuture = SettableFuture.create(); // The cohort actor list should already be built at this point by the canCommit phase but, @@ -199,9 +206,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public void onComplete(Throwable failure, Void notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure); - + } if(propagateException) { returnFuture.setException(failure); } else { @@ -221,9 +229,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private void finishVoidOperation(final String operationName, final Object message, final Class expectedResponseClass, final boolean propagateException, final SettableFuture returnFuture) { - - LOG.debug("Tx {} finish {}", transactionId, operationName); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finish {}", transactionId, operationName); + } Future> combinedFuture = invokeCohorts(message); combinedFuture.onComplete(new OnComplete>() { @@ -243,9 +251,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } if(exceptionToPropagate != null) { - LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId, operationName, exceptionToPropagate); - + } if(propagateException) { // We don't log the exception here to avoid redundant logging since we're // propagating to the caller in MD-SAL core who will log it. @@ -254,12 +263,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho // Since the caller doesn't want us to propagate the exception we'll also // not log it normally. But it's usually not good to totally silence // exceptions so we'll log it to debug level. - LOG.debug(String.format("%s failed", message.getClass().getSimpleName()), + if(LOG.isDebugEnabled()) { + LOG.debug(String.format("%s failed", message.getClass().getSimpleName()), exceptionToPropagate); + } returnFuture.set(null); } } else { - LOG.debug("Tx {}: {} succeeded", transactionId, operationName); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: {} succeeded", transactionId, operationName); + } returnFuture.set(null); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 97a9ff0bf3..6cf16b4426 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -224,8 +224,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } - - LOG.debug("Created txn {} of type {}", identifier, transactionType); + if(LOG.isDebugEnabled()) { + LOG.debug("Created txn {} of type {}", identifier, transactionType); + } } @Override @@ -235,8 +236,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read operation on write-only transaction is not allowed"); - LOG.debug("Tx {} read {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read {}", identifier, path); + } createTransactionIfMissing(actorContext, path); return transactionContext(path).readData(path); @@ -248,8 +250,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Exists operation on write-only transaction is not allowed"); - LOG.debug("Tx {} exists {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} exists {}", identifier, path); + } createTransactionIfMissing(actorContext, path); return transactionContext(path).dataExists(path); @@ -267,8 +270,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - LOG.debug("Tx {} write {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} write {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).writeData(path, data); @@ -279,8 +283,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - LOG.debug("Tx {} merge {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} merge {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).mergeData(path, data); @@ -290,9 +295,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void delete(YangInstanceIdentifier path) { checkModificationState(); - - LOG.debug("Tx {} delete {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} delete {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).deleteData(path); @@ -305,16 +310,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { inReadyState = true; - LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); - + } List> cohortPathFutures = Lists.newArrayList(); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - LOG.debug("Tx {} Readying transaction for shard {}", identifier, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); - + } cohortPathFutures.add(transactionContext.readyTransaction()); } @@ -381,8 +388,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath); + } ActorSelection transactionActor = actorContext.actorSelection(transactionPath); if (transactionType == TransactionType.READ_ONLY) { @@ -404,7 +412,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { "Invalid reply type {} for CreateTransaction", response.getClass())); } } catch (Exception e) { - LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); + } remoteTransactionPaths .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); } @@ -489,15 +499,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void closeTransaction() { - LOG.debug("Tx {} closeTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} closeTransaction called", identifier); + } actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); - + } // Send the ReadyTransaction message to the Tx actor. final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), @@ -522,10 +535,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return combinedFutures.transform(new AbstractFunction1, ActorPath>() { @Override public ActorPath apply(Iterable notUsed) { - - LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", identifier); - + } // At this point all the Futures succeeded and we need to extract the cohort // actor path from the ReadyTransactionReply. For the recorded operations, they // don't return any data so we're only interested that they completed @@ -543,9 +556,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { String resolvedCohortPath = getResolvedCohortPath( reply.getCohortPath().toString()); - LOG.debug("Tx {} readyTransaction: resolved cohort path {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction: resolved cohort path {}", identifier, resolvedCohortPath); - + } return actorContext.actorFor(resolvedCohortPath); } else { // Throwing an exception here will fail the Future. @@ -559,21 +573,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} deleteData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() )); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} mergeData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new MergeData(path, data, schemaContext).toSerializable())); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} writeData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new WriteData(path, data, schemaContext).toSerializable())); } @@ -582,8 +602,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { - LOG.debug("Tx {} readData called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData called path = {}", identifier, path); + } final SettableFuture>> returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -593,9 +614,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishReadData(path, returnFuture); } else { - LOG.debug("Tx {} readData: verifying {} previous recorded operations", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - + } // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -608,9 +630,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {} readData: a recorded operation failed: {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData: a recorded operation failed: {}", identifier, failure); - + } returnFuture.setException(new ReadFailedException( "The read could not be performed because a previous put, merge," + "or delete operation failed", failure)); @@ -629,20 +652,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishReadData(final YangInstanceIdentifier path, final SettableFuture>> returnFuture) { - LOG.debug("Tx {} finishReadData called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finishReadData called path = {}", identifier, path); + } OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object readResponse) throws Throwable { if(failure != null) { - LOG.debug("Tx {} read operation failed: {}", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read operation failed: {}", identifier, failure); + } returnFuture.setException(new ReadFailedException( "Error reading data for path " + path, failure)); } else { - LOG.debug("Tx {} read operation succeeded", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read operation succeeded", identifier, failure); + } if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse); @@ -669,8 +695,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture dataExists( final YangInstanceIdentifier path) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists called path = {}", identifier, path); + } final SettableFuture returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -681,9 +708,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishDataExists(path, returnFuture); } else { - LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - + } // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -696,9 +724,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {} dataExists: a recorded operation failed: {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists: a recorded operation failed: {}", identifier, failure); - + } returnFuture.setException(new ReadFailedException( "The data exists could not be performed because a previous " + "put, merge, or delete operation failed", failure)); @@ -717,19 +746,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishDataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { - LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); + } OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { - LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); + } returnFuture.setException(new ReadFailedException( "Error checking data exists for path " + path, failure)); } else { - LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); + } if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { returnFuture.set(Boolean.valueOf(DataExistsReply. fromSerializable(response).exists())); @@ -761,34 +793,46 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void closeTransaction() { - LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); + } } @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction called", identifier); + } return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} deleteData called path = {}", identifier, path); + } } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} mergeData called path = {}", identifier, path); + } } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} writeData called path = {}", identifier, path); + } } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { - LOG.debug("Tx {} readData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData called path = {}", identifier, path); + } return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @@ -796,7 +840,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public CheckedFuture dataExists( YangInstanceIdentifier path) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists called path = {}", identifier, path); + } return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 0a1964b053..74a91d08cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -74,7 +74,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { } public void setDataStoreExecutor(ExecutorService dsExecutor) { - this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor, + this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor, "notification-executor", getMBeanType(), getMBeanCategory()); } @@ -82,7 +82,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager, "notification-manager", getMBeanType(), getMBeanCategory()); - this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(), + this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(), "data-store-executor", getMBeanType(), getMBeanCategory()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7b5588cb19..8ba333d279 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -125,8 +125,9 @@ public class ActorContext { if (result instanceof LocalShardFound) { LocalShardFound found = (LocalShardFound) result; - LOG.debug("Local shard found {}", found.getPath()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Local shard found {}", found.getPath()); + } return found.getPath(); } @@ -141,8 +142,9 @@ public class ActorContext { if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { PrimaryFound found = PrimaryFound.fromSerializable(result); - LOG.debug("Primary found {}", found.getPrimaryPath()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Primary found {}", found.getPrimaryPath()); + } return found.getPrimaryPath(); } throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName); @@ -175,9 +177,10 @@ public class ActorContext { */ public Object executeRemoteOperation(ActorSelection actor, Object message) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), - actor.toString()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), + actor.toString()); + } Future future = ask(actor, message, operationTimeout); try { @@ -197,8 +200,9 @@ public class ActorContext { */ public Future executeRemoteOperationAsync(ActorSelection actor, Object message) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); + } return ask(actor, message, operationTimeout); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index 4c550a768c..022ef9bbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -25,12 +25,16 @@ public abstract class AbstractActorTest { System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); + + deletePersistenceFiles(); } @AfterClass public static void tearDownClass() throws IOException { JavaTestKit.shutdownActorSystem(system); system = null; + + deletePersistenceFiles(); } protected static void deletePersistenceFiles() throws IOException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 06bcac8d78..deb71c2df4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -343,11 +343,16 @@ public class ShardTest extends AbstractActorTest { subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); - waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor"); + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), + getRef()); + + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + } }; - Thread.sleep(2000); deletePersistenceFiles(); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java new file mode 100644 index 0000000000..0e492f0fbb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.dispatch.Futures; +import akka.japi.Option; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.snapshot.japi.SnapshotStore; +import com.google.common.collect.Iterables; +import scala.concurrent.Future; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InMemorySnapshotStore extends SnapshotStore { + + Map> snapshots = new HashMap<>(); + + @Override public Future> doLoadAsync(String s, + SnapshotSelectionCriteria snapshotSelectionCriteria) { + List snapshotList = snapshots.get(s); + if(snapshotList == null){ + return Futures.successful(Option.none()); + } + + Snapshot snapshot = Iterables.getLast(snapshotList); + SelectedSnapshot selectedSnapshot = + new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); + return Futures.successful(Option.some(selectedSnapshot)); + } + + @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + snapshotList = new ArrayList<>(); + snapshots.put(snapshotMetadata.persistenceId(), snapshotList); + } + snapshotList.add(new Snapshot(snapshotMetadata, o)); + + return Futures.successful(null); + } + + @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + } + + @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + return; + } + + int deleteIndex = -1; + + for(int i=0;i snapshotList = snapshots.get(s); + + if(snapshotList == null){ + return; + } + + // TODO : This is a quick and dirty implementation. Do actual match later. + snapshotList.clear(); + snapshots.remove(s); + } + + private static class Snapshot { + private final SnapshotMetadata metadata; + private final Object data; + + private Snapshot(SnapshotMetadata metadata, Object data) { + this.metadata = metadata; + this.data = data; + } + + public SnapshotMetadata getMetadata() { + return metadata; + } + + public Object getData() { + return data; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 794b376af8..f0dadc618b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,4 +1,6 @@ akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] actor { @@ -14,6 +16,14 @@ akka { } } } + +in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} + bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 88336526e9..ac62974d29 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl; @@ -96,22 +97,23 @@ public final class DomInmemoryDataBrokerModule extends newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE); commitStatsMXBean.registerMBean(); - final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean = - new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats", + final AbstractMXBean commitExecutorStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats", JMX_BEAN_TYPE, null); - commitExecutorStatsMXBean.registerMBean(); - - final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean = - new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor, + final AbstractMXBean commitFutureStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor, "CommitFutureExecutorStats", JMX_BEAN_TYPE, null); - commitFutureStatsMXBean.registerMBean(); newDataBroker.setCloseable(new AutoCloseable() { @Override public void close() { commitStatsMXBean.unregisterMBean(); - commitExecutorStatsMXBean.unregisterMBean(); - commitFutureStatsMXBean.unregisterMBean(); + if (commitExecutorStatsMXBean != null) { + commitExecutorStatsMXBean.unregisterMBean(); + } + if (commitFutureStatsMXBean != null) { + commitFutureStatsMXBean.unregisterMBean(); + } } }); diff --git a/opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java b/opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java index 96ddb9e0ce..d1f11ba9a3 100644 --- a/opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java +++ b/opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java @@ -492,13 +492,15 @@ public class XSQLAdapter extends Thread implements SchemaContextListener { out.print(prompt); char c = 0; byte data[] = new byte[1]; - while (c != '\n') { + while (!socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && c != '\n') { try { in.read(data); c = (char) data[0]; inputString.append(c); } catch (Exception err) { err.printStackTrace(out); + stopped = true; + break; } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 3d61c7b6b6..74fa73afb9 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -14,7 +14,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -80,29 +79,26 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); private final ListenerTree listenerTree = ListenerTree.create(); private final AtomicLong txCounter = new AtomicLong(0); - private final ListeningExecutorService listeningExecutor; private final QueuedNotificationManager, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager; private final ExecutorService dataChangeListenerExecutor; - - private final ExecutorService domStoreExecutor; + private final ListeningExecutorService commitExecutor; private final boolean debugTransactions; private final String name; private volatile AutoCloseable closeable; - public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor, + public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor, final ExecutorService dataChangeListenerExecutor) { - this(name, domStoreExecutor, dataChangeListenerExecutor, + this(name, commitExecutor, dataChangeListenerExecutor, InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false); } - public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor, + public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor, final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize, final boolean debugTransactions) { this.name = Preconditions.checkNotNull(name); - this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor); - this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor); + this.commitExecutor = Preconditions.checkNotNull(commitExecutor); this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor); this.debugTransactions = debugTransactions; @@ -121,7 +117,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D } public ExecutorService getDomStoreExecutor() { - return domStoreExecutor; + return commitExecutor; } @Override @@ -156,7 +152,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D @Override public void close() { - ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS); + ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS); ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS); if(closeable != null) { @@ -386,7 +382,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D @Override public ListenableFuture canCommit() { - return listeningExecutor.submit(new Callable() { + return commitExecutor.submit(new Callable() { @Override public Boolean call() throws TransactionCommitFailedException { try { @@ -410,7 +406,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D @Override public ListenableFuture preCommit() { - return listeningExecutor.submit(new Callable() { + return commitExecutor.submit(new Callable() { @Override public Void call() { candidate = dataTree.prepare(modification); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java index dc1482c6ab..2ee8e182c2 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.md.sal.dom.store.impl; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; import org.opendaylight.controller.sal.core.api.model.SchemaService; @@ -57,7 +59,7 @@ public final class InMemoryDOMDataStoreFactory { @Nullable final InMemoryDOMDataStoreConfigProperties properties) { InMemoryDOMDataStoreConfigProperties actualProperties = properties; - if(actualProperties == null) { + if (actualProperties == null) { actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault(); } @@ -65,21 +67,18 @@ public final class InMemoryDOMDataStoreFactory { // task execution time to get higher throughput as DataChangeListeners typically provide // much of the business logic for a data model. If the executor queue size limit is reached, // subsequent submitted notifications will block the calling thread. - int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize(); int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize(); ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool( dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" ); - ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( - actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name ); - - InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, - domStoreExecutor, dataChangeListenerExecutor, + final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor(); + final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, + commitExecutor, dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions); - if(schemaService != null) { + if (schemaService != null) { schemaService.registerSchemaContextListener(dataStore); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java index b3608eceef..e00be2446a 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.md.sal.dom.store.impl.jmx; import java.util.concurrent.ExecutorService; - +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; @@ -21,24 +21,28 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; */ public class InMemoryDataStoreStats implements AutoCloseable { - private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean; - private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean; + private final AbstractMXBean notificationExecutorStatsBean; + private final AbstractMXBean dataStoreExecutorStatsBean; private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean; - public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager manager, - ExecutorService dataStoreExecutor) { + public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager manager, + final ExecutorService dataStoreExecutor) { - this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager, + notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager, "notification-manager", mBeanType, null); notificationManagerStatsBean.registerMBean(); - this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(), + notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(), "notification-executor", mBeanType, null); - this.notificationExecutorStatsBean.registerMBean(); + if (notificationExecutorStatsBean != null) { + notificationExecutorStatsBean.registerMBean(); + } - this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor, + dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor, "data-store-executor", mBeanType, null); - this.dataStoreExecutorStatsBean.registerMBean(); + if (dataStoreExecutorStatsBean != null) { + dataStoreExecutorStatsBean.registerMBean(); + } } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java index 2e355d4f51..c82a72eaa5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -38,8 +38,9 @@ public class RemoteRpcProviderFactory { Thread.currentThread().getContextClassLoader()); Config actorSystemConfig = config.get(); - LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render()); + } if (config.isMetricCaptureEnabled()) { LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.", config.getActorSystemName()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java index 98cf6a329f..2aaac5a78e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java @@ -53,7 +53,9 @@ public class RoutedRpcListener implements RouteChangeListener> announcements) { - LOG.debug("Announcing [{}]", announcements); + if(LOG.isDebugEnabled()) { + LOG.debug("Announcing [{}]", announcements); + } RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements)); rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); } @@ -63,7 +65,9 @@ public class RoutedRpcListener implements RouteChangeListener> removals){ - LOG.debug("Removing [{}]", removals); + if(LOG.isDebugEnabled()) { + LOG.debug("Removing [{}]", removals); + } RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals)); rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 6b02235dc7..2046e419d9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -79,8 +79,9 @@ public class RpcBroker extends AbstractUntypedActor { } private void invokeRemoteRpc(final InvokeRpc msg) { - LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); + } RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( null, msg.getRpc(), msg.getIdentifier()); RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); @@ -147,8 +148,9 @@ public class RpcBroker extends AbstractUntypedActor { } private void executeRpc(final ExecuteRpc msg) { - LOG.debug("Executing rpc {}", msg.getRpc()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Executing rpc {}", msg.getRpc()); + } Future> future = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext)); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java index dee98521ae..22879dda2f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -31,7 +31,9 @@ public class RpcListener implements RpcRegistrationListener{ @Override public void onRpcImplementationAdded(QName rpc) { - LOG.debug("Adding registration for [{}]", rpc); + if(LOG.isDebugEnabled()) { + LOG.debug("Adding registration for [{}]", rpc); + } RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); List> routeIds = new ArrayList<>(); routeIds.add(routeId); @@ -41,7 +43,9 @@ public class RpcListener implements RpcRegistrationListener{ @Override public void onRpcImplementationRemoved(QName rpc) { - LOG.debug("Removing registration for [{}]", rpc); + if(LOG.isDebugEnabled()) { + LOG.debug("Removing registration for [{}]", rpc); + } RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); List> routeIds = new ArrayList<>(); routeIds.add(routeId); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java index abe2008c29..48ccd824d4 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java @@ -25,7 +25,9 @@ public class TerminationMonitor extends UntypedActor{ @Override public void onReceive(Object message) throws Exception { if(message instanceof Terminated){ Terminated terminated = (Terminated) message; - LOG.debug("Actor terminated : {}", terminated.actor()); + if(LOG.isDebugEnabled()) { + LOG.debug("Actor terminated : {}", terminated.actor()); + } }else if(message instanceof Monitor){ Monitor monitor = (Monitor) message; getContext().watch(monitor.getActorRef()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 3de3fc00d0..b50dfb1ba3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -111,7 +111,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering { receiveUpdateRemoteBuckets( ((UpdateRemoteBuckets) message).getBuckets()); } else { - log.debug("Unhandled message [{}]", message); + if(log.isDebugEnabled()) { + log.debug("Unhandled message [{}]", message); + } unhandled(message); } } @@ -236,8 +238,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering { versions.put(entry.getKey(), remoteVersion); } } - - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + if(log.isDebugEnabled()) { + log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + } } /// diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 85c6ebe26f..1bbcc69f5e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -170,7 +170,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } clusterMembers.remove(member.address()); - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + if(log.isDebugEnabled()) { + log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } } /** @@ -184,8 +186,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { if (!clusterMembers.contains(member.address())) clusterMembers.add(member.address()); - - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + if(log.isDebugEnabled()) { + log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + } } /** @@ -205,8 +208,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); remoteMemberToGossipTo = clusterMembers.get(randomIndex); } - - log.debug("Gossiping to [{}]", remoteMemberToGossipTo); + if(log.isDebugEnabled()) { + log.debug("Gossiping to [{}]", remoteMemberToGossipTo); + } getLocalStatusAndSendTo(remoteMemberToGossipTo); } @@ -244,7 +248,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { void receiveGossip(GossipEnvelope envelope){ //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { - log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + if(log.isDebugEnabled()) { + log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + } return; } @@ -291,7 +297,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorSelection remoteRef = getContext().system().actorSelection( remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); - log.debug("Sending bucket versions to [{}]", remoteRef); + if(log.isDebugEnabled()) { + log.debug("Sending bucket versions to [{}]", remoteRef); + } futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); @@ -416,7 +424,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { public Void apply(Object msg) { if (msg instanceof GetBucketsByMembersReply) { Map buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - log.debug("Buckets to send from {}: {}", selfAddress, buckets); + if(log.isDebugEnabled()) { + log.debug("Buckets to send from {}: {}", selfAddress, buckets); + } GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); sender.tell(envelope, getSelf()); } diff --git a/opendaylight/md-sal/topology-manager/pom.xml b/opendaylight/md-sal/topology-manager/pom.xml index fe1813a199..57313d2948 100644 --- a/opendaylight/md-sal/topology-manager/pom.xml +++ b/opendaylight/md-sal/topology-manager/pom.xml @@ -40,6 +40,16 @@ org.osgi.core provided + + junit + junit + test + + + org.mockito + mockito-all + test + diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java index c1996f4691..361373d78d 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java @@ -15,9 +15,9 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode; import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId; +import java.util.Collection; import java.util.Collections; import java.util.List; - import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -50,17 +50,19 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener { - private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class); + private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class); private final InstanceIdentifier topology; private final OperationProcessor processor; - FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier topology) { + FlowCapableTopologyExporter(final OperationProcessor processor, + final InstanceIdentifier topology) { this.processor = Preconditions.checkNotNull(processor); this.topology = Preconditions.checkNotNull(topology); } @@ -73,15 +75,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open processor.enqueueOperation(new TopologyOperation() { @Override - public void applyOperation(final ReadWriteTransaction transaction) { - removeAffectedLinks(nodeId); + public void applyOperation(ReadWriteTransaction transaction) { + removeAffectedLinks(nodeId, transaction); + transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance); } - }); - processor.enqueueOperation(new TopologyOperation() { @Override - public void applyOperation(ReadWriteTransaction transaction) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance); + public String toString() { + return "onNodeRemoved"; } }); } @@ -97,6 +98,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open final InstanceIdentifier path = getNodePath(toTopologyNodeId(notification.getId())); transaction.merge(LogicalDatastoreType.OPERATIONAL, path, node, true); } + + @Override + public String toString() { + return "onNodeUpdated"; + } }); } } @@ -104,28 +110,30 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open @Override public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) { - final InstanceIdentifier tpInstance = toTerminationPointIdentifier(notification - .getNodeConnectorRef()); + final InstanceIdentifier tpInstance = toTerminationPointIdentifier( + notification.getNodeConnectorRef()); - processor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(final ReadWriteTransaction transaction) { - final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId()); - removeAffectedLinks(tpId); - } - }); + final TpId tpId = toTerminationPointId(getNodeConnectorKey( + notification.getNodeConnectorRef()).getId()); processor.enqueueOperation(new TopologyOperation() { @Override public void applyOperation(ReadWriteTransaction transaction) { + removeAffectedLinks(tpId, transaction); transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance); } + + @Override + public String toString() { + return "onNodeConnectorRemoved"; + } }); } @Override public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) { - final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class); + final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation( + FlowCapableNodeConnectorUpdated.class); if (fcncu != null) { processor.enqueueOperation(new TopologyOperation() { @Override @@ -137,9 +145,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open transaction.merge(LogicalDatastoreType.OPERATIONAL, path, point, true); if ((fcncu.getState() != null && fcncu.getState().isLinkDown()) || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) { - removeAffectedLinks(point.getTpId()); + removeAffectedLinks(point.getTpId(), transaction); } } + + @Override + public String toString() { + return "onNodeConnectorUpdated"; + } }); } } @@ -153,6 +166,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open final InstanceIdentifier path = linkPath(link); transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true); } + + @Override + public String toString() { + return "onLinkDiscovered"; + } }); } @@ -168,6 +186,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open public void applyOperation(final ReadWriteTransaction transaction) { transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification))); } + + @Override + public String toString() { + return "onLinkRemoved"; + } }); } @@ -188,62 +211,92 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId())); } - private void removeAffectedLinks(final NodeId id) { - processor.enqueueOperation(new TopologyOperation() { + private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) { + CheckedFuture, ReadFailedException> topologyDataFuture = + transaction.read(LogicalDatastoreType.OPERATIONAL, topology); + Futures.addCallback(topologyDataFuture, new FutureCallback>() { @Override - public void applyOperation(final ReadWriteTransaction transaction) { - CheckedFuture, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology); - Futures.addCallback(topologyDataFuture, new FutureCallback>() { - @Override - public void onSuccess(Optional topologyOptional) { - if (topologyOptional.isPresent()) { - List linkList = topologyOptional.get().getLink() != null - ? topologyOptional.get().getLink() : Collections. emptyList(); - for (Link link : linkList) { - if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link)); - } - } - } - } + public void onSuccess(Optional topologyOptional) { + removeAffectedLinks(id, topologyOptional); + } - @Override - public void onFailure(Throwable throwable) { - LOG.error("Error reading topology data for topology {}", topology, throwable); - } - }); + @Override + public void onFailure(Throwable throwable) { + LOG.error("Error reading topology data for topology {}", topology, throwable); } }); } - private void removeAffectedLinks(final TpId id) { - processor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(final ReadWriteTransaction transaction) { - CheckedFuture, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology); - Futures.addCallback(topologyDataFuture, new FutureCallback>() { - @Override - public void onSuccess(Optional topologyOptional) { - if (topologyOptional.isPresent()) { - List linkList = topologyOptional.get().getLink() != null - ? topologyOptional.get().getLink() : Collections. emptyList(); - for (Link link : linkList) { - if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link)); - } - } - } - } + private void removeAffectedLinks(final NodeId id, Optional topologyOptional) { + if (!topologyOptional.isPresent()) { + return; + } + + List linkList = topologyOptional.get().getLink() != null ? + topologyOptional.get().getLink() : Collections. emptyList(); + final List> linkIDsToDelete = Lists.newArrayList(); + for (Link link : linkList) { + if (id.equals(link.getSource().getSourceNode()) || + id.equals(link.getDestination().getDestNode())) { + linkIDsToDelete.add(linkPath(link)); + } + } + + enqueueLinkDeletes(linkIDsToDelete); + } - @Override - public void onFailure(Throwable throwable) { - LOG.error("Error reading topology data for topology {}", topology, throwable); + private void enqueueLinkDeletes(final Collection> linkIDsToDelete) { + if(!linkIDsToDelete.isEmpty()) { + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(ReadWriteTransaction transaction) { + for(InstanceIdentifier linkID: linkIDsToDelete) { + transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID); } - }); + } + + @Override + public String toString() { + return "Delete Links " + linkIDsToDelete.size(); + } + }); + } + } + + private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) { + CheckedFuture, ReadFailedException> topologyDataFuture = + transaction.read(LogicalDatastoreType.OPERATIONAL, topology); + Futures.addCallback(topologyDataFuture, new FutureCallback>() { + @Override + public void onSuccess(Optional topologyOptional) { + removeAffectedLinks(id, topologyOptional); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.error("Error reading topology data for topology {}", topology, throwable); } }); } + private void removeAffectedLinks(final TpId id, Optional topologyOptional) { + if (!topologyOptional.isPresent()) { + return; + } + + List linkList = topologyOptional.get().getLink() != null + ? topologyOptional.get().getLink() : Collections. emptyList(); + final List> linkIDsToDelete = Lists.newArrayList(); + for (Link link : linkList) { + if (id.equals(link.getSource().getSourceTp()) || + id.equals(link.getDestination().getDestTp())) { + linkIDsToDelete.add(linkPath(link)); + } + } + + enqueueLinkDeletes(linkIDsToDelete); + } + private InstanceIdentifier getNodePath(final NodeId nodeId) { return topology.child(Node.class, new NodeKey(nodeId)); } diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java index 1cf648eb97..f09da00459 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -11,14 +11,17 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; + import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +53,9 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh for (; ; ) { TopologyOperation op = queue.take(); - LOG.debug("New operations available, starting transaction"); - final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); + LOG.debug("New {} operation available, starting transaction", op); + final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); int ops = 0; do { @@ -64,14 +67,16 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh } else { op = null; } + + LOG.debug("Next operation {}", op); } while (op != null); LOG.debug("Processed {} operations, submitting transaction", ops); - final CheckedFuture txResultFuture = tx.submit(); - Futures.addCallback(txResultFuture, new FutureCallback() { + CheckedFuture txResultFuture = tx.submit(); + Futures.addCallback(txResultFuture, new FutureCallback() { @Override - public void onSuccess(Object o) { + public void onSuccess(Void notUsed) { LOG.debug("Topology export successful for tx :{}", tx.getIdentifier()); } diff --git a/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java b/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java new file mode 100644 index 0000000000..b7a56a4890 --- /dev/null +++ b/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java @@ -0,0 +1,666 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.md.controller.topology.manager; + +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortConfig; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.StateBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNodeConnector; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.LinkId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Destination; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.DestinationBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Source; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.SourceBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Uninterruptibles; + +public class FlowCapableTopologyExporterTest { + + @Mock + private DataBroker mockDataBroker; + + @Mock + private BindingTransactionChain mockTxChain; + + private OperationProcessor processor; + + private FlowCapableTopologyExporter exporter; + + private InstanceIdentifier topologyIID; + + private final ExecutorService executor = Executors.newFixedThreadPool(1); + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + doReturn(mockTxChain).when(mockDataBroker) + .createTransactionChain(any(TransactionChainListener.class)); + + processor = new OperationProcessor(mockDataBroker); + + topologyIID = InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, new TopologyKey(new TopologyId("test"))); + exporter = new FlowCapableTopologyExporter(processor, topologyIID); + + executor.execute(processor); + } + + @After + public void tearDown() { + executor.shutdownNow(); + } + + @SuppressWarnings({ "rawtypes" }) + @Test + public void testOnNodeRemoved() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + InstanceIdentifier invNodeID = InstanceIdentifier.create(Nodes.class).child( + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, + nodeKey); + + List linkList = Arrays.asList( + newLink("link1", newSourceNode("node1"), newDestNode("dest")), + newLink("link2", newSourceNode("source"), newDestNode("node1")), + newLink("link2", newSourceNode("source2"), newDestNode("dest2"))); + final Topology topology = new TopologyBuilder().setLink(linkList).build(); + + InstanceIdentifier[] expDeletedIIDs = { + topologyIID.child(Link.class, linkList.get(0).getKey()), + topologyIID.child(Link.class, linkList.get(1).getKey()), + topologyIID.child(Node.class, new NodeKey(new NodeId("node1"))) + }; + + SettableFuture> readFuture = SettableFuture.create(); + ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class); + doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + + CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1); + + int expDeleteCalls = expDeletedIIDs.length; + CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch); + + ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class); + setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch); + CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2); + + doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build()); + + waitForSubmit(submitLatch1); + + setReadFutureAsync(topology, readFuture); + + waitForDeletes(expDeleteCalls, deleteLatch); + + waitForSubmit(submitLatch2); + + assertDeletedIDs(expDeletedIIDs, deletedLinkIDs); + + verifyMockTx(mockTx1); + verifyMockTx(mockTx2); + } + + @SuppressWarnings({ "rawtypes" }) + @Test + public void testOnNodeRemovedWithNoTopology() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + InstanceIdentifier invNodeID = InstanceIdentifier.create(Nodes.class).child( + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, + nodeKey); + + InstanceIdentifier[] expDeletedIIDs = { + topologyIID.child(Node.class, new NodeKey(new NodeId("node1"))) + }; + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + + CountDownLatch deleteLatch = new CountDownLatch(1); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch); + + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build()); + + waitForSubmit(submitLatch); + + waitForDeletes(1, deleteLatch); + + assertDeletedIDs(expDeletedIIDs, deletedLinkIDs); + } + + @SuppressWarnings("rawtypes") + @Test + public void testOnNodeConnectorRemoved() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey = + newInvNodeConnKey("tp1"); + + InstanceIdentifier invNodeConnID = newNodeConnID(nodeKey, ncKey); + + List linkList = Arrays.asList( + newLink("link1", newSourceTp("tp1"), newDestTp("dest")), + newLink("link2", newSourceTp("source"), newDestTp("tp1")), + newLink("link3", newSourceTp("source2"), newDestTp("dest2"))); + final Topology topology = new TopologyBuilder().setLink(linkList).build(); + + InstanceIdentifier[] expDeletedIIDs = { + topologyIID.child(Link.class, linkList.get(0).getKey()), + topologyIID.child(Link.class, linkList.get(1).getKey()), + topologyIID.child(Node.class, new NodeKey(new NodeId("node1"))) + .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1"))) + }; + + final SettableFuture> readFuture = SettableFuture.create(); + ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class); + doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + + CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1); + + int expDeleteCalls = expDeletedIIDs.length; + CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch); + + ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class); + setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch); + CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2); + + doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef( + new NodeConnectorRef(invNodeConnID)).build()); + + waitForSubmit(submitLatch1); + + setReadFutureAsync(topology, readFuture); + + waitForDeletes(expDeleteCalls, deleteLatch); + + waitForSubmit(submitLatch2); + + assertDeletedIDs(expDeletedIIDs, deletedLinkIDs); + + verifyMockTx(mockTx1); + verifyMockTx(mockTx2); + } + + @SuppressWarnings("rawtypes") + @Test + public void testOnNodeConnectorRemovedWithNoTopology() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey = + newInvNodeConnKey("tp1"); + + InstanceIdentifier invNodeConnID = newNodeConnID(nodeKey, ncKey); + + InstanceIdentifier[] expDeletedIIDs = { + topologyIID.child(Node.class, new NodeKey(new NodeId("node1"))) + .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1"))) + }; + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + + CountDownLatch deleteLatch = new CountDownLatch(1); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch); + + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef( + new NodeConnectorRef(invNodeConnID)).build()); + + waitForSubmit(submitLatch); + + waitForDeletes(1, deleteLatch); + + assertDeletedIDs(expDeletedIIDs, deletedLinkIDs); + } + + @Test + public void testOnNodeUpdated() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + InstanceIdentifier invNodeID = InstanceIdentifier.create(Nodes.class).child( + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, + nodeKey); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeUpdated(new NodeUpdatedBuilder().setNodeRef(new NodeRef(invNodeID)) + .setId(nodeKey.getId()).addAugmentation(FlowCapableNodeUpdated.class, + new FlowCapableNodeUpdatedBuilder().build()).build()); + + waitForSubmit(submitLatch); + + ArgumentCaptor mergedNode = ArgumentCaptor.forClass(Node.class); + NodeId expNodeId = new NodeId("node1"); + verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(Node.class, + new NodeKey(expNodeId))), mergedNode.capture(), eq(true)); + assertEquals("getNodeId", expNodeId, mergedNode.getValue().getNodeId()); + InventoryNode augmentation = mergedNode.getValue().getAugmentation(InventoryNode.class); + assertNotNull("Missing augmentation", augmentation); + assertEquals("getInventoryNodeRef", new NodeRef(invNodeID), augmentation.getInventoryNodeRef()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testOnNodeConnectorUpdated() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey = + newInvNodeConnKey("tp1"); + + InstanceIdentifier invNodeConnID = newNodeConnID(nodeKey, ncKey); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef( + new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation( + FlowCapableNodeConnectorUpdated.class, + new FlowCapableNodeConnectorUpdatedBuilder().build()).build()); + + waitForSubmit(submitLatch); + + ArgumentCaptor mergedNode = ArgumentCaptor.forClass(TerminationPoint.class); + NodeId expNodeId = new NodeId("node1"); + TpId expTpId = new TpId("tp1"); + InstanceIdentifier expTpPath = topologyIID.child( + Node.class, new NodeKey(expNodeId)).child(TerminationPoint.class, + new TerminationPointKey(expTpId)); + verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath), + mergedNode.capture(), eq(true)); + assertEquals("getTpId", expTpId, mergedNode.getValue().getTpId()); + InventoryNodeConnector augmentation = mergedNode.getValue().getAugmentation( + InventoryNodeConnector.class); + assertNotNull("Missing augmentation", augmentation); + assertEquals("getInventoryNodeConnectorRef", new NodeConnectorRef(invNodeConnID), + augmentation.getInventoryNodeConnectorRef()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testOnNodeConnectorUpdatedWithLinkStateDown() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey = + newInvNodeConnKey("tp1"); + + InstanceIdentifier invNodeConnID = newNodeConnID(nodeKey, ncKey); + + List linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest"))); + Topology topology = new TopologyBuilder().setLink(linkList).build(); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + setupStubbedSubmit(mockTx); + + CountDownLatch deleteLatch = new CountDownLatch(1); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch); + + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef( + new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation( + FlowCapableNodeConnectorUpdated.class, + new FlowCapableNodeConnectorUpdatedBuilder().setState( + new StateBuilder().setLinkDown(true).build()).build()).build()); + + waitForDeletes(1, deleteLatch); + + InstanceIdentifier expTpPath = topologyIID.child( + Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class, + new TerminationPointKey(new TpId("tp1"))); + + verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath), + any(TerminationPoint.class), eq(true)); + + assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class, + linkList.get(0).getKey())}, deletedLinkIDs); + } + + + @SuppressWarnings("rawtypes") + @Test + public void testOnNodeConnectorUpdatedWithPortDown() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + nodeKey = newInvNodeKey("node1"); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey = + newInvNodeConnKey("tp1"); + + InstanceIdentifier invNodeConnID = newNodeConnID(nodeKey, ncKey); + + List linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest"))); + Topology topology = new TopologyBuilder().setLink(linkList).build(); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx) + .read(LogicalDatastoreType.OPERATIONAL, topologyIID); + setupStubbedSubmit(mockTx); + + CountDownLatch deleteLatch = new CountDownLatch(1); + ArgumentCaptor deletedLinkIDs = + ArgumentCaptor.forClass(InstanceIdentifier.class); + setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch); + + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef( + new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation( + FlowCapableNodeConnectorUpdated.class, + new FlowCapableNodeConnectorUpdatedBuilder().setConfiguration( + new PortConfig(true, true, true, true)).build()).build()); + + waitForDeletes(1, deleteLatch); + + InstanceIdentifier expTpPath = topologyIID.child( + Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class, + new TerminationPointKey(new TpId("tp1"))); + + verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath), + any(TerminationPoint.class), eq(true)); + + assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class, + linkList.get(0).getKey())}, deletedLinkIDs); + } + + @Test + public void testOnLinkDiscovered() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + sourceNodeKey = newInvNodeKey("sourceNode"); + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey + sourceNodeConnKey = newInvNodeConnKey("sourceTP"); + InstanceIdentifier sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + destNodeKey = newInvNodeKey("destNode"); + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey + destNodeConnKey = newInvNodeConnKey("destTP"); + InstanceIdentifier destConnID = newNodeConnID(destNodeKey, destNodeConnKey); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onLinkDiscovered(new LinkDiscoveredBuilder().setSource( + new NodeConnectorRef(sourceConnID)).setDestination( + new NodeConnectorRef(destConnID)).build()); + + waitForSubmit(submitLatch); + + ArgumentCaptor mergedNode = ArgumentCaptor.forClass(Link.class); + verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child( + Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))), + mergedNode.capture(), eq(true)); + assertEquals("Source node ID", "sourceNode", + mergedNode.getValue().getSource().getSourceNode().getValue()); + assertEquals("Dest TP ID", "sourceTP", + mergedNode.getValue().getSource().getSourceTp().getValue()); + assertEquals("Dest node ID", "destNode", + mergedNode.getValue().getDestination().getDestNode().getValue()); + assertEquals("Dest TP ID", "destTP", + mergedNode.getValue().getDestination().getDestTp().getValue()); + } + + @Test + public void testOnLinkRemoved() { + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + sourceNodeKey = newInvNodeKey("sourceNode"); + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey + sourceNodeConnKey = newInvNodeConnKey("sourceTP"); + InstanceIdentifier sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey); + + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + destNodeKey = newInvNodeKey("destNode"); + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey + destNodeConnKey = newInvNodeConnKey("destTP"); + InstanceIdentifier destConnID = newNodeConnID(destNodeKey, destNodeConnKey); + + ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class); + CountDownLatch submitLatch = setupStubbedSubmit(mockTx); + doReturn(mockTx).when(mockTxChain).newReadWriteTransaction(); + + exporter.onLinkRemoved(new LinkRemovedBuilder().setSource( + new NodeConnectorRef(sourceConnID)).setDestination( + new NodeConnectorRef(destConnID)).build()); + + waitForSubmit(submitLatch); + + verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child( + Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))); + } + + private void verifyMockTx(ReadWriteTransaction mockTx) { + InOrder inOrder = inOrder(mockTx); + inOrder.verify(mockTx, atLeast(0)).submit(); + inOrder.verify(mockTx, never()).delete(eq(LogicalDatastoreType.OPERATIONAL), + any(InstanceIdentifier.class)); + } + + @SuppressWarnings("rawtypes") + private void assertDeletedIDs(InstanceIdentifier[] expDeletedIIDs, + ArgumentCaptor deletedLinkIDs) { + Set actualIIDs = new HashSet<>(deletedLinkIDs.getAllValues()); + for(InstanceIdentifier id: expDeletedIIDs) { + assertTrue("Missing expected deleted IID " + id, actualIIDs.contains(id)); + } + } + + private void setReadFutureAsync(final Topology topology, + final SettableFuture> readFuture) { + new Thread() { + @Override + public void run() { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + readFuture.set(Optional.of(topology)); + } + + }.start(); + } + + private void waitForSubmit(CountDownLatch latch) { + assertEquals("Transaction submitted", true, + Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS)); + } + + private void waitForDeletes(int expDeleteCalls, final CountDownLatch latch) { + boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS); + if(!done) { + fail("Expected " + expDeleteCalls + " delete calls. Actual: " + + (expDeleteCalls - latch.getCount())); + } + } + + private CountDownLatch setupStubbedSubmit(ReadWriteTransaction mockTx) { + final CountDownLatch latch = new CountDownLatch(1); + doAnswer(new Answer>() { + @Override + public CheckedFuture answer( + InvocationOnMock invocation) { + latch.countDown(); + return Futures.immediateCheckedFuture(null); + } + }).when(mockTx).submit(); + + return latch; + } + + @SuppressWarnings("rawtypes") + private void setupStubbedDeletes(ReadWriteTransaction mockTx, + ArgumentCaptor deletedLinkIDs, final CountDownLatch latch) { + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + latch.countDown(); + return null; + } + }).when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), deletedLinkIDs.capture()); + } + + private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey + newInvNodeKey(String id) { + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey = + new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey( + new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory. + rev130819.NodeId(id)); + return nodeKey; + } + + private NodeConnectorKey newInvNodeConnKey(String id) { + return new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey( + new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819. + NodeConnectorId(id)); + } + + private KeyedInstanceIdentifier newNodeConnID( + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey, + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey) { + return InstanceIdentifier.create(Nodes.class).child( + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, + nodeKey).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory. + rev130819.node.NodeConnector.class, ncKey); + } + + private Link newLink(String id, Source source, Destination dest) { + return new LinkBuilder().setLinkId(new LinkId(id)) + .setSource(source).setDestination(dest).build(); + } + + private Destination newDestTp(String id) { + return new DestinationBuilder().setDestTp(new TpId(id)).build(); + } + + private Source newSourceTp(String id) { + return new SourceBuilder().setSourceTp(new TpId(id)).build(); + } + + private Destination newDestNode(String id) { + return new DestinationBuilder().setDestNode(new NodeId(id)).build(); + } + + private Source newSourceNode(String id) { + return new SourceBuilder().setSourceNode(new NodeId(id)).build(); + } +}