Merge "Bug 1819 - Moved bundle up in features.xml to avoid exception in log Change...
authorTony Tkacik <ttkacik@cisco.com>
Tue, 16 Sep 2014 10:32:53 +0000 (10:32 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 16 Sep 2014 10:32:53 +0000 (10:32 +0000)
47 files changed:
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/FlowForwarder.java
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/GroupForwarder.java
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/frm/impl/MeterForwarder.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java with 87% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto [moved from opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto with 82% similarity]
opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/ThreadExecutorStatsMXBeanImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/topology-manager/pom.xml
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java [new file with mode: 0644]

index e0c16a080676691080def9e86e12d22fe87883b0..9951bf744810dd3228de7815bb3c240d2b950e81 100644 (file)
@@ -77,7 +77,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
             final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
             builder.setFlowRef(new FlowRef(identifier));
-            builder.setNode(new NodeRef(nodeIdent));
+            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
             this.provider.getSalFlowService().removeFlow(builder.build());
@@ -93,7 +93,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, update)) {
             final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
 
-            builder.setNode(new NodeRef(nodeIdent));
+            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
             builder.setFlowRef(new FlowRef(identifier));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
             builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build());
@@ -112,7 +112,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
             final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
 
-            builder.setNode(new NodeRef(nodeIdent));
+            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
             builder.setFlowRef(new FlowRef(identifier));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
index 72e35ce8dbd84b2b22c766c3a9da3d893b855872..1b2c5323233edb3d30d04306801bee798e581288 100644 (file)
@@ -78,7 +78,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         final Group group = (removeDataObj);
         final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         this.provider.getSalGroupService().removeGroup(builder.build());
@@ -93,7 +93,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         final Group updatedGroup = (update);
         final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         builder.setUpdatedGroup((new UpdatedGroupBuilder(updatedGroup)).build());
@@ -109,7 +109,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         final Group group = (addDataObj);
         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         this.provider.getSalGroupService().addGroup(builder.build());
index 8a805b029729116b8094d13ef504e05bdd918262..2f3de2a171f2a0c2e0f07385ca503c69761252a9 100644 (file)
@@ -77,7 +77,7 @@ public class MeterForwarder extends AbstractListeningCommiter<Meter> {
 
         final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj);
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setMeterRef(new MeterRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         this.provider.getSalMeterService().removeMeter(builder.build());
@@ -90,7 +90,7 @@ public class MeterForwarder extends AbstractListeningCommiter<Meter> {
 
         final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder();
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setMeterRef(new MeterRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         builder.setUpdatedMeter((new UpdatedMeterBuilder(update)).build());
@@ -105,7 +105,7 @@ public class MeterForwarder extends AbstractListeningCommiter<Meter> {
 
         final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj);
 
-        builder.setNode(new NodeRef(nodeIdent));
+        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setMeterRef(new MeterRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         this.provider.getSalMeterService().addMeter(builder.build());
index c4ff108611d9fbdb177f2ef4ace98bb030d69991..3bfdf732cf01cd3d3898158bc4b8e62585e5a9f5 100644 (file)
@@ -67,11 +67,15 @@ public class ExampleActor extends RaftActor {
             }
 
         } else if (message instanceof PrintState) {
-            LOG.debug("State of the node:{} has entries={}, {}",
-                getId(), state.size(), getReplicatedLogState());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("State of the node:{} has entries={}, {}",
+                    getId(), state.size(), getReplicatedLogState());
+            }
 
         } else if (message instanceof PrintRole) {
-            LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+            }
 
         } else {
             super.onReceiveCommand(message);
@@ -106,7 +110,9 @@ public class ExampleActor extends RaftActor {
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
-        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+        }
     }
 
     private ByteString fromObject(Object snapshot) throws Exception {
index 75c237f5035e57abd61c839835ec3c78548a6157..9d06f6360473097beefbbce34962d7433f447f88 100644 (file)
@@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class DefaultConfigParamsImpl implements ConfigParams {
 
-    private static final int SNAPSHOT_BATCH_COUNT = 100000;
+    private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
     /**
      * The maximum election time variance
index 778f5c68f6551e4a9ffe59c88c4b3c9921d3fa42..91bbeeca504b607147e39927dae1481ee9c4df6e 100644 (file)
@@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     @Override public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.info("SnapshotOffer called..");
             SnapshotOffer offer = (SnapshotOffer) message;
             Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -135,10 +135,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
-            LOG.debug("Applied snapshot to replicatedLog. " +
-                "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+            LOG.info("Applied snapshot to replicatedLog. " +
+                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
                 replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size());
+                replicatedLog.size()
+            );
 
             // Apply the snapshot to the actors state
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
@@ -156,13 +157,16 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof UpdateElectionTerm) {
             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
-            LOG.debug(
-                "RecoveryCompleted - Switching actor to Follower - " +
-                    "Persistence Id =  " + persistenceId() +
-                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                    "journal-size={}",
-                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                replicatedLog.snapshotTerm, replicatedLog.size());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "RecoveryCompleted - Switching actor to Follower - " +
+                        "Persistence Id =  " + persistenceId() +
+                        " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                        "journal-size={}",
+                    replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                    replicatedLog.snapshotTerm, replicatedLog.size()
+                );
+            }
             currentBehavior = switchBehavior(RaftState.Follower);
             onStateChanged();
         }
@@ -172,9 +176,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
-            LOG.debug("Applying state for log index {} data {}",
-                applyState.getReplicatedLogEntry().getIndex(),
-                applyState.getReplicatedLogEntry().getData());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Applying state for log index {} data {}",
+                    applyState.getReplicatedLogEntry().getIndex(),
+                    applyState.getReplicatedLogEntry().getData());
+            }
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
@@ -182,9 +188,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
-            LOG.debug("ApplySnapshot called on Follower Actor " +
-                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
-                snapshot.getLastAppliedTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                    snapshot.getLastAppliedTerm()
+                );
+            }
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
@@ -236,23 +245,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.removePeer(rrp.getName());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("CaptureSnapshot received by actor");
+            LOG.info("CaptureSnapshot received by actor");
             CaptureSnapshot cs = (CaptureSnapshot)message;
             captureSnapshot = cs;
             createSnapshot();
 
         } else if (message instanceof CaptureSnapshotReply){
-            LOG.debug("CaptureSnapshotReply received by actor");
+            LOG.info("CaptureSnapshotReply received by actor");
             CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
 
             ByteString stateInBytes = csr.getSnapshot();
-            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
             handleCaptureSnapshotReply(stateInBytes);
 
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
-                LOG.debug("onReceiveCommand: message:" + message.getClass());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                }
             }
 
             RaftState state =
@@ -293,7 +304,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
-        LOG.debug("Persist data {}", replicatedLogEntry);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Persist data {}", replicatedLogEntry);
+        }
 
         replicatedLog
             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
@@ -482,8 +495,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-            + peerAddress);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+                + peerAddress);
+        }
 
         return peerAddress;
     }
@@ -583,10 +598,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
 
-                            LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                            LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
-                            LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                            LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
+                                LOG.debug("Snapshot Capture lastApplied:{} ",
+                                    context.getLastApplied());
+                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                            }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
                             getSelf().tell(new CaptureSnapshot(
@@ -638,8 +656,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public void update(long currentTerm, String votedFor) {
-            LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+            }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;
         }
index 7e896fed29c4889f6aec5ce39436a1970a50e03b..35d563b784cf3f4705784a78952249ce06badbea 100644 (file)
@@ -272,6 +272,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return null;
     }
 
+    /**
+     * Find the client request tracker for a specific logIndex
+     *
+     * @param logIndex
+     * @return
+     */
+    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+        return null;
+    }
+
+
     /**
      * Find the log index from the previous to last entry in the log
      *
@@ -311,7 +322,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
              i < index + 1; i++) {
             ActorRef clientActor = null;
             String identifier = null;
-            ClientRequestTracker tracker = findClientRequestTracker(i);
+            ClientRequestTracker tracker = removeClientRequestTracker(i);
 
             if (tracker != null) {
                 clientActor = tracker.getClientActor();
@@ -321,19 +332,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 context.getReplicatedLog().get(i);
 
             if (replicatedLogEntry != null) {
+                // Send a local message to the local RaftActor (it's derived class to be
+                // specific to apply the log to it's index)
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
                 newLastApplied = i;
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                context.getLogger().error(
+                context.getLogger().warning(
                     "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
                 break;
             }
         }
-        // Send a local message to the local RaftActor (it's derived class to be
-        // specific to apply the log to it's index)
         context.getLogger().debug("Setting last applied to {}", newLastApplied);
         context.setLastApplied(newLastApplied);
     }
index 610fdc987fde7a1a51491ef25dc2f764011b9eda..1cfdf9dba8b912a5bc23f78b85c447621298d908 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.event.LoggingAdapter;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -38,9 +39,13 @@ import java.util.ArrayList;
 public class Follower extends AbstractRaftActorBehavior {
     private ByteString snapshotChunksCollected = ByteString.EMPTY;
 
+    private final LoggingAdapter LOG;
+
     public Follower(RaftActorContext context) {
         super(context);
 
+        LOG = context.getLogger();
+
         scheduleElection(electionDuration());
     }
 
@@ -48,8 +53,9 @@ public class Follower extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
-            context.getLogger()
-                .debug(appendEntries.toString());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(appendEntries.toString());
+            }
         }
 
         // TODO : Refactor this method into a bunch of smaller methods
@@ -79,9 +85,10 @@ public class Follower extends AbstractRaftActorBehavior {
             // an entry at prevLogIndex and this follower has no entries in
             // it's log.
 
-            context.getLogger().debug(
-                "The followers log is empty and the senders prevLogIndex is {}",
-                appendEntries.getPrevLogIndex());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
+                    appendEntries.getPrevLogIndex());
+            }
 
         } else if (lastIndex() > -1
             && appendEntries.getPrevLogIndex() != -1
@@ -90,9 +97,10 @@ public class Follower extends AbstractRaftActorBehavior {
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
-            context.getLogger().debug(
-                "The log is not empty but the prevLogIndex {} was not found in it",
-                appendEntries.getPrevLogIndex());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
+                    appendEntries.getPrevLogIndex());
+            }
 
         } else if (lastIndex() > -1
             && previousEntry != null
@@ -102,10 +110,12 @@ public class Follower extends AbstractRaftActorBehavior {
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            context.getLogger().debug(
-                "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                , previousEntry.getTerm()
-                , appendEntries.getPrevLogTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                    , previousEntry.getTerm()
+                    , appendEntries.getPrevLogTerm());
+            }
         } else {
             outOfSync = false;
         }
@@ -113,9 +123,12 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
-            context.getLogger().debug("Follower is out-of-sync, " +
-                "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                lastIndex(), lastTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Follower is out-of-sync, " +
+                        "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                    lastIndex(), lastTerm()
+                );
+            }
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -125,10 +138,12 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
-            context.getLogger().debug(
-                "Number of entries to be appended = " + appendEntries
-                    .getEntries().size()
-            );
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Number of entries to be appended = " + appendEntries
+                        .getEntries().size()
+                );
+            }
 
             // 3. If an existing entry conflicts with a new one (same index
             // but different terms), delete the existing entry and all that
@@ -151,10 +166,12 @@ public class Follower extends AbstractRaftActorBehavior {
                         continue;
                     }
 
-                    context.getLogger().debug(
-                        "Removing entries from log starting at "
-                            + matchEntry.getIndex()
-                    );
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug(
+                            "Removing entries from log starting at "
+                                + matchEntry.getIndex()
+                        );
+                    }
 
                     // Entries do not match so remove all subsequent entries
                     context.getReplicatedLog()
@@ -163,10 +180,12 @@ public class Follower extends AbstractRaftActorBehavior {
                 }
             }
 
-            context.getLogger().debug(
-                "After cleanup entries to be added from = " + (addEntriesFrom
-                    + lastIndex())
-            );
+            if(LOG.isDebugEnabled()) {
+                context.getLogger().debug(
+                    "After cleanup entries to be added from = " + (addEntriesFrom
+                        + lastIndex())
+                );
+            }
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
@@ -181,8 +200,9 @@ public class Follower extends AbstractRaftActorBehavior {
                     .appendAndPersist(appendEntries.getEntries().get(i));
             }
 
-            context.getLogger().debug(
-                "Log size is now " + context.getReplicatedLog().size());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Log size is now " + context.getReplicatedLog().size());
+            }
         }
 
 
@@ -195,8 +215,9 @@ public class Follower extends AbstractRaftActorBehavior {
             context.getReplicatedLog().lastIndex()));
 
         if (prevCommitIndex != context.getCommitIndex()) {
-            context.getLogger()
-                .debug("Commit index set to " + context.getCommitIndex());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Commit index set to " + context.getCommitIndex());
+            }
         }
 
         // If commitIndex > lastApplied: increment lastApplied, apply
@@ -204,10 +225,14 @@ public class Follower extends AbstractRaftActorBehavior {
         // check if there are any entries to be applied. last-applied can be equal to last-index
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
             context.getLastApplied() < lastIndex()) {
-            context.getLogger().debug("applyLogToStateMachine, " +
-                "appendEntries.getLeaderCommit():{}," +
-                "context.getLastApplied():{}, lastIndex():{}",
-                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("applyLogToStateMachine, " +
+                        "appendEntries.getLeaderCommit():{}," +
+                        "context.getLastApplied():{}, lastIndex():{}",
+                    appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+                );
+            }
+
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -259,9 +284,13 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
-        context.getLogger().debug("InstallSnapshot received by follower " +
-            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
-            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("InstallSnapshot received by follower " +
+                    "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+                installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+            );
+        }
 
         try {
             if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
@@ -283,8 +312,11 @@ public class Follower extends AbstractRaftActorBehavior {
             } else {
                 // we have more to go
                 snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
-                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
+                        installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+                }
             }
 
             sender.tell(new InstallSnapshotReply(
index 90948ffef7d8a5e1341bb8aede6b03ccf8dae344..199d2d61cf5bbbba34ad8cfa62709228331d2b0f 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -80,9 +81,13 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final int minReplicationCount;
 
+    private final LoggingAdapter LOG;
+
     public Leader(RaftActorContext context) {
         super(context);
 
+        LOG = context.getLogger();
+
         if (lastIndex() >= 0) {
             context.setCommitIndex(lastIndex());
         }
@@ -98,7 +103,9 @@ public class Leader extends AbstractRaftActorBehavior {
             followerToLog.put(followerId, followerLogInformation);
         }
 
-        context.getLogger().debug("Election:Leader has following peers:"+ followers);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Leader has following peers:" + followers);
+        }
 
         if (followers.size() > 0) {
             minReplicationCount = (followers.size() + 1) / 2 + 1;
@@ -123,7 +130,9 @@ public class Leader extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        context.getLogger().debug(appendEntries.toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
 
         return state();
     }
@@ -132,8 +141,9 @@ public class Leader extends AbstractRaftActorBehavior {
         AppendEntriesReply appendEntriesReply) {
 
         if(! appendEntriesReply.isSuccess()) {
-            context.getLogger()
-                .debug(appendEntriesReply.toString());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(appendEntriesReply.toString());
+            }
         }
 
         // Update the FollowerLogInformation
@@ -142,7 +152,7 @@ public class Leader extends AbstractRaftActorBehavior {
             followerToLog.get(followerId);
 
         if(followerLogInformation == null){
-            context.getLogger().error("Unknown follower {}", followerId);
+            LOG.error("Unknown follower {}", followerId);
             return state();
         }
 
@@ -196,6 +206,16 @@ public class Leader extends AbstractRaftActorBehavior {
         return state();
     }
 
+    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
+        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+        if(toRemove != null) {
+            trackerList.remove(toRemove);
+        }
+
+        return toRemove;
+    }
+
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -260,10 +280,13 @@ public class Leader extends AbstractRaftActorBehavior {
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
-                    context.getLogger().debug("InstallSnapshotReply received, " +
-                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                        reply.getChunkIndex(), followerId,
-                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("InstallSnapshotReply received, " +
+                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                            reply.getChunkIndex(), followerId,
+                            context.getReplicatedLog().getSnapshotIndex() + 1
+                        );
+                    }
 
                     FollowerLogInformation followerLogInformation =
                         followerToLog.get(followerId);
@@ -272,31 +295,38 @@ public class Leader extends AbstractRaftActorBehavior {
                     followerLogInformation.setNextIndex(
                         context.getReplicatedLog().getSnapshotIndex() + 1);
                     mapFollowerToSnapshot.remove(followerId);
-                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                        followerToLog.get(followerId).getNextIndex().get());
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                            followerToLog.get(followerId).getNextIndex().get());
+                    }
 
                 } else {
                     followerToSnapshot.markSendStatus(true);
                 }
             } else {
-                context.getLogger().info("InstallSnapshotReply received, " +
-                    "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex());
+                LOG.info("InstallSnapshotReply received, " +
+                        "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex()
+                );
                 followerToSnapshot.markSendStatus(false);
             }
 
         } else {
-            context.getLogger().error("ERROR!!" +
-                "FollowerId in InstallSnapshotReply not known to Leader" +
-                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+            LOG.error("ERROR!!" +
+                    "FollowerId in InstallSnapshotReply not known to Leader" +
+                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+            );
         }
     }
 
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        context.getLogger().debug("Replicate message " + logIndex);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Replicate message " + logIndex);
+        }
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -350,10 +380,13 @@ public class Leader extends AbstractRaftActorBehavior {
                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
                             // if the follower is just not starting and leader's index
                             // is more than followers index
-                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
-                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("SendInstallSnapshot to follower:{}," +
+                                        "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                        "leader-last-index:{}", followerId,
+                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                                );
+                            }
 
                             actor().tell(new SendInstallSnapshot(), actor());
                         } else {
@@ -412,11 +445,11 @@ public class Leader extends AbstractRaftActorBehavior {
                 ).toSerializable(),
                 actor()
             );
-            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+            LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
         } catch (IOException e) {
-            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+            LOG.error("InstallSnapshot failed for Leader.", e);
         }
     }
 
@@ -431,7 +464,9 @@ public class Leader extends AbstractRaftActorBehavior {
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
-        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+        }
 
         return nextChunk;
     }
@@ -526,8 +561,10 @@ public class Leader extends AbstractRaftActorBehavior {
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
-            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
-                size, totalChunks);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+                    size, totalChunks);
+            }
         }
 
         public ByteString getSnapshotBytes() {
@@ -591,8 +628,10 @@ public class Leader extends AbstractRaftActorBehavior {
                 }
             }
 
-            context.getLogger().debug("length={}, offset={},size={}",
-                snapshotLength, start, size);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("length={}, offset={},size={}",
+                    snapshotLength, start, size);
+            }
             return getSnapshotBytes().substring(start, start + size);
 
         }
index 9d40fa3d9edb3858969797e929776ddcba424333..c084cba82210823ada7f79a0edb35472ec6ed326 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 public class InstallSnapshot extends AbstractRaftRPC {
 
index 73c9f96b82a0a582f4cf5e61b5d68c488f9bc198..c4ef51d968422533f9df668bb23fd56563dc2ad2 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapsho
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
index ef56d02a2efc245e8f7cb35f01bac9c1a8eba3d6..cf37cbdd005effaacb2294edd9308b8598e9788f 100644 (file)
@@ -17,7 +17,9 @@ public abstract class AbstractUntypedActor extends UntypedActor {
         Logging.getLogger(getContext().system(), this);
 
     public AbstractUntypedActor() {
-        LOG.debug("Actor created {}", getSelf());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Actor created {}", getSelf());
+        }
         getContext().
             system().
             actorSelection("user/termination-monitor").
@@ -27,11 +29,13 @@ public abstract class AbstractUntypedActor extends UntypedActor {
 
     @Override public void onReceive(Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
-        LOG.debug("Received message {}", messageType);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {}", messageType);
+        }
         handleReceive(message);
-
-        LOG.debug("Done handling message {}", messageType);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Done handling message {}", messageType);
+        }
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
@@ -41,7 +45,9 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     protected void unknownMessage(Object message) throws Exception {
-        LOG.debug("Received unhandled message {}", message);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received unhandled message {}", message);
+        }
         unhandled(message);
     }
 }
@@ -1,7 +1,7 @@
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: InstallSnapshot.proto
 
-package org.opendaylight.controller.cluster.raft.protobuff.messages;
+package org.opendaylight.controller.protobuff.messages.cluster.raft;
 
 public final class InstallSnapshotMessages {
   private InstallSnapshotMessages() {}
@@ -186,14 +186,14 @@ public final class InstallSnapshotMessages {
     }
     public static final com.google.protobuf.Descriptors.Descriptor
         getDescriptor() {
-      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
     }
 
     protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
-      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+      return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
           .ensureFieldAccessorsInitialized(
-              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+              org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
     }
 
     public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
@@ -245,7 +245,7 @@ public final class InstallSnapshotMessages {
       if (ref instanceof java.lang.String) {
         return (java.lang.String) ref;
       } else {
-        com.google.protobuf.ByteString bs = 
+        com.google.protobuf.ByteString bs =
             (com.google.protobuf.ByteString) ref;
         java.lang.String s = bs.toStringUtf8();
         if (bs.isValidUtf8()) {
@@ -261,7 +261,7 @@ public final class InstallSnapshotMessages {
         getLeaderIdBytes() {
       java.lang.Object ref = leaderId_;
       if (ref instanceof java.lang.String) {
-        com.google.protobuf.ByteString b = 
+        com.google.protobuf.ByteString b =
             com.google.protobuf.ByteString.copyFromUtf8(
                 (java.lang.String) ref);
         leaderId_ = b;
@@ -442,53 +442,53 @@ public final class InstallSnapshotMessages {
       return super.writeReplace();
     }
 
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         com.google.protobuf.ByteString data)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         com.google.protobuf.ByteString data,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data, extensionRegistry);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         byte[] data,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       return PARSER.parseFrom(data, extensionRegistry);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
         throws java.io.IOException {
       return PARSER.parseFrom(input);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         java.io.InputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
       return PARSER.parseFrom(input, extensionRegistry);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
         throws java.io.IOException {
       return PARSER.parseDelimitedFrom(input);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
         java.io.InputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
       return PARSER.parseDelimitedFrom(input, extensionRegistry);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         com.google.protobuf.CodedInputStream input)
         throws java.io.IOException {
       return PARSER.parseFrom(input);
     }
-    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+    public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
         com.google.protobuf.CodedInputStream input,
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws java.io.IOException {
@@ -497,7 +497,7 @@ public final class InstallSnapshotMessages {
 
     public static Builder newBuilder() { return Builder.create(); }
     public Builder newBuilderForType() { return newBuilder(); }
-    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+    public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot prototype) {
       return newBuilder().mergeFrom(prototype);
     }
     public Builder toBuilder() { return newBuilder(this); }
@@ -513,20 +513,20 @@ public final class InstallSnapshotMessages {
      */
     public static final class Builder extends
         com.google.protobuf.GeneratedMessage.Builder<Builder>
-       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+       implements org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshotOrBuilder {
       public static final com.google.protobuf.Descriptors.Descriptor
           getDescriptor() {
-        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+        return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
       }
 
       protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
           internalGetFieldAccessorTable() {
-        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+        return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
             .ensureFieldAccessorsInitialized(
-                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+                org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
       }
 
-      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      // Construct using org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.newBuilder()
       private Builder() {
         maybeForceBuilderInitialization();
       }
@@ -569,23 +569,23 @@ public final class InstallSnapshotMessages {
 
       public com.google.protobuf.Descriptors.Descriptor
           getDescriptorForType() {
-        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+        return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
       }
 
-      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
-        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
       }
 
-      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
-        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+      public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
         if (!result.isInitialized()) {
           throw newUninitializedMessageException(result);
         }
         return result;
       }
 
-      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
-        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+      public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot(this);
         int from_bitField0_ = bitField0_;
         int to_bitField0_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
@@ -622,16 +622,16 @@ public final class InstallSnapshotMessages {
       }
 
       public Builder mergeFrom(com.google.protobuf.Message other) {
-        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
-          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        if (other instanceof org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot)other);
         } else {
           super.mergeFrom(other);
           return this;
         }
       }
 
-      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
-        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+      public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
         if (other.hasTerm()) {
           setTerm(other.getTerm());
         }
@@ -667,11 +667,11 @@ public final class InstallSnapshotMessages {
           com.google.protobuf.CodedInputStream input,
           com.google.protobuf.ExtensionRegistryLite extensionRegistry)
           throws java.io.IOException {
-        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
         try {
           parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          parsedMessage = (org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
           throw e;
         } finally {
           if (parsedMessage != null) {
@@ -744,7 +744,7 @@ public final class InstallSnapshotMessages {
           getLeaderIdBytes() {
         java.lang.Object ref = leaderId_;
         if (ref instanceof String) {
-          com.google.protobuf.ByteString b = 
+          com.google.protobuf.ByteString b =
               com.google.protobuf.ByteString.copyFromUtf8(
                   (java.lang.String) ref);
           leaderId_ = b;
@@ -988,8 +988,8 @@ public final class InstallSnapshotMessages {
       "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
       "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
       " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
-      "light.controller.cluster.raft.protobuff." +
-      "messagesB\027InstallSnapshotMessagesH\001"
+      "light.controller.protobuff.messages.clus" +
+      "ter.raftB\027InstallSnapshotMessagesH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
index c9d5e89ae1079c51249acaaeaf61fe27e2b7577c..0f93f43c566639641c32a3a1472d0c84ae8e513f 100644 (file)
@@ -100,7 +100,9 @@ public class XmlStreamUtils {
     for (Entry<URI, String> e: prefixes.getPrefixes()) {
       writer.writeNamespace(e.getValue(), e.getKey().toString());
     }
-    LOG.debug("Instance identifier with Random prefix is now {}", str);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Instance identifier with Random prefix is now {}", str);
+    }
     writer.writeCharacters(str);
   }
 
@@ -169,7 +171,7 @@ public class XmlStreamUtils {
         DataSchemaNode childSchema = null;
         if (schema instanceof DataNodeContainer) {
           childSchema = SchemaUtils.findFirstSchema(child.getNodeType(), ((DataNodeContainer) schema).getChildNodes()).orNull();
-          if (childSchema == null) {
+          if (childSchema == null && LOG.isDebugEnabled()) {
             LOG.debug("Probably the data node \"{}\" does not conform to schema", child == null ? "" : child.getNodeType().getLocalName());
           }
         }
@@ -192,7 +194,9 @@ public class XmlStreamUtils {
    */
   public void writeValue(final @Nonnull XMLStreamWriter writer, final @Nonnull TypeDefinition<?> type, final Object value) throws XMLStreamException {
     if (value == null) {
-      LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+      if(LOG.isDebugEnabled()){
+        LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+      }
       return;
     }
 
@@ -232,18 +236,24 @@ public class XmlStreamUtils {
       writer.writeNamespace(prefix, qname.getNamespace().toString());
       writer.writeCharacters(prefix + ':' + qname.getLocalName());
     } else {
-      LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+      }
       writer.writeCharacters(String.valueOf(value));
     }
   }
 
   private static void write(final @Nonnull XMLStreamWriter writer, final @Nonnull InstanceIdentifierTypeDefinition type, final @Nonnull Object value) throws XMLStreamException {
     if (value instanceof YangInstanceIdentifier) {
-      LOG.debug("Writing InstanceIdentifier object {}", value);
+      if(LOG.isDebugEnabled()) {
+          LOG.debug("Writing InstanceIdentifier object {}", value);
+      }
       write(writer, (YangInstanceIdentifier)value);
     } else {
-      LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
-      writer.writeCharacters(String.valueOf(value));
+      if(LOG.isDebugEnabled()) {
+          LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+      }
+        writer.writeCharacters(String.valueOf(value));
     }
   }
 }
index ea8f4a3ef19810a6d95ebc4211b0f4569b6e2716..d0cc2adb5f06e1a61859e57b6b086857e36d3f22 100644 (file)
@@ -74,7 +74,9 @@ public class XmlUtils {
    * @return xml String
    */
   public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
-    LOG.debug("Converting input composite node to xml {}", cNode);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Converting input composite node to xml {}", cNode);
+    }
     if (cNode == null) {
         return BLANK;
     }
@@ -88,12 +90,14 @@ public class XmlUtils {
       Set<RpcDefinition> rpcs =  schemaContext.getOperations();
       for(RpcDefinition rpc : rpcs) {
         if(rpc.getQName().equals(cNode.getNodeType())){
-          LOG.debug("Found the rpc definition from schema context matching with input composite node  {}", rpc.getQName());
-
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("Found the rpc definition from schema context matching with input composite node  {}", rpc.getQName());
+          }
           CompositeNode inputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "input"));
           domTree = XmlDocumentUtils.toDocument(inputContainer, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
-
-          LOG.debug("input composite node to document conversion complete, document is   {}", domTree);
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("input composite node to document conversion complete, document is   {}", domTree);
+          }
           break;
         }
       }
@@ -111,7 +115,9 @@ public class XmlUtils {
    * @return xml string
    */
   public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
-    LOG.debug("Converting output composite node to xml {}", cNode);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Converting output composite node to xml {}", cNode);
+    }
     if (cNode == null) {
         return BLANK;
     }
@@ -125,12 +131,14 @@ public class XmlUtils {
       Set<RpcDefinition> rpcs =  schemaContext.getOperations();
       for(RpcDefinition rpc : rpcs) {
         if(rpc.getQName().equals(cNode.getNodeType())){
-          LOG.debug("Found the rpc definition from schema context matching with output composite node  {}", rpc.getQName());
-
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("Found the rpc definition from schema context matching with output composite node  {}", rpc.getQName());
+          }
           CompositeNode outputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "output"));
           domTree = XmlDocumentUtils.toDocument(outputContainer, rpc.getOutput(), XmlDocumentUtils.defaultValueCodecProvider());
-
-          LOG.debug("output composite node to document conversion complete, document is   {}", domTree);
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("output composite node to document conversion complete, document is   {}", domTree);
+          }
           break;
         }
       }
@@ -152,8 +160,9 @@ public class XmlUtils {
 
       LOG.error("Error during translation of Document to OutputStream", e);
     }
-    LOG.debug("Document to string conversion complete, xml string is  {} ",  writer.toString());
-
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Document to string conversion complete, xml string is  {} ", writer.toString());
+    }
     return writer.toString();
   }
 
@@ -188,7 +197,9 @@ public class XmlUtils {
    * @return CompositeNode object based on the input, if any of the input parameter is null, a null object is returned
    */
   public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml,  SchemaContext schemaContext){
-    LOG.debug("Converting input xml to composite node {}", xml);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Converting input xml to composite node {}", xml);
+    }
     if (xml==null || xml.length()==0) {
         return null;
     }
@@ -208,8 +219,9 @@ public class XmlUtils {
       Set<RpcDefinition> rpcs =  schemaContext.getOperations();
       for(RpcDefinition rpcDef : rpcs) {
         if(rpcDef.getQName().equals(rpc)){
-          LOG.debug("found the rpc definition from schema context matching rpc  {}", rpc);
-
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("found the rpc definition from schema context matching rpc  {}", rpc);
+          }
           if(rpcDef.getInput() == null) {
             LOG.warn("found rpc definition's input is null");
             return null;
@@ -225,9 +237,9 @@ public class XmlUtils {
 
           List<Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
               Optional.of(rpcDef.getInput().getChildNodes()), schemaContext);
-
-          LOG.debug("Converted xml input to list of nodes  {}", dataNodes);
-
+          if(LOG.isDebugEnabled()) {
+              LOG.debug("Converted xml input to list of nodes  {}", dataNodes);
+          }
           final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
           it.setQName(rpc);
           it.add(ImmutableCompositeNode.create(input, dataNodes));
@@ -240,8 +252,9 @@ public class XmlUtils {
     } catch (IOException e) {
       LOG.error("Error during building data tree from XML", e);
     }
-
-    LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+    }
     return compositeNode;
   }
 
similarity index 82%
rename from opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
rename to opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto
index 14f821b5e20f1fd44f1f436cf76ce08e39e1ccf0..4198644b13952f8e05c5f9f4a0db75316594c7ba 100644 (file)
@@ -1,6 +1,6 @@
 package org.opendaylight.controller.cluster.raft;
 
-option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_package = "org.opendaylight.controller.protobuff.messages.cluster.raft";
 option java_outer_classname = "InstallSnapshotMessages";
 option optimize_for = SPEED;
 
index b67855d7312c697b2a7a0c1049a3b628701f4e08..58677103c2df020fd7ac3fdaaa38353a59a328a4 100644 (file)
@@ -16,6 +16,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import javax.annotation.Nullable;
 import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
 import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics
@@ -25,7 +27,7 @@ import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
  */
 public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
                                            implements ThreadExecutorStatsMXBean {
-
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadExecutorStatsMXBeanImpl.class);
     private final ThreadPoolExecutor executor;
 
     /**
@@ -36,14 +38,31 @@ public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
      * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
      * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
      */
-    public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName,
-            String mBeanType, @Nullable String mBeanCategory) {
+    public ThreadExecutorStatsMXBeanImpl(final ThreadPoolExecutor executor, final String mBeanName,
+            final String mBeanType, @Nullable final String mBeanCategory) {
         super(mBeanName, mBeanType, mBeanCategory);
+        this.executor = Preconditions.checkNotNull(executor);
+    }
+
+    /**
+     * Create a new bean for the statistics, which is already registered.
+     *
+     * @param executor
+     * @param mBeanName
+     * @param mBeanType
+     * @param mBeanCategory
+     * @return
+     */
+    public static ThreadExecutorStatsMXBeanImpl create(final Executor executor, final String mBeanName,
+            final String mBeanType, @Nullable final String mBeanCategory) {
+        if (executor instanceof ThreadPoolExecutor) {
+            final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl((ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory);
+            ret.registerMBean();
+            return ret;
+        }
 
-        Preconditions.checkArgument(executor instanceof ThreadPoolExecutor,
-                "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor",
-                executor.getClass());
-        this.executor = (ThreadPoolExecutor)executor;
+        LOG.info("Executor {} is not supported", executor);
+        return null;
     }
 
     @Override
index bf541d95deadeb3e9ecce59c83cdb988934289a5..c780881a2ffad1ed50695b7a38111068ec2f8e3f 100644 (file)
@@ -76,9 +76,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         Preconditions.checkNotNull(path, "path should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
-
-        LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+        }
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
             DataChangeListener.props(listener ));
 
@@ -108,11 +108,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
             }, actorContext.getActorSystem().dispatcher());
             return listenerRegistrationProxy;
         }
-
-        LOG.debug(
-            "No local shard for shardName {} was found so returning a noop registration",
-            shardName);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(
+                "No local shard for shardName {} was found so returning a noop registration",
+                shardName);
+        }
         return new NoOpDataChangeListenerRegistration(listener);
     }
 
index 713996b13b295725802832bb2b6f8b7e69aa8f8f..0fa27706e19382c0b84cc44b93d890ee9f0d1c8e 100644 (file)
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
@@ -172,8 +171,11 @@ public class Shard extends RaftActor {
     }
 
     @Override public void onReceiveRecover(Object message) {
-        LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
-            getSender());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("onReceiveRecover: Received message {} from {}",
+                message.getClass().toString(),
+                getSender());
+        }
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
@@ -183,8 +185,11 @@ public class Shard extends RaftActor {
     }
 
     @Override public void onReceiveCommand(Object message) {
-        LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
-            getSender());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("onReceiveCommand: Received message {} from {}",
+                message.getClass().toString(),
+                getSender());
+        }
 
         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
             // This must be for install snapshot. Don't want to open this up and trigger
@@ -193,6 +198,7 @@ public class Shard extends RaftActor {
                 .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
                     self());
 
+            createSnapshotTransaction = null;
             // Send a PoisonPill instead of sending close transaction because we do not really need
             // a response
             getSender().tell(PoisonPill.getInstance(), self());
@@ -298,7 +304,9 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
-        LOG.debug("Creating transaction : {} ", transactionId);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Creating transaction : {} ", transactionId);
+        }
         ActorRef transactionActor =
             createTypedTransactionActor(transactionType, transactionId, transactionChainId);
 
@@ -325,13 +333,19 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-            LOG.debug(
-                "Could not find cohort for modification : {}. Writing modification using a new transaction",
-                modification);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Could not find cohort for modification : {}. Writing modification using a new transaction",
+                    modification);
+            }
+
             DOMStoreWriteTransaction transaction =
                 store.newWriteOnlyTransaction();
 
-            LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+            }
 
             modification.apply(transaction);
             try {
@@ -352,13 +366,12 @@ public class Shard extends RaftActor {
             return;
         }
 
-        final ListenableFuture<Void> future = cohort.commit();
-        final ActorRef self = getSelf();
+        ListenableFuture<Void> future = cohort.commit();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-                sender.tell(new CommitTransactionReply().toSerializable(), self);
+                sender.tell(new CommitTransactionReply().toSerializable(), getSelf());
                 shardMBean.incrementCommittedTransactionCount();
                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
             }
@@ -367,7 +380,7 @@ public class Shard extends RaftActor {
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during commit");
                 shardMBean.incrementFailedTransactionsCount();
-                sender.tell(new akka.actor.Status.Failure(t), self);
+                sender.tell(new akka.actor.Status.Failure(t), getSelf());
             }
         });
 
@@ -401,8 +414,10 @@ public class Shard extends RaftActor {
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
-        LOG.debug("registerDataChangeListener for {}", registerChangeListener
-            .getPath());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("registerDataChangeListener for {}", registerChangeListener
+                .getPath());
+        }
 
 
         ActorSelection dataChangeListenerPath = getContext()
@@ -430,23 +445,17 @@ public class Shard extends RaftActor {
             getContext().actorOf(
                 DataChangeListenerRegistration.props(registration));
 
-        LOG.debug(
-            "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-            , listenerRegistration.path().toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(
+                "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+                , listenerRegistration.path().toString());
+        }
 
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                 getSelf());
     }
 
-    private void createTransactionChain() {
-        DOMStoreTransactionChain chain = store.createTransactionChain();
-        ActorRef transactionChain = getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
-        getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-                getSelf());
-    }
-
     private boolean isMetricsCaptureEnabled(){
         CommonConfig config = new CommonConfig(getContext().system().settings().config());
         return config.isMetricCaptureEnabled();
@@ -503,6 +512,8 @@ public class Shard extends RaftActor {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
+
+        LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
@@ -517,6 +528,8 @@ public class Shard extends RaftActor {
             syncCommitTransaction(transaction);
         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
+        } finally {
+            LOG.info("Done applying snapshot");
         }
     }
 
@@ -526,14 +539,17 @@ public class Shard extends RaftActor {
                 .tell(new EnableNotification(isLeader()), getSelf());
         }
 
-
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
 
         // If this actor is no longer the leader close all the transaction chains
         if(!isLeader()){
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
-                LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+                        entry.getKey(), getId());
+                }
                 entry.getValue().close();
             }
 
@@ -542,10 +558,6 @@ public class Shard extends RaftActor {
     }
 
     @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
-        if((oldLeader == null && newLeader == null) || (newLeader != null && newLeader.equals(oldLeader)) ){
-            return;
-        }
-        LOG.info("Current state = {}, Leader = {}", getRaftState().name(), newLeader);
         shardMBean.setLeader(newLeader);
     }
 
index 13ecaa5619614e133b8afe5124acc25a9bacfc84..a97c00f1d88227fb9d01c90ce38a80b8ccbb1e50 100644 (file)
@@ -337,11 +337,11 @@ public class ShardManager extends AbstractUntypedActorWithMetering {
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-
-                LOG.debug(
-                    "Sending PeerAddressResolved for peer {} with address {} to {}",
-                    peerId, peerAddress, actor.path());
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "Sending PeerAddressResolved for peer {} with address {} to {}",
+                        peerId, peerAddress, actor.path());
+                }
                 actor
                     .tell(new PeerAddressResolved(peerId, peerAddress),
                         getSelf());
index b810ed9575a7af8d1f85a7b337639e2f3dfc72ca..f5ca6e3c5aa2334eb26c52408dd7279f08e33d3a 100644 (file)
@@ -105,7 +105,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
             getSender().tell(new GetCompositeModificationReply(
                     new ImmutableCompositeModification(modification)), getSelf());
         } else if (message instanceof ReceiveTimeout) {
-            LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+            }
             closeTransaction(false);
         } else {
             throw new UnknownMessageException(message);
@@ -163,8 +165,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
         modification.addModification(
                 new WriteModification(message.getPath(), message.getData(),schemaContext));
-        LOG.debug("writeData at path : " + message.getPath().toString());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("writeData at path : " + message.getPath().toString());
+        }
         try {
             transaction.write(message.getPath(), message.getData());
             getSender().tell(new WriteDataReply().toSerializable(), getSelf());
@@ -176,7 +179,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
         modification.addModification(
                 new MergeModification(message.getPath(), message.getData(), schemaContext));
-        LOG.debug("mergeData at path : " + message.getPath().toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("mergeData at path : " + message.getPath().toString());
+        }
         try {
             transaction.merge(message.getPath(), message.getData());
             getSender().tell(new MergeDataReply().toSerializable(), getSelf());
@@ -186,7 +191,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     }
 
     protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
-        LOG.debug("deleteData at path : " + message.getPath().toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("deleteData at path : " + message.getPath().toString());
+        }
         modification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
index e6ac7f8dbc368665d9cb13d623e531533f51e04a..0c3d33a78c4801002571e2b5dc0ab02f8ddae971 100644 (file)
@@ -25,7 +25,9 @@ public class TerminationMonitor extends UntypedActor{
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof Terminated){
             Terminated terminated = (Terminated) message;
-            LOG.debug("Actor terminated : {}", terminated.actor());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Actor terminated : {}", terminated.actor());
+            }
         } else if(message instanceof Monitor){
             Monitor monitor = (Monitor) message;
             getContext().watch(monitor.getActorRef());
index e3ae5dac7b7950f3fc300a3189bc83922df24033..df85bb136a93b51084676a39cb1d7b53b54b7037 100644 (file)
@@ -101,7 +101,9 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
 
     private void commit(CommitTransaction message) {
         // Forward the commit to the shard
-        log.debug("Forward commit transaction to Shard {} ", shardActor);
+        if(log.isDebugEnabled()) {
+            log.debug("Forward commit transaction to Shard {} ", shardActor);
+        }
         shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
             getContext());
 
index a5be69531d73ede89f7bba5978a85d2e045d8989..a7a5b31b174e4e0d03db192aa367a43ebe67ad62 100644 (file)
@@ -65,9 +65,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             @Override
             public Void apply(Iterable<ActorPath> paths) {
                 cohortPaths = Lists.newArrayList(paths);
-
-                LOG.debug("Tx {} successfully built cohort path list: {}",
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} successfully built cohort path list: {}",
                         transactionId, cohortPaths);
+                }
                 return null;
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
@@ -75,8 +76,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Boolean> canCommit() {
-        LOG.debug("Tx {} canCommit", transactionId);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} canCommit", transactionId);
+        }
         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
         // The first phase of canCommit is to gather the list of cohort actor paths that will
@@ -89,7 +91,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             @Override
             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+                    }
                     returnFuture.setException(failure);
                 } else {
                     finishCanCommit(returnFuture);
@@ -101,9 +105,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
-
-        LOG.debug("Tx {} finishCanCommit", transactionId);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} finishCanCommit", transactionId);
+        }
         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
         // their canCommit processing. If any one fails then we'll fail canCommit.
 
@@ -114,7 +118,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+                    }
                     returnFuture.setException(failure);
                     return;
                 }
@@ -135,9 +141,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         return;
                     }
                 }
-
-                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+                }
                 returnFuture.set(Boolean.valueOf(result));
             }
         }, actorContext.getActorSystem().dispatcher());
@@ -146,9 +152,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private Future<Iterable<Object>> invokeCohorts(Object message) {
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
         for(ActorPath actorPath : cohortPaths) {
-
-            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+            }
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
             futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
@@ -184,8 +190,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
             final Class<?> expectedResponseClass, final boolean propagateException) {
 
-        LOG.debug("Tx {} {}", transactionId, operationName);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} {}", transactionId, operationName);
+        }
         final SettableFuture<Void> returnFuture = SettableFuture.create();
 
         // The cohort actor list should already be built at this point by the canCommit phase but,
@@ -199,9 +206,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 @Override
                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                     if(failure != null) {
-                        LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
                                 operationName, failure);
-
+                        }
                         if(propagateException) {
                             returnFuture.setException(failure);
                         } else {
@@ -221,9 +229,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private void finishVoidOperation(final String operationName, final Object message,
             final Class<?> expectedResponseClass, final boolean propagateException,
             final SettableFuture<Void> returnFuture) {
-
-        LOG.debug("Tx {} finish {}", transactionId, operationName);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} finish {}", transactionId, operationName);
+        }
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@@ -243,9 +251,10 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
-                    LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
                             operationName, exceptionToPropagate);
-
+                    }
                     if(propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
@@ -254,12 +263,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         // Since the caller doesn't want us to propagate the exception we'll also
                         // not log it normally. But it's usually not good to totally silence
                         // exceptions so we'll log it to debug level.
-                        LOG.debug(String.format("%s failed",  message.getClass().getSimpleName()),
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
                                 exceptionToPropagate);
+                        }
                         returnFuture.set(null);
                     }
                 } else {
-                    LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+                    }
                     returnFuture.set(null);
                 }
             }
index 97a9ff0bf379ef3b8f6568ed37603ed285ba35a5..6cf16b44268c6c16e26e0658632f61994ee33971 100644 (file)
@@ -224,8 +224,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 new TransactionProxyCleanupPhantomReference(this);
             phantomReferenceCache.put(cleanup, cleanup);
         }
-
-        LOG.debug("Created txn {} of type {}", identifier, transactionType);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Created txn {} of type {}", identifier, transactionType);
+        }
     }
 
     @Override
@@ -235,8 +236,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} read {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} read {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         return transactionContext(path).readData(path);
@@ -248,8 +250,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} exists {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} exists {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         return transactionContext(path).dataExists(path);
@@ -267,8 +270,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} write {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).writeData(path, data);
@@ -279,8 +283,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} merge {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} merge {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).mergeData(path, data);
@@ -290,9 +295,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public void delete(YangInstanceIdentifier path) {
 
         checkModificationState();
-
-        LOG.debug("Tx {} delete {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} delete {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).deleteData(path);
@@ -305,16 +310,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         inReadyState = true;
 
-        LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
-
+        }
         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
 
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
-
+            }
             cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
@@ -381,8 +388,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
+                }
                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
                 if (transactionType == TransactionType.READ_ONLY) {
@@ -404,7 +412,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
         } catch (Exception e) {
-            LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            }
             remoteTransactionPaths
                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
         }
@@ -489,15 +499,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         @Override
         public void closeTransaction() {
-            LOG.debug("Tx {} closeTransaction called", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} closeTransaction called", identifier);
+            }
             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
         public Future<ActorPath> readyTransaction() {
-            LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                     identifier, recordedOperationFutures.size());
-
+            }
             // Send the ReadyTransaction message to the Tx actor.
 
             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
@@ -522,10 +535,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
                 @Override
                 public ActorPath apply(Iterable<Object> notUsed) {
-
-                    LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
                             identifier);
-
+                    }
                     // At this point all the Futures succeeded and we need to extract the cohort
                     // actor path from the ReadyTransactionReply. For the recorded operations, they
                     // don't return any data so we're only interested that they completed
@@ -543,9 +556,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         String resolvedCohortPath = getResolvedCohortPath(
                                 reply.getCohortPath().toString());
 
-                        LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
                                 identifier, resolvedCohortPath);
-
+                        }
                         return actorContext.actorFor(resolvedCohortPath);
                     } else {
                         // Throwing an exception here will fail the Future.
@@ -559,21 +573,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            }
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                     new DeleteData(path).toSerializable() ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            }
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                     new MergeData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            }
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                     new WriteData(path, data, schemaContext).toSerializable()));
         }
@@ -582,8 +602,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 final YangInstanceIdentifier path) {
 
-            LOG.debug("Tx {} readData called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readData called path = {}", identifier, path);
+            }
             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
 
             // If there were any previous recorded put/merge/delete operation reply Futures then we
@@ -593,9 +614,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(recordedOperationFutures.isEmpty()) {
                 finishReadData(path, returnFuture);
             } else {
-                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} readData: verifying {} previous recorded operations",
                         identifier, recordedOperationFutures.size());
-
+                }
                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                 // Futures#sequence accesses the passed List on a different thread, as
                 // recordedOperationFutures is not synchronized.
@@ -608,9 +630,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
                             throws Throwable {
                         if(failure != null) {
-                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Tx {} readData: a recorded operation failed: {}",
                                     identifier, failure);
-
+                            }
                             returnFuture.setException(new ReadFailedException(
                                     "The read could not be performed because a previous put, merge,"
                                     + "or delete operation failed", failure));
@@ -629,20 +652,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private void finishReadData(final YangInstanceIdentifier path,
                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
 
-            LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+            }
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                     if(failure != null) {
-                        LOG.debug("Tx {} read operation failed: {}", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+                        }
                         returnFuture.setException(new ReadFailedException(
                                 "Error reading data for path " + path, failure));
 
                     } else {
-                        LOG.debug("Tx {} read operation succeeded", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} read operation succeeded", identifier, failure);
+                        }
                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
                                     path, readResponse);
@@ -669,8 +695,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 final YangInstanceIdentifier path) {
 
-            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            }
             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
             // If there were any previous recorded put/merge/delete operation reply Futures then we
@@ -681,9 +708,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(recordedOperationFutures.isEmpty()) {
                 finishDataExists(path, returnFuture);
             } else {
-                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
                         identifier, recordedOperationFutures.size());
-
+                }
                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                 // Futures#sequence accesses the passed List on a different thread, as
                 // recordedOperationFutures is not synchronized.
@@ -696,9 +724,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
                             throws Throwable {
                         if(failure != null) {
-                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
                                     identifier, failure);
-
+                            }
                             returnFuture.setException(new ReadFailedException(
                                     "The data exists could not be performed because a previous "
                                     + "put, merge, or delete operation failed", failure));
@@ -717,19 +746,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private void finishDataExists(final YangInstanceIdentifier path,
                 final SettableFuture<Boolean> returnFuture) {
 
-            LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+            }
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object response) throws Throwable {
                     if(failure != null) {
-                        LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+                        }
                         returnFuture.setException(new ReadFailedException(
                                 "Error checking data exists for path " + path, failure));
                     } else {
-                        LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+                        }
                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
                             returnFuture.set(Boolean.valueOf(DataExistsReply.
                                         fromSerializable(response).exists()));
@@ -761,34 +793,46 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         @Override
         public void closeTransaction() {
-            LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+            }
         }
 
         @Override
         public Future<ActorPath> readyTransaction() {
-            LOG.debug("Tx {} readyTransaction called", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readyTransaction called", identifier);
+            }
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.debug("Tx {} readData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readData called path = {}", identifier, path);
+            }
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -796,7 +840,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
-            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            }
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
index 0a1964b0533bfc7ead91025e5792f5edda85b844..74a91d08cf4373918f069cc3cae44b665c146a9d 100644 (file)
@@ -74,7 +74,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     }
 
     public void setDataStoreExecutor(ExecutorService dsExecutor) {
-        this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+        this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor,
                 "notification-executor", getMBeanType(), getMBeanCategory());
     }
 
@@ -82,7 +82,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
                 "notification-manager", getMBeanType(), getMBeanCategory());
 
-        this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+        this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
                 "data-store-executor", getMBeanType(), getMBeanCategory());
     }
 
index 7b5588cb196a66fa68de947fecc58137977275ae..8ba333d2799a5177c7b1b8b5b09ab6b4ec87d126 100644 (file)
@@ -125,8 +125,9 @@ public class ActorContext {
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
 
-            LOG.debug("Local shard found {}", found.getPath());
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Local shard found {}", found.getPath());
+            }
             return found.getPath();
         }
 
@@ -141,8 +142,9 @@ public class ActorContext {
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
 
-            LOG.debug("Primary found {}", found.getPrimaryPath());
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Primary found {}", found.getPrimaryPath());
+            }
             return found.getPrimaryPath();
         }
         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
@@ -175,9 +177,10 @@ public class ActorContext {
      */
     public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
-        LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
-            actor.toString());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+                actor.toString());
+        }
         Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
@@ -197,8 +200,9 @@ public class ActorContext {
      */
     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
 
-        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+        }
         return ask(actor, message, operationTimeout);
     }
 
index 4c550a768cce258e3a151139f753751281b439d6..022ef9bbafef949921ec24041357cff64013ea12 100644 (file)
@@ -25,12 +25,16 @@ public abstract class AbstractActorTest {
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
+
+        deletePersistenceFiles();
     }
 
     @AfterClass
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
+
+        deletePersistenceFiles();
     }
 
     protected static void deletePersistenceFiles() throws IOException {
index 06bcac8d786b943a0bf12087c31c0ba659c1f017..deb71c2df4aa9cc522904e4014ed66d536aa2fa4 100644 (file)
@@ -343,11 +343,16 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
                         getRef());
 
-                    waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+                        getRef());
+
+                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
                 }
             };
 
-            Thread.sleep(2000);
             deletePersistenceFiles();
         }};
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java
new file mode 100644 (file)
index 0000000..0e492f0
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.collect.Iterables;
+import scala.concurrent.Future;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InMemorySnapshotStore extends SnapshotStore {
+
+    Map<String, List<Snapshot>> snapshots = new HashMap<>();
+
+    @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+        SnapshotSelectionCriteria snapshotSelectionCriteria) {
+        List<Snapshot> snapshotList = snapshots.get(s);
+        if(snapshotList == null){
+            return Futures.successful(Option.<SelectedSnapshot>none());
+        }
+
+        Snapshot snapshot = Iterables.getLast(snapshotList);
+        SelectedSnapshot selectedSnapshot =
+            new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
+        return Futures.successful(Option.some(selectedSnapshot));
+    }
+
+    @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+        if(snapshotList == null){
+            snapshotList = new ArrayList<>();
+            snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
+        }
+        snapshotList.add(new Snapshot(snapshotMetadata, o));
+
+        return Futures.successful(null);
+    }
+
+    @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+    }
+
+    @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+        if(snapshotList == null){
+            return;
+        }
+
+        int deleteIndex = -1;
+
+        for(int i=0;i<snapshotList.size(); i++){
+            Snapshot snapshot = snapshotList.get(i);
+            if(snapshotMetadata.equals(snapshot.getMetadata())){
+                deleteIndex = i;
+                break;
+            }
+        }
+
+        if(deleteIndex != -1){
+            snapshotList.remove(deleteIndex);
+        }
+
+    }
+
+    @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+        throws Exception {
+        List<Snapshot> snapshotList = snapshots.get(s);
+
+        if(snapshotList == null){
+            return;
+        }
+
+        // TODO : This is a quick and dirty implementation. Do actual match later.
+        snapshotList.clear();
+        snapshots.remove(s);
+    }
+
+    private static class Snapshot {
+        private final SnapshotMetadata metadata;
+        private final Object data;
+
+        private Snapshot(SnapshotMetadata metadata, Object data) {
+            this.metadata = metadata;
+            this.data = data;
+        }
+
+        public SnapshotMetadata getMetadata() {
+            return metadata;
+        }
+
+        public Object getData() {
+            return data;
+        }
+    }
+}
index 794b376af8c694e3773c353dbd99668713ef9f10..f0dadc618b2b4769b0240ff1c55567b28c924fb9 100644 (file)
@@ -1,4 +1,6 @@
 akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
     actor {
@@ -14,6 +16,14 @@ akka {
         }
     }
 }
+
+in-memory-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
 bounded-mailbox {
   mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
   mailbox-capacity = 1000
index 88336526e9746facb01bf5473a6243c9d1996c81..ac62974d290e5cb37744e39b085952a57aa28543 100644 (file)
@@ -12,6 +12,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
 import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
@@ -96,22 +97,23 @@ public final class DomInmemoryDataBrokerModule extends
                 newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
         commitStatsMXBean.registerMBean();
 
-        final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean =
-                new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats",
+        final AbstractMXBean commitExecutorStatsMXBean =
+                ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
                         JMX_BEAN_TYPE, null);
-        commitExecutorStatsMXBean.registerMBean();
-
-        final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean =
-                new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor,
+        final AbstractMXBean commitFutureStatsMXBean =
+                ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor,
                         "CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
-        commitFutureStatsMXBean.registerMBean();
 
         newDataBroker.setCloseable(new AutoCloseable() {
             @Override
             public void close() {
                 commitStatsMXBean.unregisterMBean();
-                commitExecutorStatsMXBean.unregisterMBean();
-                commitFutureStatsMXBean.unregisterMBean();
+                if (commitExecutorStatsMXBean != null) {
+                    commitExecutorStatsMXBean.unregisterMBean();
+                }
+                if (commitFutureStatsMXBean != null) {
+                    commitFutureStatsMXBean.unregisterMBean();
+                }
             }
         });
 
index 96ddb9e0cea13478feba4edc40cd9394ab908d2e..d1f11ba9a36e693e223930b2fb4269e03acf21da 100644 (file)
@@ -492,13 +492,15 @@ public class XSQLAdapter extends Thread implements SchemaContextListener {
                     out.print(prompt);
                     char c = 0;
                     byte data[] = new byte[1];
-                    while (c != '\n') {
+                    while (!socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && c != '\n') {
                         try {
                             in.read(data);
                             c = (char) data[0];
                             inputString.append(c);
                         } catch (Exception err) {
                             err.printStackTrace(out);
+                            stopped = true;
+                            break;
                         }
                     }
 
index 3d61c7b6b65b3816bcc12247ace3f3b57656177e..74fa73afb92f869f7cb2e945a625d489b71e71c2 100644 (file)
@@ -14,7 +14,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -80,29 +79,26 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
     private final ListenerTree listenerTree = ListenerTree.create();
     private final AtomicLong txCounter = new AtomicLong(0);
-    private final ListeningExecutorService listeningExecutor;
 
     private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
     private final ExecutorService dataChangeListenerExecutor;
-
-    private final ExecutorService domStoreExecutor;
+    private final ListeningExecutorService commitExecutor;
     private final boolean debugTransactions;
     private final String name;
 
     private volatile AutoCloseable closeable;
 
-    public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+    public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
             final ExecutorService dataChangeListenerExecutor) {
-        this(name, domStoreExecutor, dataChangeListenerExecutor,
+        this(name, commitExecutor, dataChangeListenerExecutor,
              InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
     }
 
-    public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+    public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
             final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
             final boolean debugTransactions) {
         this.name = Preconditions.checkNotNull(name);
-        this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
-        this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
+        this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
         this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
         this.debugTransactions = debugTransactions;
 
@@ -121,7 +117,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     }
 
     public ExecutorService getDomStoreExecutor() {
-        return domStoreExecutor;
+        return commitExecutor;
     }
 
     @Override
@@ -156,7 +152,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public void close() {
-        ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+        ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
         ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
 
         if(closeable != null) {
@@ -386,7 +382,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            return listeningExecutor.submit(new Callable<Boolean>() {
+            return commitExecutor.submit(new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws TransactionCommitFailedException {
                     try {
@@ -410,7 +406,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            return listeningExecutor.submit(new Callable<Void>() {
+            return commitExecutor.submit(new Callable<Void>() {
                 @Override
                 public Void call() {
                     candidate = dataTree.prepare(modification);
index dc1482c6abaefb7880c7f6b55cc37c4d6ad65e3f..2ee8e182c255fef59d8b219fa565473e2e8f362a 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
@@ -57,7 +59,7 @@ public final class InMemoryDOMDataStoreFactory {
             @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
 
         InMemoryDOMDataStoreConfigProperties actualProperties = properties;
-        if(actualProperties == null) {
+        if (actualProperties == null) {
             actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault();
         }
 
@@ -65,21 +67,18 @@ public final class InMemoryDOMDataStoreFactory {
         // task execution time to get higher throughput as DataChangeListeners typically provide
         // much of the business logic for a data model. If the executor queue size limit is reached,
         // subsequent submitted notifications will block the calling thread.
-
         int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize();
         int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize();
 
         ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
                 dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
 
-        ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
-                actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name );
-
-        InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
-                domStoreExecutor, dataChangeListenerExecutor,
+        final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
+        final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+            commitExecutor, dataChangeListenerExecutor,
                 actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
 
-        if(schemaService != null) {
+        if (schemaService != null) {
             schemaService.registerSchemaContextListener(dataStore);
         }
 
index b3608eceef13d7006c007a90e6c715323d693221..e00be2446a5e690b1053b6373974d4f767ef0740 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
 
 import java.util.concurrent.ExecutorService;
-
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
@@ -21,24 +21,28 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
  */
 public class InMemoryDataStoreStats implements AutoCloseable {
 
-    private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
-    private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+    private final AbstractMXBean notificationExecutorStatsBean;
+    private final AbstractMXBean dataStoreExecutorStatsBean;
     private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
-    public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager<?, ?> manager,
-            ExecutorService dataStoreExecutor) {
+    public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
+            final ExecutorService dataStoreExecutor) {
 
-        this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+        notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
                 "notification-manager", mBeanType, null);
         notificationManagerStatsBean.registerMBean();
 
-        this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+        notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
                 "notification-executor", mBeanType, null);
-        this.notificationExecutorStatsBean.registerMBean();
+        if (notificationExecutorStatsBean != null) {
+            notificationExecutorStatsBean.registerMBean();
+        }
 
-        this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor,
+        dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
                 "data-store-executor", mBeanType, null);
-        this.dataStoreExecutorStatsBean.registerMBean();
+        if (dataStoreExecutorStatsBean != null) {
+            dataStoreExecutorStatsBean.registerMBean();
+        }
     }
 
     @Override
index 2e355d4f5146b13c31ee9f4a66a1bbed757d9354..c82a72eaa56c82e40388b8fb612eed7b198dbae8 100644 (file)
@@ -38,8 +38,9 @@ public class RemoteRpcProviderFactory {
                         Thread.currentThread().getContextClassLoader());
 
         Config actorSystemConfig = config.get();
-        LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+        }
         if (config.isMetricCaptureEnabled()) {
             LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
                     config.getActorSystemName());
index 98cf6a329f655f3caeebb39b989a1b761ca9a5cd..2aaac5a78ed531fc830bfca7540d2603a2e0f41b 100644 (file)
@@ -53,7 +53,9 @@ public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext,
    * @param announcements
    */
   private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
-    LOG.debug("Announcing [{}]", announcements);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Announcing [{}]", announcements);
+    }
     RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
     rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
   }
@@ -63,7 +65,9 @@ public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext,
    * @param removals
    */
   private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
-    LOG.debug("Removing [{}]", removals);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Removing [{}]", removals);
+    }
     RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
     rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
   }
index 6b02235dc7d218c967fbd1e7d1d1a087d3304a30..2046e419d9f2602b444becf6986fe2c73bb9756e 100644 (file)
@@ -79,8 +79,9 @@ public class RpcBroker extends AbstractUntypedActor {
     }
 
     private void invokeRemoteRpc(final InvokeRpc msg) {
-        LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+        }
         RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
                 null, msg.getRpc(), msg.getIdentifier());
         RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
@@ -147,8 +148,9 @@ public class RpcBroker extends AbstractUntypedActor {
     }
 
     private void executeRpc(final ExecuteRpc msg) {
-        LOG.debug("Executing rpc {}", msg.getRpc());
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Executing rpc {}", msg.getRpc());
+        }
         Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
                 XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
                         schemaContext));
index dee98521ae9f2d56893d591b30bc30a8489d89ef..22879dda2f903f6008c5a7c21a2a6447acca12bf 100644 (file)
@@ -31,7 +31,9 @@ public class RpcListener implements RpcRegistrationListener{
 
   @Override
   public void onRpcImplementationAdded(QName rpc) {
-    LOG.debug("Adding registration for [{}]", rpc);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Adding registration for [{}]", rpc);
+    }
     RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
     List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
     routeIds.add(routeId);
@@ -41,7 +43,9 @@ public class RpcListener implements RpcRegistrationListener{
 
   @Override
   public void onRpcImplementationRemoved(QName rpc) {
-    LOG.debug("Removing registration for [{}]", rpc);
+    if(LOG.isDebugEnabled()) {
+        LOG.debug("Removing registration for [{}]", rpc);
+    }
     RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
     List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
     routeIds.add(routeId);
index abe2008c2936cb1ed12b8624f072b65c33d8c518..48ccd824d41cf6b1456007012d6801d028443fce 100644 (file)
@@ -25,7 +25,9 @@ public class TerminationMonitor extends UntypedActor{
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof Terminated){
             Terminated terminated = (Terminated) message;
-            LOG.debug("Actor terminated : {}", terminated.actor());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Actor terminated : {}", terminated.actor());
+            }
         }else if(message instanceof Monitor){
           Monitor monitor = (Monitor) message;
           getContext().watch(monitor.getActorRef());
index 3de3fc00d0328215ef8f127e8272458f14326708..b50dfb1ba3e196f66e33d74438d43a2aca2d81ee 100644 (file)
@@ -111,7 +111,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
             receiveUpdateRemoteBuckets(
                     ((UpdateRemoteBuckets) message).getBuckets());
         } else {
-            log.debug("Unhandled message [{}]", message);
+            if(log.isDebugEnabled()) {
+                log.debug("Unhandled message [{}]", message);
+            }
             unhandled(message);
         }
     }
@@ -236,8 +238,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
                 versions.put(entry.getKey(), remoteVersion);
             }
         }
-
-        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        if(log.isDebugEnabled()) {
+            log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        }
     }
 
     ///
index 85c6ebe26f859e0751122d4ab60065a0f1b48aba..1bbcc69f5ed4d5fa6d7d8ea773823c97c9bb6e05 100644 (file)
@@ -170,7 +170,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
 
         clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        if(log.isDebugEnabled()) {
+            log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        }
     }
 
     /**
@@ -184,8 +186,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         if (!clusterMembers.contains(member.address()))
             clusterMembers.add(member.address());
-
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        if(log.isDebugEnabled()) {
+            log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        }
     }
 
     /**
@@ -205,8 +208,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
         }
-
-        log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        if(log.isDebugEnabled()) {
+            log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        }
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
 
@@ -244,7 +248,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     void receiveGossip(GossipEnvelope envelope){
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            if(log.isDebugEnabled()) {
+                log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            }
             return;
         }
 
@@ -291,7 +297,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        log.debug("Sending bucket versions to [{}]", remoteRef);
+        if(log.isDebugEnabled()) {
+            log.debug("Sending bucket versions to [{}]", remoteRef);
+        }
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
 
@@ -416,7 +424,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             public Void apply(Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+                    if(log.isDebugEnabled()) {
+                        log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+                    }
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }
index fe1813a19943dfe7f3446b55f9c49b1986df6729..57313d2948960b038791d75a5fdb9287cce41530 100644 (file)
       <artifactId>org.osgi.core</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index c1996f4691632637abc9fc7dffacce0bcb12f2ad..361373d78da93f114f51c887ab95c78bc6ab3265 100644 (file)
@@ -15,9 +15,9 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -50,17 +50,19 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
-    private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
     private final InstanceIdentifier<Topology> topology;
     private final OperationProcessor processor;
 
-    FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier<Topology> topology) {
+    FlowCapableTopologyExporter(final OperationProcessor processor,
+            final InstanceIdentifier<Topology> topology) {
         this.processor = Preconditions.checkNotNull(processor);
         this.topology = Preconditions.checkNotNull(topology);
     }
@@ -73,15 +75,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId);
+            public void applyOperation(ReadWriteTransaction transaction) {
+                removeAffectedLinks(nodeId, transaction);
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
             }
-        });
 
-        processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+            public String toString() {
+                return "onNodeRemoved";
             }
         });
     }
@@ -97,6 +98,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                     final InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
                     transaction.merge(LogicalDatastoreType.OPERATIONAL, path, node, true);
                 }
+
+                @Override
+                public String toString() {
+                    return "onNodeUpdated";
+                }
             });
         }
     }
@@ -104,28 +110,30 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     @Override
     public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
 
-        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
-                .getNodeConnectorRef());
+        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(
+                notification.getNodeConnectorRef());
 
-        processor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
-                removeAffectedLinks(tpId);
-            }
-        });
+        final TpId tpId = toTerminationPointId(getNodeConnectorKey(
+                notification.getNodeConnectorRef()).getId());
 
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
+                removeAffectedLinks(tpId, transaction);
                 transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
             }
+
+            @Override
+            public String toString() {
+                return "onNodeConnectorRemoved";
+            }
         });
     }
 
     @Override
     public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
-        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
+        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(
+                FlowCapableNodeConnectorUpdated.class);
         if (fcncu != null) {
             processor.enqueueOperation(new TopologyOperation() {
                 @Override
@@ -137,9 +145,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                     transaction.merge(LogicalDatastoreType.OPERATIONAL, path, point, true);
                     if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
                             || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
-                        removeAffectedLinks(point.getTpId());
+                        removeAffectedLinks(point.getTpId(), transaction);
                     }
                 }
+
+                @Override
+                public String toString() {
+                    return "onNodeConnectorUpdated";
+                }
             });
         }
     }
@@ -153,6 +166,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                 final InstanceIdentifier<Link> path = linkPath(link);
                 transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
+
+            @Override
+            public String toString() {
+                return "onLinkDiscovered";
+            }
         });
     }
 
@@ -168,6 +186,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
             }
+
+            @Override
+            public String toString() {
+                return "onLinkRemoved";
+            }
         });
     }
 
@@ -188,62 +211,92 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
     }
 
-    private void removeAffectedLinks(final NodeId id) {
-        processor.enqueueOperation(new TopologyOperation() {
+    private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
+        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-                    @Override
-                    public void onSuccess(Optional<Topology> topologyOptional) {
-                        if (topologyOptional.isPresent()) {
-                            List<Link> linkList = topologyOptional.get().getLink() != null
-                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-                            for (Link link : linkList) {
-                                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
-                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
-                                }
-                            }
-                        }
-                    }
+            public void onSuccess(Optional<Topology> topologyOptional) {
+                removeAffectedLinks(id, topologyOptional);
+            }
 
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Error reading topology data for topology {}", topology, throwable);
-                    }
-                });
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.error("Error reading topology data for topology {}", topology, throwable);
             }
         });
     }
 
-    private void removeAffectedLinks(final TpId id) {
-        processor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-                    @Override
-                    public void onSuccess(Optional<Topology> topologyOptional) {
-                        if (topologyOptional.isPresent()) {
-                            List<Link> linkList = topologyOptional.get().getLink() != null
-                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-                            for (Link link : linkList) {
-                                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
-                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
-                                }
-                            }
-                        }
-                    }
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+        if (!topologyOptional.isPresent()) {
+            return;
+        }
+
+        List<Link> linkList = topologyOptional.get().getLink() != null ?
+                topologyOptional.get().getLink() : Collections.<Link> emptyList();
+        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+        for (Link link : linkList) {
+            if (id.equals(link.getSource().getSourceNode()) ||
+                    id.equals(link.getDestination().getDestNode())) {
+                linkIDsToDelete.add(linkPath(link));
+            }
+        }
+
+        enqueueLinkDeletes(linkIDsToDelete);
+    }
 
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
+        if(!linkIDsToDelete.isEmpty()) {
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(ReadWriteTransaction transaction) {
+                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
+                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
                     }
-                });
+                }
+
+                @Override
+                public String toString() {
+                    return "Delete Links " + linkIDsToDelete.size();
+                }
+            });
+        }
+    }
+
+    private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
+        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+            @Override
+            public void onSuccess(Optional<Topology> topologyOptional) {
+                removeAffectedLinks(id, topologyOptional);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.error("Error reading topology data for topology {}", topology, throwable);
             }
         });
     }
 
+    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+        if (!topologyOptional.isPresent()) {
+            return;
+        }
+
+        List<Link> linkList = topologyOptional.get().getLink() != null
+                ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
+        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+        for (Link link : linkList) {
+            if (id.equals(link.getSource().getSourceTp()) ||
+                    id.equals(link.getDestination().getDestTp())) {
+                linkIDsToDelete.add(linkPath(link));
+            }
+        }
+
+        enqueueLinkDeletes(linkIDsToDelete);
+    }
+
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
         return topology.child(Node.class, new NodeKey(nodeId));
     }
index 1cf648eb975c521d6c81e22b927e8f276f888e99..f09da0045930cf7cc843de1a924e64841f2db508 100644 (file)
@@ -11,14 +11,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +53,9 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
             for (; ; ) {
                 TopologyOperation op = queue.take();
 
-                LOG.debug("New operations available, starting transaction");
-                final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+                LOG.debug("New {} operation available, starting transaction", op);
 
+                final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
 
                 int ops = 0;
                 do {
@@ -64,14 +67,16 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
                     } else {
                         op = null;
                     }
+
+                    LOG.debug("Next operation {}", op);
                 } while (op != null);
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                final CheckedFuture txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback() {
+                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
+                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
                     @Override
-                    public void onSuccess(Object o) {
+                    public void onSuccess(Void notUsed) {
                         LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
                     }
 
diff --git a/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java b/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java
new file mode 100644 (file)
index 0000000..b7a56a4
--- /dev/null
@@ -0,0 +1,666 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.md.controller.topology.manager;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.StateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.LinkId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Destination;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.DestinationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Source;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.SourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class FlowCapableTopologyExporterTest {
+
+    @Mock
+    private DataBroker mockDataBroker;
+
+    @Mock
+    private BindingTransactionChain mockTxChain;
+
+    private OperationProcessor processor;
+
+    private FlowCapableTopologyExporter exporter;
+
+    private InstanceIdentifier<Topology> topologyIID;
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(mockTxChain).when(mockDataBroker)
+                .createTransactionChain(any(TransactionChainListener.class));
+
+        processor = new OperationProcessor(mockDataBroker);
+
+        topologyIID = InstanceIdentifier.create(NetworkTopology.class)
+                .child(Topology.class, new TopologyKey(new TopologyId("test")));
+        exporter = new FlowCapableTopologyExporter(processor, topologyIID);
+
+        executor.execute(processor);
+    }
+
+    @After
+    public void tearDown() {
+        executor.shutdownNow();
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Test
+    public void testOnNodeRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        List<Link> linkList = Arrays.asList(
+                newLink("link1", newSourceNode("node1"), newDestNode("dest")),
+                newLink("link2", newSourceNode("source"), newDestNode("node1")),
+                newLink("link2", newSourceNode("source2"), newDestNode("dest2")));
+        final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Link.class, linkList.get(0).getKey()),
+                topologyIID.child(Link.class, linkList.get(1).getKey()),
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+            };
+
+        SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+        doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+        CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+        int expDeleteCalls = expDeletedIIDs.length;
+        CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+        waitForSubmit(submitLatch1);
+
+        setReadFutureAsync(topology, readFuture);
+
+        waitForDeletes(expDeleteCalls, deleteLatch);
+
+        waitForSubmit(submitLatch2);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+        verifyMockTx(mockTx1);
+        verifyMockTx(mockTx2);
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Test
+    public void testOnNodeRemovedWithNoTopology() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+            };
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+        waitForSubmit(submitLatch);
+
+        waitForDeletes(1, deleteLatch);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                  nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(
+                newLink("link1", newSourceTp("tp1"), newDestTp("dest")),
+                newLink("link2", newSourceTp("source"), newDestTp("tp1")),
+                newLink("link3", newSourceTp("source2"), newDestTp("dest2")));
+        final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Link.class, linkList.get(0).getKey()),
+                topologyIID.child(Link.class, linkList.get(1).getKey()),
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+                        .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+            };
+
+        final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+        doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+        CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+        int expDeleteCalls = expDeletedIIDs.length;
+        CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).build());
+
+        waitForSubmit(submitLatch1);
+
+        setReadFutureAsync(topology, readFuture);
+
+        waitForDeletes(expDeleteCalls, deleteLatch);
+
+        waitForSubmit(submitLatch2);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+        verifyMockTx(mockTx1);
+        verifyMockTx(mockTx2);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorRemovedWithNoTopology() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                  nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+                        .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+            };
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        waitForDeletes(1, deleteLatch);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+    }
+
+    @Test
+    public void testOnNodeUpdated() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                            nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeUpdated(new NodeUpdatedBuilder().setNodeRef(new NodeRef(invNodeID))
+                .setId(nodeKey.getId()).addAugmentation(FlowCapableNodeUpdated.class,
+                        new FlowCapableNodeUpdatedBuilder().build()).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<Node> mergedNode = ArgumentCaptor.forClass(Node.class);
+        NodeId expNodeId = new NodeId("node1");
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(Node.class,
+                new NodeKey(expNodeId))), mergedNode.capture(), eq(true));
+        assertEquals("getNodeId", expNodeId, mergedNode.getValue().getNodeId());
+        InventoryNode augmentation = mergedNode.getValue().getAugmentation(InventoryNode.class);
+        assertNotNull("Missing augmentation", augmentation);
+        assertEquals("getInventoryNodeRef", new NodeRef(invNodeID), augmentation.getInventoryNodeRef());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdated() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().build()).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<TerminationPoint> mergedNode = ArgumentCaptor.forClass(TerminationPoint.class);
+        NodeId expNodeId = new NodeId("node1");
+        TpId expTpId = new TpId("tp1");
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(expNodeId)).child(TerminationPoint.class,
+                        new TerminationPointKey(expTpId));
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                mergedNode.capture(), eq(true));
+        assertEquals("getTpId", expTpId, mergedNode.getValue().getTpId());
+        InventoryNodeConnector augmentation = mergedNode.getValue().getAugmentation(
+                InventoryNodeConnector.class);
+        assertNotNull("Missing augmentation", augmentation);
+        assertEquals("getInventoryNodeConnectorRef", new NodeConnectorRef(invNodeConnID),
+                augmentation.getInventoryNodeConnectorRef());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdatedWithLinkStateDown() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+        Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().setState(
+                                new StateBuilder().setLinkDown(true).build()).build()).build());
+
+        waitForDeletes(1, deleteLatch);
+
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+                        new TerminationPointKey(new TpId("tp1")));
+
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                any(TerminationPoint.class), eq(true));
+
+        assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+                linkList.get(0).getKey())}, deletedLinkIDs);
+    }
+
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdatedWithPortDown() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+        Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().setConfiguration(
+                                new PortConfig(true, true, true, true)).build()).build());
+
+        waitForDeletes(1, deleteLatch);
+
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+                        new TerminationPointKey(new TpId("tp1")));
+
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                any(TerminationPoint.class), eq(true));
+
+        assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+                linkList.get(0).getKey())}, deletedLinkIDs);
+    }
+
+    @Test
+    public void testOnLinkDiscovered() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onLinkDiscovered(new LinkDiscoveredBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                        new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+                mergedNode.capture(), eq(true));
+        assertEquals("Source node ID", "sourceNode",
+                mergedNode.getValue().getSource().getSourceNode().getValue());
+        assertEquals("Dest TP ID", "sourceTP",
+                mergedNode.getValue().getSource().getSourceTp().getValue());
+        assertEquals("Dest node ID", "destNode",
+                mergedNode.getValue().getDestination().getDestNode().getValue());
+        assertEquals("Dest TP ID", "destTP",
+                mergedNode.getValue().getDestination().getDestTp().getValue());
+    }
+
+    @Test
+    public void testOnLinkRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                        new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+    }
+
+    private void verifyMockTx(ReadWriteTransaction mockTx) {
+        InOrder inOrder = inOrder(mockTx);
+        inOrder.verify(mockTx, atLeast(0)).submit();
+        inOrder.verify(mockTx, never()).delete(eq(LogicalDatastoreType.OPERATIONAL),
+              any(InstanceIdentifier.class));
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void assertDeletedIDs(InstanceIdentifier[] expDeletedIIDs,
+            ArgumentCaptor<InstanceIdentifier> deletedLinkIDs) {
+        Set<InstanceIdentifier> actualIIDs = new HashSet<>(deletedLinkIDs.getAllValues());
+        for(InstanceIdentifier id: expDeletedIIDs) {
+            assertTrue("Missing expected deleted IID " + id, actualIIDs.contains(id));
+        }
+    }
+
+    private void setReadFutureAsync(final Topology topology,
+            final SettableFuture<Optional<Topology>> readFuture) {
+        new Thread() {
+            @Override
+            public void run() {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                readFuture.set(Optional.of(topology));
+            }
+
+        }.start();
+    }
+
+    private void waitForSubmit(CountDownLatch latch) {
+        assertEquals("Transaction submitted", true,
+                Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
+    }
+
+    private void waitForDeletes(int expDeleteCalls, final CountDownLatch latch) {
+        boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
+        if(!done) {
+            fail("Expected " + expDeleteCalls + " delete calls. Actual: " +
+                    (expDeleteCalls - latch.getCount()));
+        }
+    }
+
+    private CountDownLatch setupStubbedSubmit(ReadWriteTransaction mockTx) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(new Answer<CheckedFuture<Void, TransactionCommitFailedException>>() {
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> answer(
+                                                            InvocationOnMock invocation) {
+                latch.countDown();
+                return Futures.immediateCheckedFuture(null);
+            }
+        }).when(mockTx).submit();
+
+        return latch;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void setupStubbedDeletes(ReadWriteTransaction mockTx,
+            ArgumentCaptor<InstanceIdentifier> deletedLinkIDs, final CountDownLatch latch) {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                latch.countDown();
+                return null;
+            }
+        }).when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), deletedLinkIDs.capture());
+    }
+
+    private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                        newInvNodeKey(String id) {
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey =
+                new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey(
+                        new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+                                                                      rev130819.NodeId(id));
+        return nodeKey;
+    }
+
+    private NodeConnectorKey newInvNodeConnKey(String id) {
+        return new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey(
+                new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.
+                                                               NodeConnectorId(id));
+    }
+
+    private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> newNodeConnID(
+            org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey,
+            org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey) {
+        return InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+                        rev130819.node.NodeConnector.class, ncKey);
+    }
+
+    private Link newLink(String id, Source source, Destination dest) {
+        return new LinkBuilder().setLinkId(new LinkId(id))
+                .setSource(source).setDestination(dest).build();
+    }
+
+    private Destination newDestTp(String id) {
+        return new DestinationBuilder().setDestTp(new TpId(id)).build();
+    }
+
+    private Source newSourceTp(String id) {
+        return new SourceBuilder().setSourceTp(new TpId(id)).build();
+    }
+
+    private Destination newDestNode(String id) {
+        return new DestinationBuilder().setDestNode(new NodeId(id)).build();
+    }
+
+    private Source newSourceNode(String id) {
+        return new SourceBuilder().setSourceNode(new NodeId(id)).build();
+    }
+}