Merge "Elide front-end 3PC for single-shard Tx"
authorMoiz Raja <moraja@cisco.com>
Tue, 21 Apr 2015 20:25:27 +0000 (20:25 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 21 Apr 2015 20:25:27 +0000 (20:25 +0000)
41 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataBroker.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExistsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTreeChangePublisher.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/ConfigReader.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcherImpl.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java
opendaylight/netconf/netconf-testtool/edit.txt [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/pom.xml
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlUtil.java
opendaylight/netconf/pom.xml

index 157a53ed2d1797b662f6fadccd3acf4027ae442f..4485f3b14457c5f61fe614eaaec226218d0ab3a4 100644 (file)
@@ -127,7 +127,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             delegatingPersistenceProvider, LOG);
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
     }
 
     @Override
@@ -136,6 +136,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
 
         super.preStart();
+
+        snapshotSupport = newRaftActorSnapshotMessageSupport();
     }
 
     @Override
@@ -177,8 +179,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
-        return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
-                getRaftActorRecoveryCohort());
+        return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
     }
 
     protected void initializeBehavior(){
@@ -193,15 +194,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleCommand(Object message) {
-        if(snapshotSupport == null) {
-            snapshotSupport = newRaftActorSnapshotMessageSupport();
-        }
-
-        boolean handled = snapshotSupport.handleSnapshotMessage(message);
-        if(handled) {
-            return;
-        }
-
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -235,7 +227,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             );
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else {
+        } else if(!snapshotSupport.handleSnapshotMessage(message)) {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
             setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
@@ -245,8 +237,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
-        return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
-                currentBehavior, getRaftActorSnapshotCohort());
+        return new RaftActorSnapshotMessageSupport(context, currentBehavior,
+                getRaftActorSnapshotCohort());
     }
 
     private void onGetOnDemandRaftStats() {
index 57603a505833af58b99d2916eaa5ed2a09a065c0..ccc1b909a935b4363240839e28b83ae689c1800d 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -24,7 +23,6 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class RaftActorRecoverySupport {
-    private final DataPersistenceProvider persistence;
     private final RaftActorContext context;
     private final RaftActorBehavior currentBehavior;
     private final RaftActorRecoveryCohort cohort;
@@ -34,9 +32,8 @@ class RaftActorRecoverySupport {
     private Stopwatch recoveryTimer;
     private final Logger log;
 
-    RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
-            RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
-        this.persistence = persistence;
+    RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+            RaftActorRecoveryCohort cohort) {
         this.context = context;
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
@@ -45,7 +42,7 @@ class RaftActorRecoverySupport {
 
     boolean handleRecoveryMessage(Object message) {
         boolean recoveryComplete = false;
-        if(persistence.isRecoveryApplicable()) {
+        if(context.getPersistenceProvider().isRecoveryApplicable()) {
             if (message instanceof SnapshotOffer) {
                 onRecoveredSnapshot((SnapshotOffer) message);
             } else if (message instanceof ReplicatedLogEntry) {
@@ -97,7 +94,7 @@ class RaftActorRecoverySupport {
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
index 790ff89510ab222473a68b8105eab31c32158dd2..cab76e90ce69756f084668bd3f695c12a9a71012 100644 (file)
@@ -10,9 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -25,7 +23,6 @@ import org.slf4j.Logger;
 class RaftActorSnapshotMessageSupport {
     static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
-    private final DataPersistenceProvider persistence;
     private final RaftActorContext context;
     private final RaftActorBehavior currentBehavior;
     private final RaftActorSnapshotCohort cohort;
@@ -38,13 +35,14 @@ class RaftActorSnapshotMessageSupport {
         }
     };
 
-    RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
-            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
-        this.persistence = persistence;
+    RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+            RaftActorSnapshotCohort cohort) {
         this.context = context;
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
+
+        context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
     }
 
     boolean handleSnapshotMessage(Object message) {
@@ -57,14 +55,11 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof SaveSnapshotFailure) {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
             return true;
-        } else if (message instanceof CaptureSnapshot) {
-            onCaptureSnapshot(message);
-            return true;
         } else if (message instanceof CaptureSnapshotReply) {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
             return true;
         } else if (message.equals(COMMIT_SNAPSHOT)) {
-            context.getSnapshotManager().commit(persistence, -1);
+            context.getSnapshotManager().commit(-1);
             return true;
         } else {
             return false;
@@ -74,13 +69,7 @@ class RaftActorSnapshotMessageSupport {
     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
 
-        context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
-    private void onCaptureSnapshot(Object message) {
-        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
-
-        context.getSnapshotManager().create(createSnapshotProcedure);
+        context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
@@ -95,7 +84,7 @@ class RaftActorSnapshotMessageSupport {
 
         long sequenceNumber = success.metadata().sequenceNr();
 
-        context.getSnapshotManager().commit(persistence, sequenceNumber);
+        context.getSnapshotManager().commit(sequenceNumber);
     }
 
     private void onApplySnapshot(Snapshot snapshot) {
@@ -108,8 +97,7 @@ class RaftActorSnapshotMessageSupport {
         cohort.applySnapshot(snapshot.getState());
 
         //clears the followers log, sets the snapshot index to ensure adjusted-index works
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
-                currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
     }
 }
index 1cfe1538699b9748557b385c92c7f81e2b536756..c32839c490eb0b53d69192b2c58f4a141fe422f0 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import java.util.Collections;
 import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
@@ -22,7 +21,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private long dataSizeSinceLastSnapshot = 0L;
     private final RaftActorContext context;
-    private final DataPersistenceProvider persistence;
     private final RaftActorBehavior currentBehavior;
 
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
@@ -32,22 +30,20 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     };
 
     static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
-            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+            RaftActorBehavior currentBehavior) {
         return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+                snapshot.getUnAppliedEntries(), context, currentBehavior);
     }
 
-    static ReplicatedLog newInstance(RaftActorContext context,
-            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+    static ReplicatedLog newInstance(RaftActorContext context, RaftActorBehavior currentBehavior) {
         return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
-                persistence, currentBehavior);
+                currentBehavior);
     }
 
     private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
-            RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+            RaftActorContext context, RaftActorBehavior currentBehavior) {
         super(snapshotIndex, snapshotTerm, unAppliedEntries);
         this.context = context;
-        this.persistence = persistence;
         this.currentBehavior = currentBehavior;
     }
 
@@ -56,7 +52,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if(adjustedIndex >= 0) {
-            persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
         }
     }
 
@@ -81,7 +77,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // persist call and the execution(s) of the associated event
         // handler. This also holds for multiple persist calls in context
         // of a single command.
-        persistence.persist(replicatedLogEntry,
+        context.getPersistenceProvider().persist(replicatedLogEntry,
             new Procedure<ReplicatedLogEntry>() {
                 @Override
                 public void apply(ReplicatedLogEntry evt) throws Exception {
index f4f936bf161b2260f9cae4f397ded3af706f2294..847954816ccd33f1c916ead7641b57200e2303e2 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
 public class SnapshotManager implements SnapshotState {
 
     private final SnapshotState IDLE = new Idle();
-    private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
@@ -37,6 +36,8 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
+    private Procedure<Void> createSnapshotProcedure;
+
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
@@ -58,19 +59,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void create(Procedure<Void> callback) {
-        currentState.create(callback);
+    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
-    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                        RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
-    }
-
-    @Override
-    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
-        currentState.commit(persistenceProvider, sequenceNumber);
+    public void commit(long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -83,6 +78,15 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
+    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
+    @VisibleForTesting
+    public CaptureSnapshot getCaptureSnapshot() {
+        return captureSnapshot;
+    }
+
     private boolean hasFollowers(){
         return context.getPeerAddresses().keySet().size() > 0;
     }
@@ -111,18 +115,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void create(Procedure<Void> callback) {
-            LOG.debug("create should not be called in state {}", this);
-        }
-
-        @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -192,8 +190,6 @@ public class SnapshotManager implements SnapshotState {
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
                     newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
-            SnapshotManager.this.currentState = CAPTURING;
-
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
@@ -205,8 +201,14 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
-            context.getActor().tell(captureSnapshot, context.getActor());
+            try {
+                createSnapshotProcedure.apply(null);
+            } catch (Exception e) {
+                LOG.error("Error creating snapshot", e);
+                return false;
+            }
 
+            SnapshotManager.this.currentState = CREATING;
             return true;
         }
 
@@ -231,30 +233,6 @@ public class SnapshotManager implements SnapshotState {
         }
     }
 
-    private class Capturing extends AbstractSnapshotState {
-
-        @Override
-        public boolean isCapturing() {
-            return true;
-        }
-
-        @Override
-        public void create(Procedure<Void> callback) {
-            try {
-                callback.apply(null);
-                SnapshotManager.this.currentState = CREATING;
-            } catch (Exception e) {
-                LOG.error("Unexpected error occurred", e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "Capturing";
-        }
-
-    }
-
     private class Creating extends AbstractSnapshotState {
 
         @Override
@@ -263,8 +241,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -273,7 +250,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-            persistenceProvider.saveSnapshot(sn);
+            context.getPersistenceProvider().saveSnapshot(sn);
 
             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
@@ -339,12 +316,12 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             context.getReplicatedLog().snapshotCommit();
-            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+            context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(lastSequenceNumber);
+            context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
             lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
index 9a9bf1c774a49c3a87cb285a599da382ecad9931..9949211c63c416d69fb6a97ba6d6ae6bc823c086 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public interface SnapshotState {
@@ -40,31 +38,21 @@ public interface SnapshotState {
      */
     boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
-    /**
-     * Create the snapshot
-     *
-     * @param callback a procedure to be called which should create the snapshot
-     */
-    void create(Procedure<Void> callback);
-
     /**
      * Persist the snapshot
      *
-     * @param persistenceProvider
      * @param snapshotBytes
      * @param currentBehavior
      * @param totalMemory
      */
-    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
-            ,long totalMemory);
+    void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory);
 
     /**
      * Commit the snapshot by trimming the log
      *
-     * @param persistenceProvider
      * @param sequenceNumber
      */
-    void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+    void commit(long sequenceNumber);
 
     /**
      * Rollback the snapshot
index adf7778fe7daa068d5f6e1bfab0110f6665bc62c..4aa3b2fb4e62349f8c96b06e235b5ea8e7378bcf 100644 (file)
@@ -200,6 +200,7 @@ public class MockRaftActorContext implements RaftActorContext {
     public SnapshotManager getSnapshotManager() {
         if(this.snapshotManager == null){
             this.snapshotManager = new SnapshotManager(this, getLogger());
+            this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
         }
         return this.snapshotManager;
     }
index c4e0ef8f5fa5ad2d063cf8bf1bd721a4a5c61b2b..71ebbb4e6a3dec745f01d47b44edafd10ac57bda 100644 (file)
@@ -62,11 +62,11 @@ public class RaftActorRecoverySupportTest {
         context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
 
-        support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+        support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
 
         doReturn(true).when(mockPersistence).isRecoveryApplicable();
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
     }
 
     private void sendMessageToSupport(Object message) {
index ae9c784a556496a6edb8d14b82fe9e7e315ea61e..7509aa09a234eef3dc6196d43fb8fc7c96e8836d 100644 (file)
@@ -13,7 +13,6 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -21,14 +20,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -76,11 +73,11 @@ public class RaftActorSnapshotMessageSupportTest {
             }
         };
 
-        support = new RaftActorSnapshotMessageSupport(mockPersistence, context, mockBehavior, mockCohort);
+        support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort);
 
         doReturn(true).when(mockPersistence).isRecoveryApplicable();
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
     }
 
     private void sendMessageToSupport(Object message) {
@@ -120,27 +117,13 @@ public class RaftActorSnapshotMessageSupportTest {
         verify(mockCohort).applySnapshot(snapshotBytes);
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testOnCaptureSnapshot() throws Exception {
-
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
-
-        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
-        verify(mockSnapshotManager).create(procedure.capture());
-
-        procedure.getValue().apply(null);
-
-        verify(mockCohort).createSnapshot(same(mockRaftActorRef));
-    }
-
     @Test
     public void testOnCaptureSnapshotReply() {
 
         byte[] snapshot = {1,2,3,4,5};
         sendMessageToSupport(new CaptureSnapshotReply(snapshot));
 
-        verify(mockSnapshotManager).persist(same(mockPersistence), same(snapshot), same(mockBehavior), anyLong());
+        verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong());
     }
 
     @Test
@@ -149,7 +132,7 @@ public class RaftActorSnapshotMessageSupportTest {
         long sequenceNumber = 100;
         sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
 
-        verify(mockSnapshotManager).commit(mockPersistence, sequenceNumber);
+        verify(mockSnapshotManager).commit(sequenceNumber);
     }
 
     @Test
@@ -166,7 +149,7 @@ public class RaftActorSnapshotMessageSupportTest {
 
         sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSnapshotManager).commit(mockPersistence, -1);
+        verify(mockSnapshotManager).commit(-1);
     }
 
     @Test
index 82ebcd1fbd5e4293aa114f6102b0eaf7a0f839fa..e5c8677b0466fa0b282a413ef2291dd0318ce01e 100644 (file)
@@ -472,6 +472,7 @@ public class RaftActorTest extends AbstractActorTest {
                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
             }
 
+            assertNotNull(matches);
             assertEquals(2, matches.size());
 
             // check if the notifier got a role change from null to Follower
@@ -561,13 +562,13 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
 
-                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
-                        , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+                leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+                        leader, Runtime.getRuntime().totalMemory());
 
                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -671,7 +672,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
index a8f490e75119678fc84084c50c85dec204941b1d..33e5c4bcdf4e324ebd842c651fdbb99e8ad46f59 100644 (file)
@@ -15,7 +15,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -53,7 +52,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         send2InitialPayloads();
 
         // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
@@ -64,13 +62,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
         // Send another payload.
         MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
@@ -102,60 +93,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
     }
 
     @Test
-    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
-
-        send2InitialPayloads();
-
-        // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
-        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
-
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
-
-        // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
-
-        // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
-
-        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
-
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the AppendEntries to the follower
-        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
-
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
-        // Now deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for snapshot complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-
-        reinstateLeaderActor();
-
-        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
-        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
-        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
-        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
-        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
-        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
-
-        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
-        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
-        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
-        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
-        // This is a side effect of trimming the persisted log to the sequence number captured at the time
-        // the snapshot was initiated.
-        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
-                payload3, payload4), leaderActor.underlyingActor().getState());
-    }
-
-    @Test
-    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+    public void testStatePersistedAfterSnapshotPersisted() {
 
         send2InitialPayloads();
 
index 92e384e19a3b5e8a99c72d30546e36397192e097..fbe6fc4501f6b6b62b60490036e005e8ca065207 100644 (file)
@@ -85,7 +85,7 @@ public class ReplicatedLogImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testAppendAndPersistExpectingNoCapture() throws Exception {
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
 
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
 
@@ -114,7 +114,7 @@ public class ReplicatedLogImplTest {
 
         doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
 
         MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
         MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
@@ -144,7 +144,7 @@ public class ReplicatedLogImplTest {
             }
         });
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
 
         int dataSize = 600;
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
@@ -171,7 +171,7 @@ public class ReplicatedLogImplTest {
     @Test
     public void testRemoveFromAndPersist() throws Exception {
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
 
         log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
         log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
index c74705d13f6c682f30d2553665a34552f62bbc47..be748f3d26c3ae64d32ebac624e8100972459783 100644 (file)
@@ -16,7 +16,6 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -300,14 +299,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
 
         // Send the payload.
         payload7 = sendPayloadData(leaderActor, "seven");
 
-        // Capture the CaptureSnapshot message so we can send it later.
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
+        // Capture the CaptureSnapshotReply message so we can send it later.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+                CaptureSnapshotReply.class);
 
         // Wait for the state to be applied in the leader.
         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
@@ -327,12 +326,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
 
-        // Now deliver the CaptureSnapshot.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for CaptureSnapshotReply to complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+        // Now deliver the CaptureSnapshotReply.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -347,8 +343,6 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        expSnapshotState.add(payload7);
-
         // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
         // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
         // when the snapshot is created (ie when the CaptureSnapshot is processed).
@@ -398,6 +392,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
 
+        expSnapshotState.add(payload7);
+
         testLog.info("testSecondSnapshot ending");
     }
 
index 8ab762f78612a5b540d93512128c4e820b0bbc64..89aadbe0ae0fb4dc0ba7646e24942a7323be0376 100644 (file)
@@ -7,6 +7,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -19,7 +20,6 @@ import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,6 +82,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
+        snapshotManager.setCreateSnapshotCallable(mockProcedure);
     }
 
     @After
@@ -95,7 +96,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCaptureToInstall(){
+    public void testCaptureToInstall() throws Exception {
 
         // Force capturing toInstall = true
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
@@ -103,7 +104,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(0L, captureSnapshot.getLastIndex());
@@ -120,7 +123,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCapture(){
+    public void testCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
@@ -128,7 +131,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(9L, captureSnapshot.getLastIndex());
         assertEquals(1L, captureSnapshot.getLastTerm());
@@ -145,6 +151,20 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testCaptureWithCreateProcedureError () throws Exception {
+        doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertFalse(capture);
+
+        assertEquals(false, snapshotManager.isCapturing());
+
+        verify(mockProcedure).apply(null);
+    }
+
     @Test
     public void testIllegalCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -152,9 +172,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
 
-        assertEquals(1, allMatching.size());
+        reset(mockProcedure);
 
         // This will not cause snapshot capture to start again
         capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -162,9 +182,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertFalse(capture);
 
-        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
-
-        assertEquals(1, allMatching.size());
+        verify(mockProcedure, never()).apply(null);
     }
 
     @Test
@@ -188,11 +206,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -209,60 +224,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
     }
 
-
-    @Test
-    public void testCreate() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure).apply(null);
-
-        assertEquals("isCapturing", true, snapshotManager.isCapturing());
-    }
-
-    @Test
-    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateBeforeCapture() throws Exception {
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(0)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateAfterPersist() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
-
-        reset(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, never()).apply(null);
-    }
-
     @Test
     public void testPersistWhenReplicatedToAllIndexNotMinus(){
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
@@ -276,11 +237,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -308,10 +266,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -328,12 +283,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
 
-        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -351,8 +303,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingPersistWithoutCaptureWillDoNothing(){
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -368,13 +319,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -391,12 +338,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
-
-        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog).snapshotCommit();
 
@@ -417,7 +361,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -429,7 +373,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommitBeforeCapture(){
-        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -447,14 +391,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+        snapshotManager.commit(100L);
 
-        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
@@ -469,10 +410,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -504,10 +442,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
index 26e43648787530c8911bf2139a035df11224f768..c9cec158376faa6484f6c498bdaf41252875e411 100644 (file)
@@ -606,6 +606,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 ApplySnapshot.class);
         Snapshot snapshot = applySnapshot.getSnapshot();
+        assertNotNull(lastInstallSnapshot);
         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
                 snapshot.getLastAppliedTerm());
index ba0bd0f29c96237ab758487b719eac959d115edc..0255020328655dfe0dee151207f66f9a51eddcc5 100644 (file)
@@ -536,7 +536,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
     }
 
     @Test
@@ -584,7 +584,9 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
         assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
@@ -595,8 +597,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test
index cc8deb81b31777ee6dd4aa3b5bf19a97f3ababb5..8b085bae2d283d9761136e4ef37c3ac9545c6058 100644 (file)
@@ -24,7 +24,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  * @see AsyncDataBroker
  * @see TransactionChainFactory
  */
-public interface DataBroker extends TransactionFactory, AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>, BindingService, TransactionChainFactory<InstanceIdentifier<?>, DataObject> {
+public interface DataBroker extends  AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>,
+    TransactionChainFactory<InstanceIdentifier<?>, DataObject>, TransactionFactory, BindingService, DataTreeChangeService {
     /**
      * {@inheritDoc}
      */
index 52b171c13d78c02c3f3f32a5db02fdeffe44bbbc..59cc03625ae2f450373c712a9d10f84b62a0e68a 100644 (file)
@@ -76,7 +76,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
         input = new DataInputStream(stream);
     }
 
-    public NormalizedNodeInputStreamReader(DataInput input) throws IOException {
+    public NormalizedNodeInputStreamReader(DataInput input) {
         this.input = Preconditions.checkNotNull(input);
     }
 
index 066f01b092d701b08556c5fd7319db4d64a6859c..81125a7152bf562baede534f7b84b2560f4f9bae 100644 (file)
@@ -137,14 +137,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final YangInstanceIdentifier path = message.getPath();
 
         try {
-            Boolean exists = transaction.exists(path).checkedGet();
-            DataExistsReply dataExistsReply = new DataExistsReply(exists);
+            boolean exists = transaction.exists(path).checkedGet();
+            DataExistsReply dataExistsReply = DataExistsReply.create(exists);
             getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
                 dataExistsReply, getSelf());
         } catch (ReadFailedException e) {
             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
         }
-
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
index 24ca6464543911c9d4c6d05fe1f3f2d8731c920e..0ea865aa07e06b8f201955d9bfcac61c27d83725 100644 (file)
@@ -10,28 +10,47 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class DataExistsReply implements SerializableMessage{
+public class DataExistsReply implements SerializableMessage {
     public static final Class<ShardTransactionMessages.DataExistsReply> SERIALIZABLE_CLASS =
             ShardTransactionMessages.DataExistsReply.class;
 
+    private static final DataExistsReply TRUE = new DataExistsReply(true, null);
+    private static final DataExistsReply FALSE = new DataExistsReply(false, null);
+    private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
+            ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
+    private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
+            ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
+
     private final boolean exists;
 
-    public DataExistsReply(final boolean exists) {
+    private DataExistsReply(final boolean exists, final Void dummy) {
         this.exists = exists;
     }
 
+    /**
+     * @deprecated Use {@link #create(boolean)} instead.
+     * @param exists
+     */
+    @Deprecated
+    public DataExistsReply(final boolean exists) {
+        this(exists, null);
+    }
+
+    public static DataExistsReply create(final boolean exists) {
+        return exists ? TRUE : FALSE;
+    }
+
     public boolean exists() {
         return exists;
     }
 
-    @Override public Object toSerializable() {
-        return ShardTransactionMessages.DataExistsReply.newBuilder()
-            .setExists(exists).build();
+    @Override
+    public Object toSerializable() {
+        return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
     }
 
-    public static DataExistsReply fromSerializable(final Object serializable){
+    public static DataExistsReply fromSerializable(final Object serializable) {
         ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
-        return new DataExistsReply(o.getExists());
+        return create(o.getExists());
     }
-
 }
index be965608144cd68dca83b0a2cf8835b82bc30886..abe7f7678c3c817da1ebde3f1e3a579c781fe096 100644 (file)
@@ -248,18 +248,18 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected Future<Object> dataExistsSerializedReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists).toSerializable());
+        return Futures.successful(DataExistsReply.create(exists).toSerializable());
     }
 
     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists));
+        return Futures.successful(DataExistsReply.create(exists));
     }
 
     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
         return Futures.successful(new BatchedModificationsReply(count));
     }
 
-    protected Future<Object> incompleteFuture(){
+    protected Future<Object> incompleteFuture() {
         return mock(Future.class);
     }
 
index e7afe262b96a6f564b5b672f7473b6c6e12d48db..00900d35fb0a29f9a79ef748ca2916610647c748 100644 (file)
@@ -29,10 +29,10 @@ public class OperationCompleterTest {
 
         OperationCompleter completer = new OperationCompleter(operationLimiter );
 
-        completer.onComplete(null, new DataExistsReply(true));
+        completer.onComplete(null, DataExistsReply.create(true));
         assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
 
-        completer.onComplete(null, new DataExistsReply(true));
+        completer.onComplete(null, DataExistsReply.create(true));
         assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
 
         completer.onComplete(null, new IllegalArgumentException());
index f8ac0d849f32845a697f70319aea789949aa2d5b..d191fc397c787ff4120269f7fbdf8c54b24d19ff 100644 (file)
@@ -16,7 +16,6 @@ import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListe
 import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
 import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
 import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -43,7 +42,7 @@ public abstract class AbstractDOMStoreTreeChangePublisher extends AbstractRegist
 
     /**
      * Callback notifying the subclass that the specified registration is being closed and it's user no longer
-     * wishes to receive notifications. This notification is invoked while the {@link ListenerRegistration#close()}
+     * wishes to receive notifications. This notification is invoked while the {@link org.opendaylight.yangtools.concepts.ListenerRegistration#close()}
      * method is executing. Subclasses can use this callback to properly remove any delayed notifications pending
      * towards the registration.
      *
@@ -70,7 +69,7 @@ public abstract class AbstractDOMStoreTreeChangePublisher extends AbstractRegist
     }
 
     @Override
-    public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+    public final <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
         // Take the write lock
         takeLock();
         try {
index b617a8087fc771d6f413b1ec6c3dc4e5bf597aa3..354abcf69fe1040a1e24822b7db78aa6e674909d 100644 (file)
@@ -205,8 +205,12 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     }
 
     @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
-        return changePublisher.registerTreeChangeListener(treeId, listener);
+    public synchronized <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+        /*
+         * Make sure commit is not occurring right now. Listener has to be
+         * registered and its state capture enqueued at a consistent point.
+         */
+        return changePublisher.registerTreeChangeListener(treeId, listener, dataTree.takeSnapshot());
     }
 
     @Override
@@ -224,7 +228,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private static void warnDebugContext(AbstractDOMStoreTransaction<?> transaction) {
+    private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
         final Throwable ctx = transaction.getDebugContext();
         if (ctx != null) {
             LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
index feb1b66dd3738478af4e663e32123d8b41b8142e..2f3b46366a918b9f92d8791c3cf68d39ae6bb3a1 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import com.google.common.base.Optional;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
@@ -14,12 +15,15 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +48,7 @@ final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChan
 
     @Override
     protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
-        final DataTreeCandidate candidate = new DefaultDataTreeCandidate(path, node);
+        final DataTreeCandidate candidate = DataTreeCandidates.newDataTreeCandidate(path, node);
 
         for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
             LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
@@ -59,6 +63,18 @@ final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChan
         // FIXME: remove the queue for this registration and make sure we clear it
     }
 
+    <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener, final DataTreeSnapshot snapshot) {
+        final AbstractDOMDataTreeChangeListenerRegistration<L> reg = registerTreeChangeListener(treeId, listener);
+
+        final Optional<NormalizedNode<?, ?>> node = snapshot.readNode(treeId);
+        if (node.isPresent()) {
+            final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(treeId, node.get());
+            notificationManager.submitNotification(reg, candidate);
+        }
+
+        return reg;
+    }
+
     synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) {
         // Runs synchronized with registrationRemoved()
         processCandidateTree(candidate);
index fb4931098b34776cf334af7ef835323e915661a2..07a9fb7a88a7c5c37a3f748d84c7659e825dc9d5 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -34,7 +35,8 @@ import org.slf4j.LoggerFactory;
  * Computes data change events for all affected registered listeners in data
  * tree.
  */
-final class ResolveDataChangeEventsTask {
+@Beta
+public final class ResolveDataChangeEventsTask {
     private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeEventsTask.class);
 
     private final DataTreeCandidate candidate;
@@ -42,7 +44,7 @@ final class ResolveDataChangeEventsTask {
 
     private Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> collectedEvents;
 
-    public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
+    private ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
         this.candidate = Preconditions.checkNotNull(candidate);
         this.listenerRoot = Preconditions.checkNotNull(listenerTree);
     }
index df990af7e12e9d8fc326e4a3796251813a04d9fa..4257e172b4a5c22847b1f0469e810ab86dca828b 100644 (file)
@@ -11,6 +11,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -33,6 +34,8 @@ import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils;
 import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
+import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
+import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
@@ -111,7 +114,7 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
 
         final List<Element> elements = Collections.singletonList(doc.getDocumentElement());
         final SchemaNode schemaNodeContext = pathContext.getSchemaNode();
-        DataSchemaNode schemaNode = null;
+        DataSchemaNode schemaNode;
         if (schemaNodeContext instanceof RpcDefinition) {
             schemaNode = ((RpcDefinition) schemaNodeContext).getInput();
         } else if (schemaNodeContext instanceof DataSchemaNode) {
@@ -124,12 +127,9 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         final String schemaNodeName = pathContext.getSchemaNode().getQName().getLocalName();
 
         if (!schemaNodeName.equalsIgnoreCase(docRootElm)) {
-            final Collection<DataSchemaNode> children = ((DataNodeContainer) schemaNode).getChildNodes();
-            for (final DataSchemaNode child : children) {
-                if (child.getQName().getLocalName().equalsIgnoreCase(docRootElm)) {
-                    schemaNode = child;
-                    break;
-                }
+            final DataSchemaNode foundSchemaNode = findSchemaNodeOrParentChoiceByName(schemaNode, docRootElm);
+            if (foundSchemaNode != null) {
+                schemaNode = foundSchemaNode;
             }
         }
 
@@ -137,12 +137,42 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         final DomToNormalizedNodeParserFactory parserFactory =
                 DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
 
+        NormalizedNode<?, ?> parsed = null;
         if(schemaNode instanceof ContainerSchemaNode) {
             return parserFactory.getContainerNodeParser().parse(Collections.singletonList(doc.getDocumentElement()), (ContainerSchemaNode) schemaNode);
         } else if(schemaNode instanceof ListSchemaNode) {
             final ListSchemaNode casted = (ListSchemaNode) schemaNode;
             return parserFactory.getMapEntryNodeParser().parse(elements, casted);
-        } // FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode
+        } else if (schemaNode instanceof ChoiceSchemaNode) {
+            final ChoiceSchemaNode casted = (ChoiceSchemaNode) schemaNode;
+            return parserFactory.getChoiceNodeParser().parse(elements, casted);
+        }
+
+        // FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode
+
+        return parsed;
+    }
+
+    private static DataSchemaNode findSchemaNodeOrParentChoiceByName(DataSchemaNode schemaNode, String elementName) {
+        final ArrayList<ChoiceSchemaNode> choiceSchemaNodes = new ArrayList<>();
+        final Collection<DataSchemaNode> children = ((DataNodeContainer) schemaNode).getChildNodes();
+        for (final DataSchemaNode child : children) {
+            if (child instanceof ChoiceSchemaNode) {
+                choiceSchemaNodes.add((ChoiceSchemaNode) child);
+            } else if (child.getQName().getLocalName().equalsIgnoreCase(elementName)) {
+                return child;
+            }
+        }
+
+        for (final ChoiceSchemaNode choiceNode : choiceSchemaNodes) {
+            for (final ChoiceCaseNode caseNode : choiceNode.getCases()) {
+                final DataSchemaNode resultFromRecursion = findSchemaNodeOrParentChoiceByName(caseNode, elementName);
+                if (resultFromRecursion != null) {
+                    // this returns top choice node in which child element is found
+                    return choiceNode;
+                }
+            }
+        }
         return null;
     }
 }
index fbefb5c56d818ffd3e039cea449202530a113e42..a2aa0e20679473951cacd1191097f1368e76bac6 100644 (file)
@@ -23,8 +23,6 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorT
 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext;
 import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider;
-import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
-import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException;
 import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -47,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
 
 public class EditConfig extends AbstractSingletonNetconfOperation {
 
@@ -182,35 +181,32 @@ public class EditConfig extends AbstractSingletonNetconfOperation {
             LOG.debug("DataNode from module is not ContainerSchemaNode nor ListSchemaNode, aborting..");
             return Optional.absent();
         }
-
     }
 
     private Datastore extractTargetParameter(final XmlElement operationElement) throws NetconfDocumentedException {
-        final XmlElement targetChildNode;
-        try {
-            final XmlElement targetElement = operationElement.getOnlyChildElementWithSameNamespace(TARGET_KEY);
-            targetChildNode = targetElement.getOnlyChildElementWithSameNamespace();
-        } catch (final MissingNameSpaceException | UnexpectedNamespaceException e) {
-            LOG.trace("Can't get only child element with same namespace", e);
-            throw NetconfDocumentedException.wrap(e);
+        final NodeList elementsByTagName = operationElement.getDomElement().getElementsByTagName(TARGET_KEY);
+        // Direct lookup instead of using XmlElement class due to performance
+        if (elementsByTagName.getLength() == 0) {
+            throw new NetconfDocumentedException("Missing target element", ErrorType.rpc, ErrorTag.missing_attribute, ErrorSeverity.error);
+        } else if (elementsByTagName.getLength() > 1) {
+            throw new NetconfDocumentedException("Multiple target elements", ErrorType.rpc, ErrorTag.unknown_attribute, ErrorSeverity.error);
+        } else {
+            final XmlElement targetChildNode = XmlElement.fromDomElement((Element) elementsByTagName.item(0)).getOnlyChildElement();
+            return Datastore.valueOf(targetChildNode.getName());
         }
-
-        return Datastore.valueOf(targetChildNode.getName());
     }
 
-    private ModifyAction getDefaultOperation(final XmlElement operationElement) throws NetconfDocumentedException{
-        try {
-            return ModifyAction.fromXmlValue(getElement(operationElement, DEFAULT_OPERATION_KEY).getTextContent());
-        } catch (NetconfDocumentedException e) {
-            if (e.getErrorType() == ErrorType.protocol
-                    && e.getErrorSeverity() == ErrorSeverity.error
-                    && e.getErrorTag() == ErrorTag.missing_element) {
-                return ModifyAction.MERGE;
-            }
-            else {
-                throw e;
-            }
+    private ModifyAction getDefaultOperation(final XmlElement operationElement) throws NetconfDocumentedException {
+        final NodeList elementsByTagName = operationElement.getDomElement().getElementsByTagName(DEFAULT_OPERATION_KEY);
+        if(elementsByTagName.getLength() == 0) {
+            return ModifyAction.MERGE;
+        } else if(elementsByTagName.getLength() > 1) {
+            throw new NetconfDocumentedException("Multiple " + DEFAULT_OPERATION_KEY + " elements",
+                    ErrorType.rpc, ErrorTag.unknown_attribute, ErrorSeverity.error);
+        } else {
+            return ModifyAction.fromXmlValue(elementsByTagName.item(0).getTextContent());
         }
+
     }
 
     private XmlElement getElement(final XmlElement operationElement, String elementName) throws NetconfDocumentedException {
index 111a2420c5619a5b464c46b749d295d7a0b25bfb..75582ed72df6061e0a9357394e86bca819be0ae4 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.netconf.cli.reader.custom;
 
 import static org.opendaylight.controller.netconf.cli.io.IOUtil.isSkipInput;
-
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -34,8 +33,10 @@ import org.opendaylight.controller.netconf.cli.reader.AbstractReader;
 import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
@@ -89,7 +90,7 @@ public class ConfigReader extends AbstractReader<DataSchemaNode> {
             filterPartsQNames.add(qName);
         }
 
-        List<NormalizedNode<?, ?>> previous = readInnerNode(rawValue);
+        List<? extends NormalizedNode<?, ?>> previous = readInnerNode(rawValue);
 
         for (final QName qName : Lists.reverse(filterPartsQNames).subList(1, filterPartsQNames.size())) {
             previous = Collections.<NormalizedNode<?, ?>>singletonList(
@@ -99,12 +100,15 @@ public class ConfigReader extends AbstractReader<DataSchemaNode> {
             );
         }
 
-        final DataContainerChild<?, ?> newNode = previous == null ? null
-                : ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()))
-                        .withValue((Collection) previous).build();
+        if (previous == null) {
+            return Collections.singletonList(null);
+        }
+
+        final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> builder = ImmutableContainerNodeBuilder.create();
+        builder.withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()));
+        builder.withValue((Collection<DataContainerChild<?, ?>>) previous);
 
-        return Collections.<NormalizedNode<?, ?>> singletonList(newNode);
+        return Collections.<NormalizedNode<?, ?>> singletonList(builder.build());
     }
 
     private List<NormalizedNode<?, ?>> readInnerNode(final String pathString) throws ReadingException {
index 5d584f3b987f90ca65920a9befcb945616b1b8c7..039900327b388c9981d318420b3f1a7c9ab1cb7a 100644 (file)
@@ -32,6 +32,10 @@ public class NetconfClientDispatcherImpl extends AbstractDispatcher<NetconfClien
         this.timer = timer;
     }
 
+    protected Timer getTimer() {
+        return timer;
+    }
+
     @Override
     public Future<NetconfClientSession> createClient(final NetconfClientConfiguration clientConfiguration) {
         switch (clientConfiguration.getProtocol()) {
index ac13729d885fbf5b83ff21d4e9a90e91ab2dfa64..4c5fd1d1ec040ef6218e36d619c47409cddbb8d8 100644 (file)
@@ -33,11 +33,22 @@ import org.slf4j.LoggerFactory;
 
 public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
 
-    public static final Set<String> CLIENT_CAPABILITIES = ImmutableSet.of(
+    public static final Set<String> EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
 
+    public static final Set<String> LEGACY_EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+
+    public static final Set<String> DEFAULT_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
+    public static final Set<String> LEGACY_FRAMING_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0);
+
     private static final Logger LOG = LoggerFactory.getLogger(NetconfClientSessionNegotiatorFactory.class);
     private static final String START_EXI_MESSAGE_ID = "default-start-exi";
     private static final EXIOptions DEFAULT_OPTIONS;
@@ -61,19 +72,35 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF
         DEFAULT_OPTIONS = opts;
     }
 
+    private final Set<String> clientCapabilities;
+
     public NetconfClientSessionNegotiatorFactory(final Timer timer,
                                                  final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
                                                  final long connectionTimeoutMillis) {
         this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS);
     }
 
+    public NetconfClientSessionNegotiatorFactory(final Timer timer,
+                                                 final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+                                                 final long connectionTimeoutMillis, final Set<String> capabilities) {
+        this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS, capabilities);
+
+    }
+
     public NetconfClientSessionNegotiatorFactory(final Timer timer,
                                                  final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
                                                  final long connectionTimeoutMillis, final EXIOptions exiOptions) {
+        this(timer, additionalHeader, connectionTimeoutMillis, exiOptions, EXI_CLIENT_CAPABILITIES);
+    }
+
+    public NetconfClientSessionNegotiatorFactory(final Timer timer,
+                                                 final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+                                                 final long connectionTimeoutMillis, final EXIOptions exiOptions, final Set<String> capabilities) {
         this.timer = Preconditions.checkNotNull(timer);
         this.additionalHeader = additionalHeader;
         this.connectionTimeoutMillis = connectionTimeoutMillis;
         this.options = exiOptions;
+        this.clientCapabilities = capabilities;
     }
 
     @Override
@@ -84,9 +111,9 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF
         NetconfMessage startExiMessage = NetconfStartExiMessage.create(options, START_EXI_MESSAGE_ID);
         NetconfHelloMessage helloMessage = null;
         try {
-            helloMessage = NetconfHelloMessage.createClientHello(CLIENT_CAPABILITIES, additionalHeader);
+            helloMessage = NetconfHelloMessage.createClientHello(clientCapabilities, additionalHeader);
         } catch (NetconfDocumentedException e) {
-            LOG.error("Unable to create client hello message with capabilities {} and additional handler {}",CLIENT_CAPABILITIES,additionalHeader);
+            LOG.error("Unable to create client hello message with capabilities {} and additional handler {}", clientCapabilities,additionalHeader);
             throw new IllegalStateException(e);
         }
 
diff --git a/opendaylight/netconf/netconf-testtool/edit.txt b/opendaylight/netconf/netconf-testtool/edit.txt
new file mode 100644 (file)
index 0000000..1e7bbb6
--- /dev/null
@@ -0,0 +1,7 @@
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+<module>
+<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
+<name>name{MSG_ID}</name>
+<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
+</module>
+</modules>
\ No newline at end of file
index 6548d87b49bf827d20c94e5ea60316b65b78f8ef..f15dcc341714c2e6f22e729bd585d094bb8a193f 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>netconf-connector-config</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-netconf-connector</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>logback-config</artifactId>
                             <shadedClassifierName>executable</shadedClassifierName>
                         </configuration>
                     </execution>
+
+                    <execution>
+                        <id>stress-client</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <shadedArtifactId>stress-client</shadedArtifactId>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.opendaylight.controller.netconf.test.tool.client.stress.StressClient</mainClass>
+                                </transformer>
+                            </transformers>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>executable</shadedClassifierName>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java
new file mode 100644 (file)
index 0000000..7b60a17
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncExecutionStrategy implements ExecutionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
+
+    private final Parameters params;
+    private final List<NetconfMessage> preparedMessages;
+    private final NetconfDeviceCommunicator sessionListener;
+    private final List<Integer> editBatches;
+
+    public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
+        this.params = params;
+        this.preparedMessages = editConfigMsgs;
+        this.sessionListener = sessionListener;
+        this.editBatches = countEditBatchSizes(params);
+    }
+
+    private static List<Integer> countEditBatchSizes(final Parameters params) {
+        final List<Integer> editBatches = Lists.newArrayList();
+        if (params.editBatchSize != params.editCount) {
+            final int fullBatches = params.editCount / params.editBatchSize;
+            for (int i = 0; i < fullBatches; i++) {
+                editBatches.add(params.editBatchSize);
+            }
+
+            if (params.editCount % params.editBatchSize != 0) {
+                editBatches.add(params.editCount % params.editBatchSize);
+            }
+        } else {
+            editBatches.add(params.editBatchSize);
+        }
+        return editBatches;
+    }
+
+    @Override
+    public void invoke() {
+        final AtomicInteger responseCounter = new AtomicInteger(0);
+        final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
+
+        int batchI = 0;
+        for (final Integer editBatch : editBatches) {
+            for (int i = 0; i < editBatch; i++) {
+                final int msgId = i + (batchI * params.editBatchSize);
+                final NetconfMessage msg = preparedMessages.get(msgId);
+                LOG.debug("Sending message {}", msgId);
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+                }
+                final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+                        sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+                futures.add(netconfMessageFuture);
+            }
+            batchI++;
+            LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+            futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+        }
+
+        LOG.info("All batches sent. Waiting for responses");
+        // Wait for every future
+        for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
+            try {
+                final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS);
+                if(netconfMessageRpcResult.isSuccessful()) {
+                    responseCounter.incrementAndGet();
+                    LOG.debug("Received response {}", responseCounter.get());
+                } else {
+                    LOG.warn("Request failed {}", netconfMessageRpcResult);
+                }
+            } catch (final InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (final ExecutionException | TimeoutException e) {
+                throw new RuntimeException("Request not finished", e);
+            }
+        }
+
+        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java
new file mode 100644 (file)
index 0000000..2d96d8f
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.Set;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+
+public class ConfigurableClientDispatcher extends NetconfClientDispatcherImpl {
+
+    private final Set<String> capabilities;
+
+    private ConfigurableClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer, final Set<String> capabilities) {
+        super(bossGroup, workerGroup, timer);
+        this.capabilities = capabilities;
+    }
+
+    /**
+     * EXI + chunked framing
+     */
+    public static ConfigurableClientDispatcher createChunkedExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.EXI_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * EXI + ]]>]]> framing
+     */
+    public static ConfigurableClientDispatcher createLegacyExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_EXI_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * Chunked framing
+     */
+    public static ConfigurableClientDispatcher createChunked(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.DEFAULT_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * ]]>]]> framing
+     */
+    public static ConfigurableClientDispatcher createLegacy(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_FRAMING_CLIENT_CAPABILITIES);
+    }
+
+    @Override
+    protected NetconfClientSessionNegotiatorFactory getNegotiatorFactory(final NetconfClientConfiguration cfg) {
+        return new NetconfClientSessionNegotiatorFactory(getTimer(), cfg.getAdditionalHeader(), cfg.getConnectionTimeoutMillis(), capabilities);
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java
new file mode 100644 (file)
index 0000000..dc2ddf1
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+/**
+ * Created by mmarsale on 18.4.2015.
+ */
+public interface ExecutionStrategy {
+    void invoke();
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java
new file mode 100644 (file)
index 0000000..6648bd4
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+
+public class Parameters {
+
+    @Arg(dest = "ip")
+    public String ip;
+
+    @Arg(dest = "port")
+    public int port;
+
+    @Arg(dest = "edit-count")
+    public int editCount;
+
+    @Arg(dest = "edit-content")
+    public File editContent;
+
+    @Arg(dest = "edit-batch-size")
+    public int editBatchSize;
+
+    @Arg(dest = "debug")
+    public boolean debug;
+
+    @Arg(dest = "legacy-framing")
+    public boolean legacyFraming;
+
+    @Arg(dest = "exi")
+    public boolean exi;
+
+    @Arg(dest = "async")
+    public boolean async;
+
+    @Arg(dest = "ssh")
+    public boolean ssh;
+
+    @Arg(dest = "msg-timeout")
+    public long msgTimeout;
+
+    static ArgumentParser getParser() {
+        final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
+
+        parser.description("Netconf stress client");
+
+        parser.addArgument("--ip")
+                .type(String.class)
+                .setDefault("127.0.0.1")
+                .type(String.class)
+                .help("Netconf server IP")
+                .dest("ip");
+
+        parser.addArgument("--port")
+                .type(Integer.class)
+                .setDefault(2830)
+                .type(Integer.class)
+                .help("Netconf server port")
+                .dest("port");
+
+        parser.addArgument("--edits")
+                .type(Integer.class)
+                .setDefault(50000)
+                .type(Integer.class)
+                .help("Netconf edit rpcs to be sent")
+                .dest("edit-count");
+
+        parser.addArgument("--edit-content")
+                .type(File.class)
+                .setDefault(new File("edit.txt"))
+                .type(File.class)
+                .dest("edit-content");
+
+        parser.addArgument("--edit-batch-size")
+                .type(Integer.class)
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .dest("edit-batch-size");
+
+        parser.addArgument("--debug")
+                .type(Boolean.class)
+                .setDefault(false)
+                .help("Whether to use debug log level instead of INFO")
+                .dest("debug");
+
+        parser.addArgument("--legacy-framing")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("legacy-framing");
+
+        parser.addArgument("--exi")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("exi");
+
+        parser.addArgument("--async-requests")
+                .type(Boolean.class)
+                .setDefault(true)
+                .dest("async");
+
+        parser.addArgument("--msg-timeout")
+                .type(Integer.class)
+                .setDefault(60)
+                .dest("msg-timeout");
+
+        parser.addArgument("--ssh")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("ssh");
+
+        // TODO add get-config option instead of edit + commit
+        // TODO different edit config content
+
+        return parser;
+    }
+
+    void validate() {
+        Preconditions.checkArgument(port > 0, "Port =< 0");
+        Preconditions.checkArgument(editCount > 0, "Edit count =< 0");
+        if (editBatchSize == -1) {
+            editBatchSize = editCount;
+        } else {
+            Preconditions.checkArgument(editBatchSize <= editCount, "Edit count =< 0");
+        }
+
+        Preconditions.checkArgument(editContent.exists(), "Edit content file missing");
+        Preconditions.checkArgument(editContent.isDirectory() == false, "Edit content file is a dir");
+        Preconditions.checkArgument(editContent.canRead(), "Edit content file is unreadable");
+        // TODO validate
+    }
+
+    public InetSocketAddress getInetAddress() {
+        try {
+            return new InetSocketAddress(InetAddress.getByName(ip), port);
+        } catch (final UnknownHostException e) {
+            throw new IllegalArgumentException("Unknown ip", e);
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java
new file mode 100644 (file)
index 0000000..fe0a0bc
--- /dev/null
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import ch.qos.logback.classic.Level;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.SAXException;
+
+public final class StressClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StressClient.class);
+
+    static final QName COMMIT_QNAME = QName.create(CommitInput.QNAME, "commit");
+    public static final NetconfMessage COMMIT_MSG;
+
+    static {
+        try {
+            COMMIT_MSG = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"commit-batch\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                    "    <commit/>\n" +
+                    "</rpc>"));
+        } catch (SAXException | IOException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    static final QName EDIT_QNAME = QName.create(EditConfigInput.QNAME, "edit-config");
+    static final org.w3c.dom.Document editBlueprint;
+
+    static {
+        try {
+            editBlueprint = XmlUtil.readXmlToDocument(
+                    "<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                            "    <edit-config xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                            "        <target>\n" +
+                            "            <candidate/>\n" +
+                            "        </target>\n" +
+                            "        <config/>\n" +
+                            "    </edit-config>\n" +
+                            "</rpc>");
+        } catch (SAXException | IOException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
+    private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+
+    public static void main(final String[] args) {
+        final Parameters params = parseArgs(args, Parameters.getParser());
+        params.validate();
+
+        // TODO remove
+        try {
+            Thread.sleep(10000);
+        } catch (final InterruptedException e) {
+//            e.printStackTrace();
+        }
+
+        final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+        root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
+
+        LOG.info("Preparing messages");
+        // Prepare all msgs up front
+        final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+
+        final String editContentString;
+        boolean needsModification = false;
+        try {
+            editContentString = Files.toString(params.editContent, Charsets.UTF_8);
+            if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
+                needsModification = true;
+            };
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot read content of " + params.editContent);
+        }
+
+        for (int i = 0; i < params.editCount; i++) {
+            final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+            msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
+            final NetconfMessage netconfMessage = new NetconfMessage(msg);
+
+            final Element editContentElement;
+            try {
+                // Insert message id where needed
+                final String specificEditContent = needsModification ?
+                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
+                        editContentString;
+
+                editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+                final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+                        getElementsByTagName("config").item(0);
+                config.appendChild(msg.importNode(editContentElement, true));
+            } catch (final IOException | SAXException e) {
+                throw new IllegalArgumentException("Edit content file is unreadable", e);
+            }
+
+            preparedMessages.add(netconfMessage);
+
+        }
+
+
+        final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
+        final Timer timer = new HashedWheelTimer();
+
+        final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
+
+        final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+
+        final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+
+        LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
+        final NetconfClientSession netconfClientSession;
+        try {
+            netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException("Unable to connect", e);
+        }
+
+        LOG.info("Starting stress test");
+        final Stopwatch started = Stopwatch.createStarted();
+        getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
+        started.stop();
+
+        LOG.info("FINISHED. Execution time: {}", started);
+        LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
+
+        // Cleanup
+        netconfClientSession.close();
+        timer.stop();
+        try {
+            nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Unable to close executor properly", e);
+        }
+    }
+
+    private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+        if(params.async) {
+            return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
+        } else {
+            return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+        }
+    }
+
+    private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
+        final NetconfClientDispatcherImpl netconfClientDispatcher;
+        if(params.exi) {
+            if(params.legacyFraming) {
+                netconfClientDispatcher= ConfigurableClientDispatcher.createLegacyExi(nioGroup, nioGroup, timer);
+            } else {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createChunkedExi(nioGroup, nioGroup, timer);
+            }
+        } else {
+            if(params.legacyFraming) {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createLegacy(nioGroup, nioGroup, timer);
+            } else {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createChunked(nioGroup, nioGroup, timer);
+            }
+        }
+        return netconfClientDispatcher;
+    }
+
+    private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
+        final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
+        netconfClientConfigurationBuilder.withSessionListener(sessionListener);
+        netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+        netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
+        netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
+        netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+        return netconfClientConfigurationBuilder.build();
+    }
+
+    static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
+        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+    }
+
+    private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
+        final Parameters opt = new Parameters();
+        try {
+            parser.parseArgs(args, opt);
+            return opt;
+        } catch (final ArgumentParserException e) {
+            parser.handleError(e);
+        }
+
+        System.exit(1);
+        return null;
+    }
+
+
+    private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+        @Override
+        public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
+            LOG.info("Session established");
+        }
+
+        @Override
+        public void onRemoteSessionDown() {
+            LOG.info("Session down");
+        }
+
+        @Override
+        public void onRemoteSessionFailed(final Throwable throwable) {
+            LOG.info("Session failed");
+        }
+
+        @Override
+        public void onNotification(final NetconfMessage notification) {
+            LOG.info("Notification received: {}", notification.toString());
+        }
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java
new file mode 100644 (file)
index 0000000..34142a7
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO reuse code from org.opendaylight.controller.netconf.test.tool.client.stress.AsyncExecutionStrategy
+class SyncExecutionStrategy implements ExecutionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
+
+    private final Parameters params;
+    private final List<NetconfMessage> preparedMessages;
+    private final NetconfDeviceCommunicator sessionListener;
+    private final List<Integer> editBatches;
+
+    public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+        this.params = params;
+        this.preparedMessages = preparedMessages;
+        this.sessionListener = sessionListener;
+        editBatches = countEditBatchSizes(params);
+    }
+
+    private static List<Integer> countEditBatchSizes(final Parameters params) {
+        final List<Integer> editBatches = Lists.newArrayList();
+        if (params.editBatchSize != params.editCount) {
+            final int fullBatches = params.editCount / params.editBatchSize;
+            for (int i = 0; i < fullBatches; i++) {
+                editBatches.add(params.editBatchSize);
+            }
+
+            if (params.editCount % params.editBatchSize != 0) {
+                editBatches.add(params.editCount % params.editBatchSize);
+            }
+        } else {
+            editBatches.add(params.editBatchSize);
+        }
+        return editBatches;
+    }
+
+    public void invoke() {
+        final AtomicInteger responseCounter = new AtomicInteger(0);
+
+        int batchI = 0;
+        for (final Integer editBatch : editBatches) {
+            for (int i = 0; i < editBatch; i++) {
+                final int msgId = i + (batchI * params.editBatchSize);
+                final NetconfMessage msg = preparedMessages.get(msgId);
+                LOG.debug("Sending message {}", msgId);
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+                }
+                final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+                        sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+                // Wait for response
+                waitForResponse(responseCounter, netconfMessageFuture);
+
+            }
+            batchI++;
+            LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+
+            // Commit batch sync
+            waitForResponse(responseCounter,
+                    sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+        }
+
+        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+    }
+
+    private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
+        try {
+            final RpcResult<NetconfMessage> netconfMessageRpcResult =
+                    netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
+            if (netconfMessageRpcResult.isSuccessful()) {
+                responseCounter.incrementAndGet();
+                LOG.debug("Received response {}", responseCounter.get());
+            } else {
+                LOG.warn("Request failed {}", netconfMessageRpcResult);
+            }
+
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException | TimeoutException e) {
+            throw new RuntimeException("Request not finished", e);
+        }
+    }
+}
index 4ae65f31f72a6240e92d85be49e07c4f70b3115d..6408fce5d39b199c1105807619bf9e4e0198c4f6 100644 (file)
@@ -55,6 +55,9 @@ public final class XmlUtil {
             factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
             factory.setXIncludeAware(false);
             factory.setExpandEntityReferences(false);
+            // Performance improvement for messages with size <10k according to
+            // https://xerces.apache.org/xerces2-j/faq-performance.html
+            factory.setFeature("http://apache.org/xml/features/dom/defer-node-expansion", false);
         } catch (ParserConfigurationException e) {
             throw new ExceptionInInitializerError(e);
         }
index a990b5c6cb935d1139f54353a5282329f9fd3b0e..ab92128dbd2a30e914d58cf1b4e53dedf6c1d995 100644 (file)
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <sourceDirectory>${project.basedir}</sourceDirectory>
           <includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat,**\/*.yang</includes>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java</excludes>
+          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java, **\/netconf\/test\/tool\/client\/stress\/StressClient.java</excludes>
         </configuration>
         <dependencies>
           <dependency>