Merge "Adding sal-compatibilty feature"
authorMadhu Venugopal <mavenugo@gmail.com>
Wed, 27 Aug 2014 10:45:26 +0000 (10:45 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 27 Aug 2014 10:45:26 +0000 (10:45 +0000)
39 files changed:
opendaylight/md-sal/model/model-flow-service/src/main/yang/flow-node-inventory.yang
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/samples/toaster-provider/pom.xml
opendaylight/md-sal/samples/toaster-provider/src/test/java/org/opendaylight/controller/sample/toaster/provider/OpenDaylightToasterTest.java [new file with mode: 0644]
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/attributes/fromxml/AttributeConfigElement.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/attributes/fromxml/CompositeAttributeReadingStrategy.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/attributes/fromxml/ObjectXmlReader.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/Get.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivator.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringServiceTracker.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/xml/JaxBSerializer.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/xml/model/NetconfState.java
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/GetTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/xml/JaxBSerializerTest.java
opendaylight/netconf/netconf-netty-util/pom.xml
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfHelloMessageToXMLEncoder.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToMessageDecoder.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/AuthenticationHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/LoginPassword.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelInputStream.java [deleted file]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelOutputStream.java [deleted file]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocket.java [deleted file]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/EOMFramingMechanismEncoderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/FramingMechanismHandlerFactoryTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfChunkAggregatorTest.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfHelloMessageToXMLEncoderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToHelloMessageDecoderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToMessageDecoderTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/LoginPasswordTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberAware.java

index 605cb9004a65235dd6cf66b0ff065514f54ad183..64c3d9c467bdeeae7726f4f462ab54a75f5e7775 100644 (file)
@@ -122,10 +122,15 @@ module flow-node-inventory {
             uses meter:meter;
         }
     }
-    
-    
-    grouping flow-node {
 
+    grouping ip-address-grouping {
+        leaf ip-address {
+            description "IP address of a flow capable node.";
+            type inet:ip-address;
+        }
+    }
+
+    grouping flow-node {
         leaf manufacturer {
             type string;
         }
@@ -145,6 +150,7 @@ module flow-node-inventory {
         uses tables;
         uses group:groups;
         uses meters;
+        uses ip-address-grouping;
         // TODO: ports
         
         container supported-match-types {
@@ -197,7 +203,16 @@ module flow-node-inventory {
             
         }
     }
-    
+
+    rpc get-node-ip-address {
+        input {
+            uses "inv:node-context-ref";
+        }
+        output {
+            uses ip-address-grouping;
+        }
+    }
+
     grouping flow-node-connector {
 
         uses port:flow-capable-port;
index f843b23f9b2cc9d15a55c2d9a4373cde3baffecc..53f96e44f4105b8ecd3f2125be2360bd54a0b0c4 100644 (file)
@@ -9,15 +9,14 @@ package org.opendaylight.controller.md.sal.binding.impl;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import java.util.ArrayList;
+
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
@@ -88,7 +87,7 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
             final Map<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized) {
         Map<InstanceIdentifier<?>, DataObject> newMap = new HashMap<>();
 
-        for (Map.Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : sortedEntries(normalized)) {
+        for (Map.Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : normalized.entrySet()) {
             try {
                 Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> potential = getCodec().toBinding(entry);
                 if (potential.isPresent()) {
@@ -102,38 +101,6 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
         return newMap;
     }
 
-    private static final Comparator<Entry<YangInstanceIdentifier, ?>> MAP_ENTRY_COMPARATOR = new Comparator<Entry<YangInstanceIdentifier, ?>>() {
-        @Override
-        public int compare(final Entry<YangInstanceIdentifier, ?> left, final Entry<YangInstanceIdentifier, ?> right) {
-            final Iterator<?> li = left.getKey().getPathArguments().iterator();
-            final Iterator<?> ri = right.getKey().getPathArguments().iterator();
-
-            // Iterate until left is exhausted...
-            while (li.hasNext()) {
-                if (!ri.hasNext()) {
-                    // Left is deeper
-                    return 1;
-                }
-
-                li.next();
-                ri.next();
-            }
-
-            // Check if right is exhausted
-            return ri.hasNext() ? -1 : 0;
-        }
-    };
-
-    private static <T> Iterable<Entry<YangInstanceIdentifier, T>> sortedEntries(final Map<YangInstanceIdentifier, T> map) {
-        if (!map.isEmpty()) {
-            ArrayList<Entry<YangInstanceIdentifier, T>> entries = new ArrayList<>(map.entrySet());
-            Collections.sort(entries, MAP_ENTRY_COMPARATOR);
-            return entries;
-        } else {
-            return Collections.emptySet();
-        }
-    }
-
     protected Set<InstanceIdentifier<?>> toBinding(final InstanceIdentifier<?> path,
             final Set<YangInstanceIdentifier> normalized) {
         Set<InstanceIdentifier<?>> hashSet = new HashSet<>();
index 9ad35fcdda85347de6a7680c4f91dabbeed8e2a7..d7314e5362873b0db2e3fa6a2a20b93f527725d2 100644 (file)
@@ -13,6 +13,8 @@ import com.google.common.base.Optional;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import javax.annotation.Nonnull;
+
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
@@ -21,6 +23,7 @@ import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadi
 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
@@ -43,20 +46,19 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener,AutoC
 
     }
 
-    public org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier toNormalized(
-            final InstanceIdentifier<? extends DataObject> binding) {
+    public YangInstanceIdentifier toNormalized(final InstanceIdentifier<? extends DataObject> binding) {
         return codecRegistry.toYangInstanceIdentifier(binding);
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+    public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
             final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
         return codecRegistry.toNormalizedNode((InstanceIdentifier) bindingPath, bindingObject);
 
     }
 
-    public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
-            final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> binding) {
+    public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+            final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding) {
         return toNormalizedNode(binding.getKey(),binding.getValue());
     }
 
@@ -69,11 +71,10 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener,AutoC
      * augmentation.
      *
      */
-    public Optional<InstanceIdentifier<? extends DataObject>> toBinding(
-            final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier normalized)
+    public Optional<InstanceIdentifier<? extends DataObject>> toBinding(final YangInstanceIdentifier normalized)
                     throws DeserializationException {
         try {
-            return  Optional.<InstanceIdentifier<? extends DataObject>>of(codecRegistry.fromYangInstanceIdentifier(normalized));
+            return Optional.<InstanceIdentifier<? extends DataObject>>fromNullable(codecRegistry.fromYangInstanceIdentifier(normalized));
         } catch (IllegalArgumentException e) {
             return Optional.absent();
         }
@@ -83,14 +84,26 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener,AutoC
         return legacyToNormalized;
     }
 
-    @SuppressWarnings("unchecked")
-    public Optional<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
-            final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
+    public Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
+            final @Nonnull Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
                     throws DeserializationException {
         try {
-            @SuppressWarnings("rawtypes")
-            Entry binding = codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
-            return Optional.<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>>fromNullable(binding);
+            /*
+             * This cast is required, due to generics behaviour in openjdk / oracle javac
+             *
+             * InstanceIdentifier has definition InstanceIdentifier<T extends DataObject>,
+             * this means '?' is always Â <? extends DataObject>. Eclipse compiler
+             * is able to determine this relationship and treats
+             * Entry<InstanceIdentifier<?>,DataObject> and Entry<InstanceIdentifier<? extends DataObject,DataObject>
+             * as assignable. However openjdk / oracle javac treats this two types
+             * as incompatible and issues a compile error.
+             *
+             * It is safe to  loose generic information and cast it to other generic signature.
+             *
+             */
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding = (Entry) codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
+            return Optional.fromNullable(binding);
         } catch (IllegalArgumentException e) {
             return Optional.absent();
         }
@@ -112,7 +125,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener,AutoC
      * @param path DOM Path
      * @return Node with defaults set on.
      */
-    public NormalizedNode<?, ?> getDefaultNodeFor(final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path) {
+    public NormalizedNode<?, ?> getDefaultNodeFor(final YangInstanceIdentifier path) {
         Iterator<PathArgument> iterator = path.getPathArguments().iterator();
         DataNormalizationOperation<?> currentOp = legacyToNormalized.getRootOperation();
         while (iterator.hasNext()) {
@@ -131,7 +144,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener,AutoC
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         // NOOP Intentionally
     }
 }
index fc455b193e27118f6dcfcc2de93032f2676c5619..c557118b1e8f92234d597ce6372cda2674f6a6bd 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
 
 import java.util.Collections;
 import java.util.List;
@@ -40,32 +42,79 @@ import java.util.List;
  */
 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
 
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
     private final ActorContext actorContext;
-    private final List<ActorPath> cohortPaths;
+    private final List<Future<ActorPath>> cohortPathFutures;
+    private volatile List<ActorPath> cohortPaths;
     private final String transactionId;
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
-            String transactionId) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+            List<Future<ActorPath>> cohortPathFutures, String transactionId) {
         this.actorContext = actorContext;
-        this.cohortPaths = cohortPaths;
+        this.cohortPathFutures = cohortPathFutures;
         this.transactionId = transactionId;
     }
 
+    private Future<Void> buildCohortPathsList() {
+
+        Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+                actorContext.getActorSystem().dispatcher());
+
+        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+            @Override
+            public Void apply(Iterable<ActorPath> paths) {
+                cohortPaths = Lists.newArrayList(paths);
+
+                LOG.debug("Tx {} successfully built cohort path list: {}",
+                        transactionId, cohortPaths);
+                return null;
+            }
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public ListenableFuture<Boolean> canCommit() {
-        LOG.debug("txn {} canCommit", transactionId);
+        LOG.debug("Tx {} canCommit", transactionId);
+
+        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+        // The first phase of canCommit is to gather the list of cohort actor paths that will
+        // participate in the commit. buildCohortPathsList combines the cohort path Futures into
+        // one Future which we wait on asynchronously here. The cohort actor paths are
+        // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
+        // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
+
+        buildCohortPathsList().onComplete(new OnComplete<Void>() {
+            @Override
+            public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+                if(failure != null) {
+                    LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+                    returnFuture.setException(failure);
+                } else {
+                    finishCanCommit(returnFuture);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+
+        return returnFuture;
+    }
+
+    private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
+
+        LOG.debug("Tx {} finishCanCommit", transactionId);
+
+        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
+        // their canCommit processing. If any one fails then we'll fail canCommit.
 
         Future<Iterable<Object>> combinedFuture =
                 invokeCohorts(new CanCommitTransaction().toSerializable());
 
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
                 if(failure != null) {
+                    LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     returnFuture.setException(failure);
                     return;
                 }
@@ -87,18 +136,18 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     }
                 }
 
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
                 returnFuture.set(Boolean.valueOf(result));
             }
         }, actorContext.getActorSystem().dispatcher());
-
-        return returnFuture;
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
         for(ActorPath actorPath : cohortPaths) {
 
-            LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
 
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
@@ -111,39 +160,73 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> preCommit() {
-        LOG.debug("txn {} preCommit", transactionId);
-        return voidOperation(new PreCommitTransaction().toSerializable(),
+        return voidOperation("preCommit",  new PreCommitTransaction().toSerializable(),
                 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
     @Override
     public ListenableFuture<Void> abort() {
-        LOG.debug("txn {} abort", transactionId);
-
         // Note - we pass false for propagateException. In the front-end data broker, this method
         // is called when one of the 3 phases fails with an exception. We'd rather have that
         // original exception propagated to the client. If our abort fails and we propagate the
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
-        return voidOperation(new AbortTransaction().toSerializable(),
+        return voidOperation("abort", new AbortTransaction().toSerializable(),
                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
     }
 
     @Override
     public ListenableFuture<Void> commit() {
-        LOG.debug("txn {} commit", transactionId);
-        return voidOperation(new CommitTransaction().toSerializable(),
+        return voidOperation("commit",  new CommitTransaction().toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
-    private ListenableFuture<Void> voidOperation(final Object message,
+    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
             final Class<?> expectedResponseClass, final boolean propagateException) {
 
-        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+        LOG.debug("Tx {} {}", transactionId, operationName);
 
         final SettableFuture<Void> returnFuture = SettableFuture.create();
 
+        // The cohort actor list should already be built at this point by the canCommit phase but,
+        // if not for some reason, we'll try to build it here.
+
+        if(cohortPaths != null) {
+            finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+                    returnFuture);
+        } else {
+            buildCohortPathsList().onComplete(new OnComplete<Void>() {
+                @Override
+                public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+                    if(failure != null) {
+                        LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+                                operationName, failure);
+
+                        if(propagateException) {
+                            returnFuture.setException(failure);
+                        } else {
+                            returnFuture.set(null);
+                        }
+                    } else {
+                        finishVoidOperation(operationName, message, expectedResponseClass,
+                                propagateException, returnFuture);
+                    }
+                }
+            }, actorContext.getActorSystem().dispatcher());
+        }
+
+        return returnFuture;
+    }
+
+    private void finishVoidOperation(final String operationName, final Object message,
+            final Class<?> expectedResponseClass, final boolean propagateException,
+            final SettableFuture<Void> returnFuture) {
+
+        LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
@@ -161,6 +244,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
+                    LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+                            operationName, exceptionToPropagate);
+
                     if(propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
@@ -174,15 +260,15 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         returnFuture.set(null);
                     }
                 } else {
+                    LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
                     returnFuture.set(null);
                 }
             }
         }, actorContext.getActorSystem().dispatcher());
-
-        return returnFuture;
     }
 
-    public List<ActorPath> getCohortPaths() {
-        return Collections.unmodifiableList(this.cohortPaths);
+    @VisibleForTesting
+    List<Future<ActorPath>> getCohortPathFutures() {
+        return Collections.unmodifiableList(cohortPathFutures);
     }
 }
index 6183c489c4cdbc56a1ba9ac0c3c3939963277edb..2e7b2feb85bd97c7b80d09b8156f4b44c6fb343e 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
-import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.Props;
 import akka.dispatch.OnComplete;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -45,9 +45,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Function1;
 import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         READ_WRITE
     }
 
+    static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+                                                                          Throwable, Throwable>() {
+        @Override
+        public Throwable apply(Throwable failure) {
+            return failure;
+        }
+    };
+
     private static final AtomicLong counter = new AtomicLong();
 
     private static final Logger
@@ -103,6 +112,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     }
 
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -110,7 +129,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} read {}", identifier, path);
+        LOG.debug("Tx {} read {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -123,7 +142,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} exists {}", identifier, path);
+        LOG.debug("Tx {} exists {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -142,7 +161,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} write {}", identifier, path);
+        LOG.debug("Tx {} write {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -154,7 +173,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} merge {}", identifier, path);
+        LOG.debug("Tx {} merge {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -166,7 +185,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} delete {}", identifier, path);
+        LOG.debug("Tx {} delete {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -180,31 +199,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         inReadyState = true;
 
-        List<ActorPath> cohortPaths = new ArrayList<>();
-
-        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+        LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
 
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("txn {} Readying transaction for shard {}", identifier,
+            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
 
-            Object result = transactionContext.readyTransaction();
-
-            if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
-                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                        actorContext.getActorSystem(),result);
-                String resolvedCohortPath = transactionContext.getResolvedCohortPath(
-                        reply.getCohortPath().toString());
-                cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
-            } else {
-                LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
-                        result.getClass());
-            }
+            cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+                identifier.toString());
     }
 
     @Override
@@ -249,7 +258,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+                LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
 
                 ActorSelection transactionActor =
                     actorContext.actorSelection(transactionPath);
@@ -259,11 +268,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
-                LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
-                        response.getClass());
+                throw new IllegalArgumentException(String.format(
+                        "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
         } catch(Exception e){
-            LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
         }
     }
@@ -271,11 +280,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private interface TransactionContext {
         String getShardName();
 
-        String getResolvedCohortPath(String cohortPath);
+        void closeTransaction();
 
-        public void closeTransaction();
+        Future<ActorPath> readyTransaction();
 
-        public Object readyTransaction();
+        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
         void deleteData(YangInstanceIdentifier path);
 
@@ -284,23 +293,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 final YangInstanceIdentifier path);
 
-        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
-    }
 
+        List<Future<Object>> getRecordedOperationFutures();
+    }
 
-    private class TransactionContextImpl implements TransactionContext {
-        private final String shardName;
-        private final String actorPath;
-        private final ActorSelection actor;
+    private abstract class AbstractTransactionContext implements TransactionContext {
 
+        protected final String shardName;
+        protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
 
-        private TransactionContextImpl(String shardName, String actorPath,
-            ActorSelection actor) {
+        AbstractTransactionContext(String shardName) {
             this.shardName = shardName;
-            this.actorPath = actorPath;
-            this.actor = actor;
         }
 
         @Override
@@ -308,53 +312,193 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return shardName;
         }
 
+        @Override
+        public List<Future<Object>> getRecordedOperationFutures() {
+            return recordedOperationFutures;
+        }
+    }
+
+    private class TransactionContextImpl extends AbstractTransactionContext {
+        private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+        private final String actorPath;
+        private final ActorSelection actor;
+
+        private TransactionContextImpl(String shardName, String actorPath,
+            ActorSelection actor) {
+            super(shardName);
+            this.actorPath = actorPath;
+            this.actor = actor;
+        }
+
         private ActorSelection getActor() {
             return actor;
         }
 
-        @Override
-        public String getResolvedCohortPath(String cohortPath) {
+        private String getResolvedCohortPath(String cohortPath) {
             return actorContext.resolvePath(actorPath, cohortPath);
         }
 
         @Override
         public void closeTransaction() {
+            LOG.debug("Tx {} closeTransaction called", identifier);
             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
-        public Object readyTransaction() {
-            return actorContext.executeRemoteOperation(getActor(),
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+                    identifier, recordedOperationFutures.size());
+
+            // Send the ReadyTransaction message to the Tx actor.
+
+            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+            // Combine all the previously recorded put/merge/delete operation reply Futures and the
+            // ReadyTransactionReply Future into one Future. If any one fails then the combined
+            // Future will fail. We need all prior operations and the ready operation to succeed
+            // in order to attempt commit.
+
+            List<Future<Object>> futureList =
+                    Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+            futureList.addAll(recordedOperationFutures);
+            futureList.add(replyFuture);
+
+            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+                    actorContext.getActorSystem().dispatcher());
+
+            // Transform the combined Future into a Future that returns the cohort actor path from
+            // the ReadyTransactionReply. That's the end result of the ready operation.
+
+            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+                @Override
+                public ActorPath apply(Iterable<Object> notUsed) {
+
+                    LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+                            identifier);
+
+                    // At this point all the Futures succeeded and we need to extract the cohort
+                    // actor path from the ReadyTransactionReply. For the recorded operations, they
+                    // don't return any data so we're only interested that they completed
+                    // successfully. We could be paranoid and verify the correct reply types but
+                    // that really should never happen so it's not worth the overhead of
+                    // de-serializing each reply.
+
+                    // Note the Future get call here won't block as it's complete.
+                    Object serializedReadyReply = replyFuture.value().get().get();
+                    if(serializedReadyReply.getClass().equals(
+                                                     ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+                                actorContext.getActorSystem(), serializedReadyReply);
+
+                        String resolvedCohortPath = getResolvedCohortPath(
+                                reply.getCohortPath().toString());
+
+                        LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+                                identifier, resolvedCohortPath);
+
+                        return actorContext.actorFor(resolvedCohortPath);
+                    } else {
+                        // Throwing an exception here will fail the Future.
+
+                        throw new IllegalArgumentException(String.format("Invalid reply type {}",
+                                serializedReadyReply.getClass()));
+                    }
+                }
+            }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable());
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new MergeData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
+        }
+
+        @Override
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new WriteData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+                final YangInstanceIdentifier path) {
+
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
 
             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishReadData(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The read could not be performed because a previous put, merge,"
+                                    + "or delete operation failed", failure));
+                        } else {
+                            finishReadData(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishReadData(final YangInstanceIdentifier path,
+                final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+            LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object response) throws Throwable {
+                public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
                                 "Error reading data for path " + path, failure));
                     } else {
-                        if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                        LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+                        if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
-                                    path, response);
+                                    path, readResponse);
                             if (reply.getNormalizedNode() == null) {
                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
                             } else {
@@ -369,32 +513,76 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
-            future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
-        }
-
-        @Override
-        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable());
+            readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 final YangInstanceIdentifier path) {
 
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail this
+            // request.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishDataExists(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The data exists could not be performed because a previous "
+                                    + "put, merge, or delete operation failed", failure));
+                        } else {
+                            finishDataExists(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishDataExists(final YangInstanceIdentifier path,
+                final SettableFuture<Boolean> returnFuture) {
+
+            LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object response) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
-                                "Error checking exists for path " + path, failure));
+                                "Error checking data exists for path " + path, failure));
                     } else {
+                        LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
                             returnFuture.set(Boolean.valueOf(DataExistsReply.
                                         fromSerializable(response).exists()));
@@ -409,80 +597,60 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
         }
     }
 
-    private class NoOpTransactionContext implements TransactionContext {
+    private class NoOpTransactionContext extends AbstractTransactionContext {
 
-        private final Logger
-            LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+        private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
-        private final String shardName;
         private final Exception failure;
 
-        private ActorRef cohort;
-
         public NoOpTransactionContext(String shardName, Exception failure){
-            this.shardName = shardName;
+            super(shardName);
             this.failure = failure;
         }
 
         @Override
-        public String getShardName() {
-            return  shardName;
-
+        public void closeTransaction() {
+            LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
         }
 
         @Override
-        public String getResolvedCohortPath(String cohortPath) {
-            return cohort.path().toString();
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called", identifier);
+            return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
-        public void closeTransaction() {
-            LOG.warn("txn {} closeTransaction called", identifier);
-        }
-
-        @Override public Object readyTransaction() {
-            LOG.warn("txn {} readyTransaction called", identifier);
-            cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
-            return new ReadyTransactionReply(cohort.path()).toSerializable();
+        public void deleteData(YangInstanceIdentifier path) {
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
         }
 
         @Override
-        public void deleteData(YangInstanceIdentifier path) {
-            LOG.warn("txt {} deleteData called path = {}", identifier, path);
+        public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
         }
 
         @Override
-        public void mergeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} mergeData called path = {}", identifier, path);
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} readData called path = {}", identifier, path);
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
 
-        @Override public void writeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} writeData called path = {}", identifier, path);
-        }
-
-        @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+        @Override
+        public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} dataExists called path = {}", identifier, path);
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
-
-
-
 }
index 87231f08849ed02398472bb792a40a48753a96be..adb12b298e99260b6f33a6c309f3c6eb16ca78bb 100644 (file)
@@ -7,8 +7,9 @@ import akka.dispatch.Futures;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.isA;
@@ -31,14 +32,21 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
+    @SuppressWarnings("serial")
+    static class TestException extends RuntimeException {
+    }
+
     @Mock
     private ActorContext actorContext;
 
@@ -49,15 +57,28 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(getSystem()).when(actorContext).getActorSystem();
     }
 
-    private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
-        List<ActorPath> cohorts = Lists.newArrayList();
+    private Future<ActorPath> newCohortPath() {
+        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+        doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+        return Futures.successful(path);
+    }
+
+    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
         for(int i = 1; i <= nCohorts; i++) {
-            ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
-            cohorts.add(path);
-            doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+            cohortPathFutures.add(newCohortPath());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+    }
+
+    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+            throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        cohortPathFutures.add(newCohortPath());
+        cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
     }
 
     private void setupMockActorContext(Class<?> requestType, Object... responses) {
@@ -80,6 +101,16 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
                 any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
     }
 
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
     @Test
     public void testCanCommitWithOneCohort() throws Exception {
 
@@ -90,14 +121,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new CanCommitTransactionReply(false));
 
         future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -112,7 +143,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -128,19 +159,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCanCommitWithExceptionFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCanCommitWithExceptionFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
 
-        proxy.canCommit().get();
+        propagateExecutionExceptionCause(proxy.canCommit());
     }
 
     @Test(expected = ExecutionException.class)
@@ -151,7 +182,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.canCommit().get();
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.canCommit());
+        } finally {
+            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
@@ -161,7 +204,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -173,7 +216,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new RuntimeException("mock"));
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -182,7 +225,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
 
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
@@ -194,11 +237,22 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
 
         // The exception should not get propagated.
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
 
+    @Test
+    public void testAbortWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        // The exception should not get propagated.
+        proxy.abort().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
     @Test
     public void testCommit() throws Exception {
 
@@ -207,39 +261,64 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
                 new CommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCommitWithFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCommitWithFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
-                new RuntimeException("mock"));
+                new TestException());
 
-        proxy.commit().get();
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     @Test(expected = ExecutionException.class)
-    public void teseCommitWithInvalidResponseType() throws Exception {
+    public void testCommitWithInvalidResponseType() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.commit());
+        } finally {
+            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
-    public void testGetCohortPaths() {
+    public void testAllThreePhasesSuccessful() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        List<ActorPath> paths = proxy.getCohortPaths();
-        assertNotNull("getCohortPaths returned null", paths);
-        assertEquals("getCohortPaths size", 2, paths.size());
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+                new CommitTransactionReply(), new CommitTransactionReply());
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 }
index 14696f786e7e36888be3b5517c14ae4b9779340f..6b11a24e9cedbb3a8234fb6e337f7750a413721f 100644 (file)
@@ -9,7 +9,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -29,12 +31,15 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -48,10 +53,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -62,6 +69,7 @@ import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractActorTest {
@@ -71,7 +79,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     }
 
     static interface Invoker {
-        void invoke(TransactionProxy proxy) throws Exception;
+        CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
     }
 
     private final Configuration configuration = new MockConfiguration();
@@ -90,6 +98,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         schemaContext = TestModel.createTestContext();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -112,8 +121,8 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
             @Override
             public boolean matches(Object argument) {
-                DataExists obj = DataExists.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
@@ -124,8 +133,8 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
             @Override
             public boolean matches(Object argument) {
-                ReadData obj = ReadData.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
@@ -136,6 +145,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
+                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    return false;
+                }
+
                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
                 return obj.getPath().equals(TestModel.TEST_PATH) &&
                        obj.getData().equals(nodeToWrite);
@@ -149,6 +162,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
+                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    return false;
+                }
+
                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
                 return obj.getPath().equals(TestModel.TEST_PATH) &&
                        obj.getData().equals(nodeToWrite);
@@ -162,27 +179,38 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
-                DeleteData obj = DeleteData.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
         return argThat(matcher);
     }
 
-    private Object readyTxReply(ActorPath path) {
-        return new ReadyTransactionReply(path).toSerializable();
+    private Future<Object> readyTxReply(ActorPath path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
     }
 
     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data)
-                .toSerializable());
+        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
     }
 
     private Future<Object> dataExistsReply(boolean exists) {
         return Futures.successful(new DataExistsReply(exists).toSerializable());
     }
 
+    private Future<Object> writeDataReply() {
+        return Futures.successful(new WriteDataReply().toSerializable());
+    }
+
+    private Future<Object> mergeDataReply() {
+        return Futures.successful(new MergeDataReply().toSerializable());
+    }
+
+    private Future<Object> deleteDataReply() {
+        return Futures.successful(new DeleteDataReply().toSerializable());
+    }
+
     private ActorSelection actorSelection(ActorRef actorRef) {
         return getSystem().actorSelection(actorRef.path());
     }
@@ -201,7 +229,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
         doReturn(getSystem().actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
-        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
                         eqCreateTransaction(memberName, type), anyDuration());
@@ -212,6 +239,17 @@ public class TransactionProxyTest extends AbstractActorTest {
         return actorRef;
     }
 
+    private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+            throws Throwable {
+
+        try {
+            future.checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            throw e.getCause();
+        }
+    }
+
     @Test
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
@@ -240,7 +278,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
+    public void testReadWithInvalidReplyMessageType() throws Exception {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
@@ -256,19 +294,13 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        doThrow(new TestException()).when(mockActorContext).
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
-        try {
-            transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
-        }
+        propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
 
     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
@@ -280,20 +312,14 @@ public class TransactionProxyTest extends AbstractActorTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
-        try {
-            invoker.invoke(transactionProxy);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
-        }
+        propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
 
     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
             @Override
-            public void invoke(TransactionProxy proxy) throws Exception {
-                proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+                return proxy.read(TestModel.TEST_PATH);
             }
         });
     }
@@ -314,6 +340,71 @@ public class TransactionProxyTest extends AbstractActorTest {
         testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
+    @Test(expected = TestException.class)
+    public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+                        anyDuration());
+
+        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        try {
+            propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+        } finally {
+            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+                    eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+        }
+    }
+
+    @Test
+    public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, expectedNode);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testReadPreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+    }
+
     @Test
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
@@ -340,14 +431,14 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
             @Override
-            public void invoke(TransactionProxy proxy) throws Exception {
-                proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+                return proxy.exists(TestModel.TEST_PATH);
             }
         });
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+    public void testExistsWithInvalidReplyMessageType() throws Exception {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
@@ -363,62 +454,206 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        doThrow(new TestException()).when(mockActorContext).
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
+        propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+    }
+
+    @Test(expected = TestException.class)
+    public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+                        anyDuration());
+
+        doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
         try {
-            transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
+            propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+        } finally {
+            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+                    eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
         }
     }
 
     @Test
-    public void testWrite() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+    public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+        assertEquals("Exists response", true, exists);
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testxistsPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
+        transactionProxy.exists(TestModel.TEST_PATH);
+    }
+
+    private void verifyRecordingOperationFutures(List<Future<Object>> futures,
+            Class<?>... expResultTypes) throws Exception {
+        assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
+
+        int i = 0;
+        for( Future<Object> future: futures) {
+            assertNotNull("Recording operation Future is null", future);
+
+            Class<?> expResultType = expResultTypes[i++];
+            if(Throwable.class.isAssignableFrom(expResultType)) {
+                try {
+                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    fail("Expected exception from recording operation Future");
+                } catch(Exception e) {
+                    // Expected
+                }
+            } else {
+                assertEquals("Recording operation Future result type", expResultType,
+                             Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
+            }
+        }
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testWritePreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testWriteAfterReadyPreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.ready();
+
+        transactionProxy.write(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test
     public void testMerge() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS);
     }
 
     @Test
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
+        doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData());
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                DeleteDataReply.SERIALIZABLE_CLASS);
+    }
+
+    private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
+            Object... expReplies) throws Exception {
+        assertEquals("getReadyOperationFutures size", expReplies.length,
+                proxy.getCohortPathFutures().size());
+
+        int i = 0;
+        for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+            assertNotNull("Ready operation Future is null", future);
+
+            Object expReply = expReplies[i++];
+            if(expReply instanceof ActorPath) {
+                ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                assertEquals("Cohort actor path", expReply, actual);
+            } else {
+                // Expecting exception.
+                try {
+                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    fail("Expected exception from ready operation Future");
+                } catch(Exception e) {
+                    // Expected
+                }
+            }
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -426,10 +661,15 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReady() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -437,13 +677,139 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.read(TestModel.TEST_PATH);
 
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortPathFutures(proxy, actorRef.path());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithRecordingOperationFailure() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
+                        anyDuration());
+
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+        verifyCohortPathFutures(proxy, TestException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithReplyFailure() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortPathFutures(proxy, TestException.class);
+    }
+
+    @Test
+    public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+                anyString(), any(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithInvalidReplyMessageType() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.successful(new Object())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
+        verifyCohortPathFutures(proxy, IllegalArgumentException.class);
     }
 
     @Test
index 4d6b9ebd6f8317412943cce1a6750128b43668ac..89dcab2c3da1dfc45857ffe89a76c9e74c56a450 100644 (file)
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>
     </dependency>
+
+    <!-- dependencies to use AbstractDataBrokerTest -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-binding-broker-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-binding-broker-impl</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+        <artifactId>junit</artifactId>
+        <groupId>junit</groupId>
+        <scope>test</scope>
+    </dependency>
+    <!-- used to mock up classes -->
+     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/opendaylight/md-sal/samples/toaster-provider/src/test/java/org/opendaylight/controller/sample/toaster/provider/OpenDaylightToasterTest.java b/opendaylight/md-sal/samples/toaster-provider/src/test/java/org/opendaylight/controller/sample/toaster/provider/OpenDaylightToasterTest.java
new file mode 100644 (file)
index 0000000..516f214
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+* Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
+package org.opendaylight.controller.sample.toaster.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInputBuilder;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.WheatBread;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+
+public class OpenDaylightToasterTest extends AbstractDataBrokerTest{
+
+    private static InstanceIdentifier<Toaster> TOASTER_IID =
+                        InstanceIdentifier.builder( Toaster.class ).build();
+    OpendaylightToaster toaster;
+
+    @Override
+    protected void setupWithDataBroker(DataBroker dataBroker) {
+        toaster = new OpendaylightToaster();
+        toaster.setDataProvider( dataBroker );
+
+        /**
+         * Doesn't look like we have support for the NotificationProviderService yet, so mock it
+         * for now.
+         */
+        NotificationProviderService mockNotification = mock( NotificationProviderService.class );
+        toaster.setNotificationProvider( mockNotification );
+    }
+
+    @Test
+    public void testToasterInitOnStartUp() throws Exception {
+        DataBroker broker = getDataBroker();
+
+        ReadOnlyTransaction rTx = broker.newReadOnlyTransaction();
+        Optional<Toaster> optional = rTx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID ).get();
+        assertNotNull( optional );
+        assertTrue( "Operational toaster not present", optional.isPresent() );
+
+        Toaster toaster = optional.get();
+
+        assertEquals( Toaster.ToasterStatus.Up, toaster.getToasterStatus() );
+        assertEquals( new DisplayString("Opendaylight"),
+                      toaster.getToasterManufacturer() );
+        assertEquals( new DisplayString("Model 1 - Binding Aware"),
+                      toaster.getToasterModelNumber() );
+
+        Optional<Toaster> configToaster =
+                            rTx.read( LogicalDatastoreType.CONFIGURATION, TOASTER_IID ).get();
+        assertFalse( "Didn't expect config data for toaster.",
+                     configToaster.isPresent() );
+    }
+
+    @Test
+    @Ignore //ignored because it is not an e test right now. Illustrative purposes only.
+    public void testSomething() throws Exception{
+        MakeToastInput toastInput = new MakeToastInputBuilder()
+                                        .setToasterDoneness( 1L )
+                                        .setToasterToastType( WheatBread.class )
+                                        .build();
+
+        //NOTE: In a real test we would want to override the Thread.sleep() to prevent our junit test
+        //for sleeping for a second...
+        Future<RpcResult<Void>> makeToast = toaster.makeToast( toastInput );
+
+        RpcResult<Void> rpcResult = makeToast.get();
+
+        assertNotNull( rpcResult );
+        assertTrue( rpcResult.isSuccessful() );
+         //etc
+    }
+
+}
index dcc2fa15439b01ecd15eca5144f15eaa3a77086b..2b081258c42014e880846330994ea2dbb84147b8 100644 (file)
@@ -19,15 +19,15 @@ import javax.management.openmbean.OpenType;
  * of some module. Contains default value extracted from yang file.
  */
 public class AttributeConfigElement {
-    private final Object dafaultValue;
+    private final Object defaultValue;
     private final Object value;
 
     private Optional<?> resolvedValue;
     private Object resolvedDefaultValue;
     private String jmxName;
 
-    public AttributeConfigElement(Object dafaultValue, Object value) {
-        this.dafaultValue = dafaultValue;
+    public AttributeConfigElement(Object defaultValue, Object value) {
+        this.defaultValue = defaultValue;
         this.value = value;
     }
 
@@ -42,7 +42,7 @@ public class AttributeConfigElement {
     public void resolveValue(AttributeResolvingStrategy<?, ? extends OpenType<?>> attributeResolvingStrategy,
             String attrName) throws NetconfDocumentedException {
         resolvedValue = attributeResolvingStrategy.parseAttribute(attrName, value);
-        Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, dafaultValue);
+        Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, defaultValue);
         resolvedDefaultValue = resolvedDefault.isPresent() ? resolvedDefault.get() : null;
     }
 
@@ -58,6 +58,10 @@ public class AttributeConfigElement {
         return value;
     }
 
+    public Object getDefaultValue() {
+        return defaultValue;
+    }
+
     public Optional<?> getResolvedValue() {
         return resolvedValue;
     }
@@ -68,7 +72,7 @@ public class AttributeConfigElement {
 
     @Override
     public String toString() {
-        return "AttributeConfigElement [dafaultValue=" + dafaultValue + ", value=" + value + "]";
+        return "AttributeConfigElement [defaultValue=" + defaultValue + ", value=" + value + "]";
     }
 
 }
index 792fb28027245d92c30e1bbd3d84ee99e22beced..d06e55da6263e0cb56a0f0b7c4fe0f9fb0b283b7 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.confignetconfconnector.mapping.attri
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 
@@ -39,13 +40,18 @@ public class CompositeAttributeReadingStrategy extends AbstractAttributeReadingS
 
         List<XmlElement> recognisedChildren = Lists.newArrayList();
         for (Entry<String, AttributeReadingStrategy> innerAttrEntry : innerStrategies.entrySet()) {
-            List<XmlElement> childItem = null;
-            childItem = complexElement.getChildElementsWithSameNamespace(innerAttrEntry.getKey());
+            List<XmlElement> childItem = complexElement.getChildElementsWithSameNamespace(
+                    innerAttrEntry.getKey());
             recognisedChildren.addAll(childItem);
 
             AttributeConfigElement resolvedInner = innerAttrEntry.getValue().readElement(childItem);
 
-            innerMap.put(innerAttrEntry.getKey(), resolvedInner.getValue());
+            Object value = resolvedInner.getValue();
+            if(value == null) {
+                value = resolvedInner.getDefaultValue();
+            }
+
+            innerMap.put(innerAttrEntry.getKey(), value);
         }
 
         complexElement.checkUnrecognisedElements(recognisedChildren);
index 61ea76bbfeb9504f456132ba7cfebd67bcc6e508..e69084078963595a286e1aa75ea305b38789e091 100644 (file)
@@ -93,8 +93,9 @@ public class ObjectXmlReader extends AttributeIfcSwitchStatement<AttributeReadin
 
     @Override
     protected AttributeReadingStrategy caseTOAttribute(CompositeType openType) {
-        Preconditions.checkState(getLastAttribute() instanceof TOAttribute);
-        Map<String, AttributeIfc> inner = ((TOAttribute)getLastAttribute()).getYangPropertiesToTypesMap();
+        AttributeIfc lastAttribute = getLastAttribute();
+        Preconditions.checkState(lastAttribute instanceof TOAttribute);
+        Map<String, AttributeIfc> inner = ((TOAttribute)lastAttribute).getYangPropertiesToTypesMap();
 
         Map<String, AttributeReadingStrategy> innerStrategies = Maps.newHashMap();
 
@@ -104,21 +105,23 @@ public class ObjectXmlReader extends AttributeIfcSwitchStatement<AttributeReadin
             innerStrategies.put(innerAttrEntry.getKey(), innerStrat);
         }
 
-        return new CompositeAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategies);
+        return new CompositeAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategies);
     }
 
     @Override
     protected AttributeReadingStrategy caseListAttribute(ArrayType<?> openType) {
-        Preconditions.checkState(getLastAttribute() instanceof ListAttribute);
-        AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) getLastAttribute()).getInnerAttribute());
-        return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+        AttributeIfc lastAttribute = getLastAttribute();
+        Preconditions.checkState(lastAttribute instanceof ListAttribute);
+        AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) lastAttribute).getInnerAttribute());
+        return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
     }
 
     @Override
     protected AttributeReadingStrategy caseListDependeciesAttribute(ArrayType<?> openType) {
-        Preconditions.checkState(getLastAttribute() instanceof ListDependenciesAttribute);
+        AttributeIfc lastAttribute = getLastAttribute();
+        Preconditions.checkState(lastAttribute instanceof ListDependenciesAttribute);
         AttributeReadingStrategy innerStrategy = caseDependencyAttribute(SimpleType.OBJECTNAME);
-        return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+        return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
     }
 
 }
index 56f674bc34f287d74224ad269a1274c657109e68..a9e8dbe86b06b76087c902521022ff06e239b870 100644 (file)
@@ -9,13 +9,16 @@
 package org.opendaylight.controller.netconf.it;
 
 import static java.util.Arrays.asList;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import io.netty.channel.ChannelFuture;
+import com.google.common.collect.Lists;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.IOException;
 import java.io.InputStream;
@@ -23,6 +26,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -68,27 +72,27 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
         super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, getModuleFactories().toArray(
                 new ModuleFactory[0])));
 
-        NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+        final NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
         factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
 
         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
 
 
         final NetconfServerDispatcher dispatchS = createDispatcher(factoriesListener);
-        ChannelFuture s = dispatchS.createLocalServer(NetconfConfigUtil.getNetconfLocalAddress());
-        s.await();
-        EventLoopGroup bossGroup  = new NioEventLoopGroup();
+        dispatchS.createLocalServer(NetconfConfigUtil.getNetconfLocalAddress()).await();
+        final EventLoopGroup bossGroup  = new NioEventLoopGroup();
         sshServer = NetconfSSHServer.start(tlsAddress.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getAuthProvider(), bossGroup);
     }
 
-    private NetconfServerDispatcher createDispatcher(NetconfOperationServiceFactoryListenerImpl factoriesListener) {
+    private NetconfServerDispatcher createDispatcher(final NetconfOperationServiceFactoryListenerImpl factoriesListener) {
         return super.createDispatcher(factoriesListener, NetconfITTest.getNetconfMonitoringListenerService(), commitNot);
     }
 
     @After
     public void tearDown() throws Exception {
-        sshServer.stop();
+        sshServer.close();
         commitNot.close();
+        sshServer.join();
     }
 
     private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
@@ -102,13 +106,13 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
 
     @Test
     public void testSecure() throws Exception {
-        NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+        final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
         try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
             NetconfMessage response = netconfClient.sendMessage(getConfig);
             Assert.assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
                     NetconfMessageUtil.isErrorMessage(response));
 
-            NetconfMessage gs = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"2\"\n" +
+            final NetconfMessage gs = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"2\"\n" +
                     "     xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
                     "    <get-schema xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring\">\n" +
                     "        <identifier>config</identifier>\n" +
@@ -121,6 +125,41 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
         }
     }
 
+    /**
+     * Test all requests are handled properly and no mismatch occurs in listener
+     */
+    @Test(timeout = 3*60*1000)
+    public void testSecureStress() throws Exception {
+        final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+        try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
+
+            final AtomicInteger responseCounter = new AtomicInteger(0);
+            final List<Future<?>> futures = Lists.newArrayList();
+
+            final int requests = 1000;
+            for (int i = 0; i < requests; i++) {
+                final Future<NetconfMessage> netconfMessageFuture = netconfClient.sendRequest(getConfig);
+                futures.add(netconfMessageFuture);
+                netconfMessageFuture.addListener(new GenericFutureListener<Future<? super NetconfMessage>>() {
+                    @Override
+                    public void operationComplete(final Future<? super NetconfMessage> future) throws Exception {
+                        assertTrue("Request unsuccessful " + future.cause(), future.isSuccess());
+                        responseCounter.incrementAndGet();
+                    }
+                });
+            }
+
+            for (final Future<?> future : futures) {
+                future.await();
+            }
+
+            // Give future listeners some time to finish counter incrementation
+            Thread.sleep(5000);
+
+            org.junit.Assert.assertEquals(requests, responseCounter.get());
+        }
+    }
+
     public NetconfClientConfiguration getClientConfiguration() throws IOException {
         final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
         b.withAddress(tlsAddress);
@@ -133,7 +172,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     }
 
     public AuthProvider getAuthProvider() throws Exception {
-        AuthProvider mock = mock(AuthProviderImpl.class);
+        final AuthProvider mock = mock(AuthProviderImpl.class);
         doReturn(true).when(mock).authenticated(anyString(), anyString());
         doReturn(PEMGenerator.generate().toCharArray()).when(mock).getPEMAsCharArray();
         return mock;
index fa78fa4bd3983c7eee4cb166e54bc5692f0bad54..3b3f71b0ed0f3aad635d88d6b0bb93a3e4229aa3 100644 (file)
@@ -7,8 +7,7 @@
  */
 package org.opendaylight.controller.netconf.monitoring;
 
-import com.google.common.collect.Maps;
-
+import java.util.Collections;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
@@ -18,36 +17,26 @@ import org.opendaylight.controller.netconf.monitoring.xml.JaxBSerializer;
 import org.opendaylight.controller.netconf.monitoring.xml.model.NetconfState;
 import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import java.util.Map;
-
 public class Get extends AbstractNetconfOperation {
 
     private static final Logger logger = LoggerFactory.getLogger(Get.class);
     private final NetconfMonitoringService netconfMonitor;
 
-    public Get(NetconfMonitoringService netconfMonitor) {
+    public Get(final NetconfMonitoringService netconfMonitor) {
         super(MonitoringConstants.MODULE_NAME);
         this.netconfMonitor = netconfMonitor;
     }
 
-    private Element getPlaceholder(Document innerResult) throws NetconfDocumentedException {
-        try {
-            XmlElement rootElement = null;
-            rootElement = XmlElement.fromDomElementWithExpected(innerResult.getDocumentElement(),
-                    XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
-            return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
-        } catch (RuntimeException e) {
-            throw new IllegalArgumentException(String.format(
-                    "Input xml in wrong format, Expecting root element %s with child element %s, but was %s",
-                    XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.DATA_KEY,
-                    XmlUtil.toString(innerResult.getDocumentElement())), e);
-        }
+    private Element getPlaceholder(final Document innerResult)
+            throws NetconfDocumentedException {
+        final XmlElement rootElement = XmlElement.fromDomElementWithExpected(
+                innerResult.getDocumentElement(), XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
+        return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
     }
 
     @Override
@@ -61,7 +50,7 @@ public class Get extends AbstractNetconfOperation {
     }
 
     @Override
-    public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation)
+    public Document handle(final Document requestMessage, final NetconfOperationChainedExecution subsequentOperation)
             throws NetconfDocumentedException {
         if (subsequentOperation.isExecutionTermination()){
             throw new NetconfDocumentedException(String.format("Subsequent netconf operation expected by %s", this),
@@ -71,29 +60,29 @@ public class Get extends AbstractNetconfOperation {
         }
 
         try {
-            Document innerResult = subsequentOperation.execute(requestMessage);
+            final Document innerResult = subsequentOperation.execute(requestMessage);
 
-            NetconfState netconfMonitoring = new NetconfState(netconfMonitor);
+            final NetconfState netconfMonitoring = new NetconfState(netconfMonitor);
             Element monitoringXmlElement = new JaxBSerializer().toXml(netconfMonitoring);
 
             monitoringXmlElement = (Element) innerResult.importNode(monitoringXmlElement, true);
-            Element monitoringXmlElementPlaceholder = getPlaceholder(innerResult);
+            final Element monitoringXmlElementPlaceholder = getPlaceholder(innerResult);
             monitoringXmlElementPlaceholder.appendChild(monitoringXmlElement);
 
             return innerResult;
-        } catch (RuntimeException e) {
-            String errorMessage = "Get operation for netconf-state subtree failed";
+        } catch (final RuntimeException e) {
+            final String errorMessage = "Get operation for netconf-state subtree failed";
             logger.warn(errorMessage, e);
-            Map<String, String> info = Maps.newHashMap();
-            info.put(NetconfDocumentedException.ErrorSeverity.error.toString(), e.getMessage());
+
             throw new NetconfDocumentedException(errorMessage, NetconfDocumentedException.ErrorType.application,
                     NetconfDocumentedException.ErrorTag.operation_failed,
-                    NetconfDocumentedException.ErrorSeverity.error, info);
+                    NetconfDocumentedException.ErrorSeverity.error,
+                    Collections.singletonMap(NetconfDocumentedException.ErrorSeverity.error.toString(), e.getMessage()));
         }
     }
 
     @Override
-    protected Element handle(Document document, XmlElement message, NetconfOperationChainedExecution subsequentOperation)
+    protected Element handle(final Document document, final XmlElement message, final NetconfOperationChainedExecution subsequentOperation)
             throws NetconfDocumentedException {
         throw new UnsupportedOperationException("Never gets called");
     }
index 14c47352a8409d633dca64a01db06759ad79f8f6..9d332c644029ff3c26a2725703e4b348ff046cf0 100644 (file)
@@ -24,7 +24,6 @@ public class NetconfMonitoringActivator implements BundleActivator {
     public void start(final BundleContext context)  {
         monitor = new NetconfMonitoringServiceTracker(context);
         monitor.open();
-
     }
 
     @Override
index 920236b9b67ab0a277a5e166459463bc3ebac562..f99ae54e6dafac6e9ea01f36e85ab46261d9ce67 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.netconf.monitoring.osgi;
 
 import com.google.common.base.Preconditions;
+import java.util.Hashtable;
 import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
 import org.osgi.framework.BundleContext;
@@ -17,43 +18,39 @@ import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Dictionary;
-import java.util.Hashtable;
-
 public class NetconfMonitoringServiceTracker extends ServiceTracker<NetconfMonitoringService, NetconfMonitoringService> {
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfMonitoringServiceTracker.class);
 
     private ServiceRegistration<NetconfOperationServiceFactory> reg;
 
-    NetconfMonitoringServiceTracker(BundleContext context) {
+    NetconfMonitoringServiceTracker(final BundleContext context) {
         super(context, NetconfMonitoringService.class, null);
     }
 
     @Override
-    public NetconfMonitoringService addingService(ServiceReference<NetconfMonitoringService> reference) {
+    public NetconfMonitoringService addingService(final ServiceReference<NetconfMonitoringService> reference) {
         Preconditions.checkState(reg == null, "Monitoring service was already added");
 
-        NetconfMonitoringService netconfMonitoringService = super.addingService(reference);
+        final NetconfMonitoringService netconfMonitoringService = super.addingService(reference);
 
         final NetconfMonitoringOperationService operationService = new NetconfMonitoringOperationService(
                 netconfMonitoringService);
-        NetconfOperationServiceFactory factory = new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
+        final NetconfOperationServiceFactory factory = new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
                 operationService);
 
-        Dictionary<String, ?> props = new Hashtable<>();
-        reg = context.registerService(NetconfOperationServiceFactory.class, factory, props);
+        reg = context.registerService(NetconfOperationServiceFactory.class, factory, new Hashtable<String, Object>());
 
         return netconfMonitoringService;
     }
 
     @Override
-    public void removedService(ServiceReference<NetconfMonitoringService> reference,
-            NetconfMonitoringService netconfMonitoringService) {
+    public void removedService(final ServiceReference<NetconfMonitoringService> reference,
+            final NetconfMonitoringService netconfMonitoringService) {
         if(reg!=null) {
             try {
                 reg.unregister();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 logger.warn("Ignoring exception while unregistering {}", reg, e);
             }
         }
index 4b07ab090aabfd3727524d255319352747839f4a..962ad17b66c280c5e7fb68cfc279ff30d79c2285 100644 (file)
@@ -18,17 +18,17 @@ import javax.xml.transform.dom.DOMResult;
 
 public class JaxBSerializer {
 
-    public Element toXml(NetconfState monitoringModel) {
-        DOMResult res = null;
+    public Element toXml(final NetconfState monitoringModel) {
+        final DOMResult res;
         try {
-            JAXBContext jaxbContext = JAXBContext.newInstance(NetconfState.class);
-            Marshaller marshaller = jaxbContext.createMarshaller();
+            final JAXBContext jaxbContext = JAXBContext.newInstance(NetconfState.class);
+            final Marshaller marshaller = jaxbContext.createMarshaller();
 
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
 
             res = new DOMResult();
             marshaller.marshal(monitoringModel, res);
-        } catch (JAXBException e) {
+        } catch (final JAXBException e) {
            throw new RuntimeException("Unable to serialize netconf state " + monitoringModel, e);
         }
         return ((Document)res.getNode()).getDocumentElement();
index 98bda5833e00a83487d9d356a44f0b55db0e82d9..8f5a1c029d8611b9f75c7b4570fc63002d9f1e18 100644 (file)
@@ -28,15 +28,12 @@ public final class NetconfState {
     private Schemas schemas;
     private Sessions sessions;
 
-    public NetconfState(NetconfMonitoringService monitoringService) {
+    public NetconfState(final NetconfMonitoringService monitoringService) {
         this.sessions = monitoringService.getSessions();
         this.schemas = monitoringService.getSchemas();
     }
 
-    public NetconfState() {
-    }
-
-
+    public NetconfState() {}
 
     @XmlElementWrapper(name="schemas")
     @XmlElement(name="schema")
@@ -44,7 +41,7 @@ public final class NetconfState {
         return Collections2.transform(schemas.getSchema(), new Function<Schema, MonitoringSchema>() {
             @Nullable
             @Override
-            public MonitoringSchema apply(@Nullable Schema input) {
+            public MonitoringSchema apply(@Nullable final Schema input) {
                 return new MonitoringSchema(input);
             }
         });
@@ -56,7 +53,7 @@ public final class NetconfState {
         return Collections2.transform(sessions.getSession(), new Function<Session, MonitoringSession>() {
             @Nullable
             @Override
-            public MonitoringSession apply(@Nullable Session input) {
+            public MonitoringSession apply(@Nullable final Session input) {
                 return new MonitoringSession(input);
             }
         });
diff --git a/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/GetTest.java b/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/GetTest.java
new file mode 100644 (file)
index 0000000..5fceac0
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.monitoring;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+import java.util.Collections;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.w3c.dom.Document;
+
+public class GetTest {
+
+    @Mock
+    private NetconfMonitoringService monitor;
+    @Mock
+    private Document request;
+    @Mock
+    private NetconfOperationChainedExecution subsequentOperation;
+    private Document incorrectSubsequentResult;
+    private Document correctSubsequentResult;
+
+    private Get get;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        incorrectSubsequentResult = XmlUtil.readXmlToDocument("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>");
+        correctSubsequentResult = XmlUtil.readXmlToDocument("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"><data></data></rpc-reply>");
+
+        doReturn(new SessionsBuilder().setSession(Collections.<Session>emptyList()).build()).when(monitor).getSessions();
+        doReturn(new SchemasBuilder().setSchema(Collections.<Schema>emptyList()).build()).when(monitor).getSchemas();
+        doReturn(false).when(subsequentOperation).isExecutionTermination();
+
+        get = new Get(monitor);
+    }
+
+    @Test
+    public void testHandleNoSubsequent() throws Exception {
+        try {
+            get.handle(null, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+        } catch (final NetconfDocumentedException e) {
+            assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorType.application);
+            return;
+        }
+
+        fail("Get should fail without subsequent operation");
+    }
+
+    @Test
+    public void testHandleWrongPlaceholder() throws Exception {
+        doReturn(incorrectSubsequentResult).when(subsequentOperation).execute(request);
+        try {
+            get.handle(request, subsequentOperation);
+        } catch (final NetconfDocumentedException e) {
+            assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.invalid_value, NetconfDocumentedException.ErrorType.application);
+            return;
+        }
+
+        fail("Get should fail with wrong xml");
+    }
+
+    @Test
+    public void testHandleRuntimeEx() throws Exception {
+        doThrow(RuntimeException.class).when(subsequentOperation).execute(request);
+        try {
+            get.handle(request, subsequentOperation);
+        } catch (final NetconfDocumentedException e) {
+            assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorType.application);
+            assertEquals(1, e.getErrorInfo().size());
+            return;
+        }
+
+        fail("Get should fail with wrong xml");
+    }
+
+    @Test
+    public void testSuccessHandle() throws Exception {
+        doReturn(correctSubsequentResult).when(subsequentOperation).execute(request);
+        assertTrue(get.getHandlingPriority().getPriority().get() > HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY.getPriority().get());
+        final Document result = get.handle(request, subsequentOperation);
+        assertThat(XmlUtil.toString(result), CoreMatchers.containsString("sessions"));
+        assertThat(XmlUtil.toString(result), CoreMatchers.containsString("schemas"));
+
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testHandle() throws Exception {
+        get.handle(null, null, null);
+
+    }
+
+    private void assertNetconfDocumentedEx(final NetconfDocumentedException e, final NetconfDocumentedException.ErrorSeverity severity, final NetconfDocumentedException.ErrorTag errorTag, final NetconfDocumentedException.ErrorType type) {
+        assertEquals(severity, e.getErrorSeverity());
+        assertEquals(errorTag, e.getErrorTag());
+        assertEquals(type, e.getErrorType());
+    }
+}
index 02129574da40ec274a22b077614c38653a14770d..d0d587fb84263c02ef99cb0727de1d4c81b28223 100644 (file)
@@ -7,37 +7,40 @@
 */
 package org.opendaylight.controller.netconf.monitoring.xml;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Lists;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
 import org.opendaylight.controller.netconf.monitoring.xml.model.NetconfState;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.Session1;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfSsh;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Transport;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.ZeroBasedCounter32;
-import org.w3c.dom.Element;
-
-import java.util.Date;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 
 public class JaxBSerializerTest {
 
     @Test
     public void testName() throws Exception {
 
-        NetconfMonitoringService service = new NetconfMonitoringService() {
+        final NetconfMonitoringService service = new NetconfMonitoringService() {
 
             @Override
             public Sessions getSessions() {
@@ -46,19 +49,54 @@ public class JaxBSerializerTest {
 
             @Override
             public Schemas getSchemas() {
-                return new SchemasBuilder().setSchema(Lists.<Schema>newArrayList()).build();
+                return new SchemasBuilder().setSchema(Lists.newArrayList(getMockSchema("id", "v1", Yang.class), getMockSchema("id2", "", Yang.class))).build();
             }
         };
-        NetconfState model = new NetconfState(service);
-        Element xml = new JaxBSerializer().toXml(model);
+        final NetconfState model = new NetconfState(service);
+        final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model));
+
+        assertThat(xml, CoreMatchers.containsString(
+                "<schema>\n" +
+                "<format>yang</format>\n" +
+                "<identifier>id</identifier>\n" +
+                "<location>NETCONF</location>\n" +
+                "<namespace>localhost</namespace>\n" +
+                "<version>v1</version>\n" +
+                "</schema>\n"));
+
+        assertThat(xml, CoreMatchers.containsString(
+                "<session>\n" +
+                "<session-id>1</session-id>\n" +
+                "<in-bad-rpcs>0</in-bad-rpcs>\n" +
+                "<in-rpcs>0</in-rpcs>\n" +
+                "<login-time>loginTime</login-time>\n" +
+                "<out-notifications>0</out-notifications>\n" +
+                "<out-rpc-errors>0</out-rpc-errors>\n" +
+                "<ncme:session-identifier>client</ncme:session-identifier>\n" +
+                "<source-host>address/port</source-host>\n" +
+                "<transport>ncme:netconf-tcp</transport>\n" +
+                "<username>username</username>\n" +
+                "</session>"));
+    }
+
+    private Schema getMockSchema(final String id, final String version, final Class<Yang> format) {
+        final Schema mock = mock(Schema.class);
+
+        doReturn(format).when(mock).getFormat();
+        doReturn(id).when(mock).getIdentifier();
+        doReturn(new Uri("localhost")).when(mock).getNamespace();
+        doReturn(version).when(mock).getVersion();
+        doReturn(Lists.newArrayList(new Schema.Location(Schema.Location.Enumeration.NETCONF))).when(mock).getLocation();
+        doReturn(new SchemaKey(format, id, version)).when(mock).getKey();
+        return mock;
     }
 
-    private Session getMockSession(Class<? extends Transport> transportType) {
-        Session mocked = mock(Session.class);
-        Session1 mockedSession1 = mock(Session1.class);
+    private Session getMockSession(final Class<? extends Transport> transportType) {
+        final Session mocked = mock(Session.class);
+        final Session1 mockedSession1 = mock(Session1.class);
         doReturn("client").when(mockedSession1).getSessionIdentifier();
         doReturn(1L).when(mocked).getSessionId();
-        doReturn(new DateAndTime(new Date().toString())).when(mocked).getLoginTime();
+        doReturn(new DateAndTime("loginTime")).when(mocked).getLoginTime();
         doReturn(new Host(new DomainName("address/port"))).when(mocked).getSourceHost();
         doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInBadRpcs();
         doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInRpcs();
index 80dc1ae0fd352fd6827701059eeb10fa5050f5f2..cb8461a29965816320f1de6087e7a13629c4be9e 100644 (file)
@@ -64,7 +64,6 @@
       <groupId>org.openexi</groupId>
       <artifactId>nagasena-rta</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>
       <groupId>xmlunit</groupId>
       <artifactId>xmlunit</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>mockito-configuration</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
index d765ca8b2573702368ca9aca3e65212a8eed1e0a..f39e2c425d039cca549fd574607c52af02eced9e 100644 (file)
@@ -53,8 +53,7 @@ public final class NetconfHelloMessageToXMLEncoder extends NetconfMessageToXMLEn
         Optional<NetconfHelloMessageAdditionalHeader> headerOptional = ((NetconfHelloMessage) msg)
                 .getAdditionalHeader();
 
-        // If additional header present, serialize it along with netconf hello
-        // message
+        // If additional header present, serialize it along with netconf hello message
         if (headerOptional.isPresent()) {
             out.writeBytes(headerOptional.get().toFormattedString().getBytes(Charsets.UTF_8));
         }
index 69c0d53fc12144ad7e780126e5f8d3fe6372a571..bfc8d77e17bc7e4ae799d9b0859bdf3085f5e6af 100644 (file)
@@ -7,26 +7,21 @@
  */
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.List;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
 
     @Override
-    @VisibleForTesting
     public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
 
         if (in.readableBytes() != 0) {
index eea2b8693a26449a5f96b8d791f4b07082273bf8..0548b1d371a415569dffc1826c02fae4700df45d 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
 
-import ch.ethz.ssh2.Connection;
 import java.io.IOException;
 import org.apache.sshd.ClientSession;
 
@@ -16,8 +15,6 @@ import org.apache.sshd.ClientSession;
  * Class providing authentication facility to SSH handler.
  */
 public abstract class AuthenticationHandler {
-    public abstract void authenticate(Connection connection) throws IOException;
-
 
     public abstract String getUsername();
 
index 553e5359ffed1cd8b5728734bb5546e3abb0f72e..ab94e59a93dcf4e70494f7bf65ba66a28b5915c0 100644 (file)
@@ -9,15 +9,12 @@
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
 
 import java.io.IOException;
-
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.client.future.AuthFuture;
 
-import ch.ethz.ssh2.Connection;
-
 /**
  * Class Providing username/password authentication option to
- * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler}
+ * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandler}
  */
 public class LoginPassword extends AuthenticationHandler {
     private final String username;
@@ -28,15 +25,6 @@ public class LoginPassword extends AuthenticationHandler {
         this.password = password;
     }
 
-    @Override
-    public void authenticate(Connection connection) throws IOException {
-        final boolean isAuthenticated = connection.authenticateWithPassword(username, password);
-
-        if (!isAuthenticated) {
-            throw new IOException("Authentication failed.");
-        }
-    }
-
     @Override
     public String getUsername() {
         return username;
index 2761a45d03bedc8730cb69036748c266cdc7f412..935cb8dcd06ca966e6c560560d2030150cced460 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,7 +26,10 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
@@ -53,10 +57,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final SshClient sshClient;
 
     private SshReadAsyncListener sshReadAsyncListener;
+    private SshWriteAsyncHandler sshWriteAsyncHandler;
+
     private ClientChannel channel;
     private ClientSession session;
     private ChannelPromise connectPromise;
 
+
     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
         return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
     }
@@ -139,10 +146,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
         connectPromise.setSuccess();
         connectPromise = null;
-        ctx.fireChannelActive();
 
-        final IoInputStream asyncOut = channel.getAsyncOut();
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut);
+        sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+        sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+
+        ctx.fireChannelActive();
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@@ -154,17 +162,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-        try {
-            if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) {
-                handleSshSessionClosed(ctx);
-            } else {
-                channel.getAsyncIn().write(toBuffer(msg));
-                ((ByteBuf) msg).release();
-            }
-        } catch (final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e);
-        }
+        sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
     private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
@@ -172,15 +170,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
-    private Buffer toBuffer(final Object msg) {
-        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-        Preconditions.checkState(msg instanceof ByteBuf);
-        final ByteBuf byteBuf = (ByteBuf) msg;
-        final byte[] temp = new byte[byteBuf.readableBytes()];
-        byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-        return new Buffer(temp);
-    }
-
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
         this.connectPromise = promise;
@@ -193,22 +182,31 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     @Override
-    public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
+    public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
         if(sshReadAsyncListener != null) {
             sshReadAsyncListener.close();
         }
 
-        session.close(false).addListener(new SshFutureListener<CloseFuture>() {
-            @Override
-            public void operationComplete(final CloseFuture future) {
-                if(future.isClosed() == false) {
-                    session.close(true);
+        if(sshWriteAsyncHandler != null) {
+            sshWriteAsyncHandler.close();
+        }
+
+        if(session!= null && !session.isClosed() && !session.isClosing()) {
+            session.close(false).addListener(new SshFutureListener<CloseFuture>() {
+                @Override
+                public void operationComplete(final CloseFuture future) {
+                    if (future.isClosed() == false) {
+                        session.close(true);
+                    }
+                    session = null;
                 }
-                session = null;
-            }
-        });
+            });
+        }
 
         channel = null;
+        promise.setSuccess();
+
+        handleSshSessionClosed(ctx);
     }
 
     /**
@@ -255,7 +253,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         }
 
         @Override
-        public synchronized void close() throws Exception {
+        public synchronized void close() {
             // Remove self as listener on close to prevent reading from closed input
             if(currentReadFuture != null) {
                 currentReadFuture.removeListener(this);
@@ -264,4 +262,103 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             asyncOut = null;
         }
     }
+
+    private static final class SshWriteAsyncHandler implements AutoCloseable {
+        public static final int MAX_PENDING_WRITES = 100;
+
+        private final ChannelOutboundHandler channelHandler;
+        private IoOutputStream asyncIn;
+
+        // Counter that holds the amount of pending write messages
+        // Pending write can occur in case remote window is full
+        // In such case, we need to wait for the pending write to finish
+        private int pendingWriteCounter;
+        // Last write future, that can be pending
+        private IoWriteFuture lastWriteFuture;
+
+        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
+            this.channelHandler = channelHandler;
+            this.asyncIn = asyncIn;
+        }
+
+        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+            try {
+                if(asyncIn.isClosed() || asyncIn.isClosing()) {
+                    handleSshSessionClosed(ctx);
+                } else {
+                    lastWriteFuture = asyncIn.write(toBuffer(msg));
+                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+
+                        @Override
+                        public void operationComplete(final IoWriteFuture future) {
+                            ((ByteBuf) msg).release();
+
+                            // Notify success or failure
+                            if (future.isWritten()) {
+                                promise.setSuccess();
+                            }
+                            promise.setFailure(future.getException());
+
+                            // Reset last pending future
+                            synchronized (SshWriteAsyncHandler.this) {
+                                lastWriteFuture = null;
+                            }
+                        }
+                    });
+                }
+            } catch (final WritePendingException e) {
+                // Check limit for pending writes
+                pendingWriteCounter++;
+                if(pendingWriteCounter > MAX_PENDING_WRITES) {
+                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
+                            ", remote window is not getting read or is too small"));
+                }
+
+                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
+
+                // In case of pending, re-invoke write after pending is finished
+                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+                    @Override
+                    public void operationComplete(final IoWriteFuture future) {
+                        if(future.isWritten()) {
+                            synchronized (SshWriteAsyncHandler.this) {
+                                // Pending done, decrease counter
+                                pendingWriteCounter--;
+                            }
+                            write(ctx, msg, promise);
+                        } else {
+                            // Cannot reschedule pending, fail
+                            handlePendingFailed(ctx, e);
+                        }
+                    }
+
+                });
+            }
+        }
+
+        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
+            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
+            try {
+                channelHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception ex) {
+                // This should not happen
+                throw new IllegalStateException(ex);
+            }
+        }
+
+        @Override
+        public void close() {
+            asyncIn = null;
+        }
+
+        private Buffer toBuffer(final Object msg) {
+            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+            Preconditions.checkState(msg instanceof ByteBuf);
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            final byte[] temp = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
+            return new Buffer(temp);
+        }
+
+    }
 }
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelInputStream.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelInputStream.java
deleted file mode 100644 (file)
index ba65b9e..0000000
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Class provides {@link InputStream} functionality to users of virtual socket.
- */
-public class ChannelInputStream extends InputStream implements ChannelInboundHandler {
-    private final Object lock = new Object();
-    private final ByteBuf bb = Unpooled.buffer();
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if (b == null) {
-            throw new NullPointerException();
-        } else if (off < 0 || len < 0 || len > b.length - off) {
-            throw new IndexOutOfBoundsException();
-        } else if (len == 0) {
-            return 0;
-        }
-
-        int bytesRead = 1;
-        synchronized (lock) {
-            int c = read();
-
-            b[off] = (byte)c;
-
-            if(this.bb.readableBytes() == 0) {
-                return bytesRead;
-            }
-
-            int ltr = len-1;
-            ltr = (ltr <= bb.readableBytes()) ? ltr : bb.readableBytes();
-
-            bb.readBytes(b, 1, ltr);
-            bytesRead += ltr;
-        }
-        return bytesRead;
-    }
-
-    @Override
-    public int read() throws IOException {
-        synchronized (lock) {
-            while (this.bb.readableBytes() == 0) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IllegalStateException(e);
-                }
-            }
-            return this.bb.readByte() & 0xFF;
-        }
-    }
-
-    @Override
-    public int available() throws IOException {
-        synchronized (lock) {
-            return this.bb.readableBytes();
-        }
-    }
-
-    public void channelRegistered(ChannelHandlerContext ctx) {
-        ctx.fireChannelRegistered();
-    }
-
-    public void channelUnregistered(ChannelHandlerContext ctx) {
-        ctx.fireChannelUnregistered();
-    }
-
-    public void channelActive(ChannelHandlerContext ctx) {
-        ctx.fireChannelActive();
-    }
-
-    public void channelInactive(ChannelHandlerContext ctx) {
-        ctx.fireChannelInactive();
-    }
-
-    public void channelRead(ChannelHandlerContext ctx, Object o) {
-        synchronized(lock) {
-            this.bb.discardReadBytes();
-            this.bb.writeBytes((ByteBuf) o);
-            ((ByteBuf) o).release();
-            lock.notifyAll();
-        }
-    }
-
-    public void channelReadComplete(ChannelHandlerContext ctx) {
-        ctx.fireChannelReadComplete();
-    }
-
-    public void userEventTriggered(ChannelHandlerContext ctx, Object o) {
-        ctx.fireUserEventTriggered(o);
-    }
-
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
-        ctx.fireChannelWritabilityChanged();
-    }
-
-    public void handlerAdded(ChannelHandlerContext ctx) {
-    }
-
-    public void handlerRemoved(ChannelHandlerContext ctx) {
-    }
-
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
-        ctx.fireExceptionCaught(throwable);
-    }
-}
-
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelOutputStream.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/ChannelOutputStream.java
deleted file mode 100644 (file)
index 2dc5235..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
-import io.netty.channel.ChannelPromise;
-
-import java.io.OutputStream;
-import java.net.SocketAddress;
-
-/**
- * Class provides {@link OutputStream) functionality to users of virtual socket.
- */
-public class ChannelOutputStream extends OutputStream implements ChannelOutboundHandler {
-    private final Object lock = new Object();
-    private ByteBuf buff = Unpooled.buffer();
-    private ChannelHandlerContext ctx;
-
-    @Override
-    public void flush() {
-        synchronized(lock) {
-            ctx.writeAndFlush(buff).awaitUninterruptibly();
-            buff = Unpooled.buffer();
-        }
-    }
-
-    @Override
-    public void write(int b) {
-        synchronized(lock) {
-            buff.writeByte(b);
-        }
-    }
-
-    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
-                     ChannelPromise promise) {
-        ctx.bind(localAddress, promise);
-    }
-
-    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
-                        SocketAddress localAddress, ChannelPromise promise) {
-        this.ctx = ctx;
-        ctx.connect(remoteAddress, localAddress, promise);
-    }
-
-    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
-        ctx.disconnect(promise);
-    }
-
-    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
-        ctx.close(promise);
-    }
-
-    public void deregister(ChannelHandlerContext ctx, ChannelPromise channelPromise) {
-        ctx.deregister(channelPromise);
-    }
-
-    public void read(ChannelHandlerContext ctx) {
-        ctx.read();
-    }
-
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
-        // pass
-    }
-
-    public void flush(ChannelHandlerContext ctx) {
-        // pass
-    }
-
-    public void handlerAdded(ChannelHandlerContext ctx)
-            throws Exception {
-    }
-
-    public void handlerRemoved(ChannelHandlerContext ctx)
-            throws Exception {
-    }
-
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        ctx.fireExceptionCaught(cause);
-    }
-}
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocket.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocket.java
deleted file mode 100644 (file)
index 69cce80..0000000
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SocketChannel;
-
-/**
- * Handler class providing Socket functionality to OIO client application. By using VirtualSocket user can
- * use OIO application in asynchronous environment and NIO EventLoop. Using VirtualSocket OIO applications
- * are able to use full potential of NIO environment.
- */
-//TODO: refactor - socket should be created when connection is established
-public class VirtualSocket extends Socket implements ChannelHandler {
-    private static final String INPUT_STREAM = "inputStream";
-    private static final String OUTPUT_STREAM = "outputStream";
-
-    private final ChannelInputStream chais = new ChannelInputStream();
-    private final ChannelOutputStream chaos = new ChannelOutputStream();
-    private ChannelHandlerContext ctx;
-
-
-    public InputStream getInputStream() {
-        return this.chais;
-    }
-
-    public OutputStream getOutputStream() {
-        return this.chaos;
-    }
-
-    public void handlerAdded(ChannelHandlerContext ctx) {
-        this.ctx = ctx;
-
-        if (ctx.channel().pipeline().get(OUTPUT_STREAM) == null) {
-            ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chaos);
-        }
-
-        if (ctx.channel().pipeline().get(INPUT_STREAM) == null) {
-            ctx.channel().pipeline().addFirst(INPUT_STREAM, chais);
-        }
-    }
-
-    public void handlerRemoved(ChannelHandlerContext ctx) {
-        if (ctx.channel().pipeline().get(OUTPUT_STREAM) != null) {
-            ctx.channel().pipeline().remove(OUTPUT_STREAM);
-        }
-
-        if (ctx.channel().pipeline().get(INPUT_STREAM) != null) {
-            ctx.channel().pipeline().remove(INPUT_STREAM);
-        }
-    }
-
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
-        // TODO exceptionCaught is deprecated transform this handler
-        ctx.fireExceptionCaught(throwable);
-    }
-
-
-    @Override
-    public void connect(SocketAddress endpoint) throws IOException {}
-
-    @Override
-    public void connect(SocketAddress endpoint, int timeout) throws IOException {}
-
-    @Override
-    public void bind(SocketAddress bindpoint) throws IOException {}
-
-    @Override
-    public InetAddress getInetAddress() {
-        InetSocketAddress isa = getInetSocketAddress();
-        return isa.getAddress();
-    }
-
-    @Override
-    public InetAddress getLocalAddress() {return null;}
-
-    @Override
-    public int getPort() {
-        return getInetSocketAddress().getPort();
-    }
-
-    private InetSocketAddress getInetSocketAddress() {
-        return (InetSocketAddress)getRemoteSocketAddress();
-    }
-
-    @Override
-    public int getLocalPort() {return -1;}
-
-    @Override
-    public SocketAddress getRemoteSocketAddress() {
-        return this.ctx.channel().remoteAddress();
-    }
-
-    @Override
-    public SocketAddress getLocalSocketAddress() {
-        return this.ctx.channel().localAddress();
-    }
-
-    @Override
-    public SocketChannel getChannel() {return null;}
-
-    @Override
-    public void setTcpNoDelay(boolean on) throws SocketException {}
-
-    @Override
-    public boolean getTcpNoDelay() throws SocketException {return false;}
-
-    @Override
-    public void setSoLinger(boolean on, int linger) throws SocketException {}
-
-    @Override
-    public int getSoLinger() throws SocketException {return -1;}
-
-    @Override
-    public void sendUrgentData(int data) throws IOException {}
-
-    @Override
-    public void setOOBInline(boolean on) throws SocketException {}
-
-    @Override
-    public boolean getOOBInline() throws SocketException {return false;}
-
-    @Override
-    public synchronized void setSoTimeout(int timeout) throws SocketException {}
-
-    @Override
-    public synchronized int getSoTimeout() throws SocketException {return -1;}
-
-    @Override
-    public synchronized void setSendBufferSize(int size) throws SocketException {}
-
-    @Override
-    public synchronized int getSendBufferSize() throws SocketException {return -1;}
-
-    @Override
-    public synchronized void setReceiveBufferSize(int size) throws SocketException {}
-
-    @Override
-    public synchronized int getReceiveBufferSize() throws SocketException {return -1;}
-
-    @Override
-    public void setKeepAlive(boolean on) throws SocketException {}
-
-    @Override
-    public boolean getKeepAlive() throws SocketException {return false;}
-
-    @Override
-    public void setTrafficClass(int tc) throws SocketException {}
-
-    @Override
-    public int getTrafficClass() throws SocketException {return -1;}
-
-    @Override
-    public void setReuseAddress(boolean on) throws SocketException {}
-
-    @Override
-    public boolean getReuseAddress() throws SocketException {return false;}
-
-    @Override
-    public synchronized void close() throws IOException {}
-
-    @Override
-    public void shutdownInput() throws IOException {}
-
-    @Override
-    public void shutdownOutput() throws IOException {}
-
-    @Override
-    public String toString() {
-        return "VirtualSocket{" + getInetAddress() + ":" + getPort() + "}";
-    }
-
-    @Override
-    public boolean isConnected() {return false;}
-
-    @Override
-    public boolean isBound() {return false;}
-
-    @Override
-    public boolean isClosed() {return false;}
-
-    @Override
-    public boolean isInputShutdown() {return false;}
-
-    @Override
-    public boolean isOutputShutdown() {return false;}
-
-    @Override
-    public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {}
-}
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java
new file mode 100644 (file)
index 0000000..9347512
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+
+public class ChunkedFramingMechanismEncoderTest {
+
+    private int chunkSize;
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        chunkSize = 256;
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalSize() throws Exception {
+        new ChunkedFramingMechanismEncoder(10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalSizeMax() throws Exception {
+        new ChunkedFramingMechanismEncoder(Integer.MAX_VALUE);
+    }
+
+    @Test
+    public void testEncode() throws Exception {
+        final List<ByteBuf> chunks = Lists.newArrayList();
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                chunks.add((ByteBuf) invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(ctx).write(anyObject());
+
+        final ChunkedFramingMechanismEncoder encoder = new ChunkedFramingMechanismEncoder(chunkSize);
+        final int lastChunkSize = 20;
+        final ByteBuf src = Unpooled.wrappedBuffer(getByteArray(chunkSize * 4 + lastChunkSize));
+        final ByteBuf destination = Unpooled.buffer();
+        encoder.encode(ctx, src, destination);
+        assertEquals(4, chunks.size());
+
+        final int framingSize = "#256\n".getBytes().length + 1/* new line at end */;
+
+        for (final ByteBuf chunk : chunks) {
+            assertEquals(chunkSize + framingSize, chunk.readableBytes());
+        }
+
+        final int lastFramingSize = "#20\n".length() + NetconfMessageConstants.END_OF_CHUNK.length + 1/* new line at end */;
+        assertEquals(lastChunkSize + lastFramingSize, destination.readableBytes());
+    }
+
+    private byte[] getByteArray(final int size) {
+        final byte[] bytes = new byte[size];
+        for (int i = 0; i < size; i++) {
+            bytes[i] = 'a';
+        }
+        return bytes;
+    }
+}
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/EOMFramingMechanismEncoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/EOMFramingMechanismEncoderTest.java
new file mode 100644 (file)
index 0000000..158f3a8
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+
+public class EOMFramingMechanismEncoderTest {
+
+    @Test
+    public void testEncode() throws Exception {
+        final byte[] content = new byte[50];
+        final ByteBuf source = Unpooled.wrappedBuffer(content);
+        final ByteBuf destination = Unpooled.buffer();
+        new EOMFramingMechanismEncoder().encode(null, source, destination);
+
+        assertEquals(Unpooled.wrappedBuffer(source.array(), NetconfMessageConstants.END_OF_MESSAGE), destination);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/FramingMechanismHandlerFactoryTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/FramingMechanismHandlerFactoryTest.java
new file mode 100644 (file)
index 0000000..4f123f0
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+
+public class FramingMechanismHandlerFactoryTest {
+
+    @Test
+    public void testCreate() throws Exception {
+        MatcherAssert.assertThat(FramingMechanismHandlerFactory
+                .createHandler(FramingMechanism.CHUNK), CoreMatchers
+                .instanceOf(ChunkedFramingMechanismEncoder.class));
+        MatcherAssert.assertThat(FramingMechanismHandlerFactory
+                .createHandler(FramingMechanism.EOM), CoreMatchers
+                .instanceOf(EOMFramingMechanismEncoder.class));
+    }
+}
\ No newline at end of file
index e088859e82a628a1fa166def6ae743f58e2d0045..a647b9ee172f44fc9e15baf7a284368ab00a8b19 100644 (file)
@@ -7,16 +7,16 @@
  */
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
+import static org.junit.Assert.assertEquals;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import junit.framework.Assert;
+import java.util.List;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-
 public class NetconfChunkAggregatorTest {
 
     private static final String CHUNKED_MESSAGE = "\n#4\n" +
@@ -45,26 +45,26 @@ public class NetconfChunkAggregatorTest {
 
     @Test
     public void testMultipleChunks() throws Exception {
-        List<Object> output = Lists.newArrayList();
-        ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
+        final List<Object> output = Lists.newArrayList();
+        final ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
         agr.decode(null, input, output);
 
-        Assert.assertEquals(1, output.size());
-        ByteBuf chunk = (ByteBuf) output.get(0);
+        assertEquals(1, output.size());
+        final ByteBuf chunk = (ByteBuf) output.get(0);
 
-        Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+        assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
     }
 
     @Test
     public void testOneChunks() throws Exception {
-        List<Object> output = Lists.newArrayList();
-        ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
+        final List<Object> output = Lists.newArrayList();
+        final ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
         agr.decode(null, input, output);
 
-        Assert.assertEquals(1, output.size());
-        ByteBuf chunk = (ByteBuf) output.get(0);
+        assertEquals(1, output.size());
+        final ByteBuf chunk = (ByteBuf) output.get(0);
 
-        Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+        assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
     }
 
 
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfHelloMessageToXMLEncoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfHelloMessageToXMLEncoderTest.java
new file mode 100644 (file)
index 0000000..00d95df
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+public class NetconfHelloMessageToXMLEncoderTest {
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testEncode() throws Exception {
+        final NetconfMessage msg = new NetconfHelloMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"),
+                NetconfHelloMessageAdditionalHeader.fromString("[tomas;10.0.0.0:10000;tcp;client;]"));
+        final ByteBuf destination = Unpooled.buffer();
+        new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, destination);
+
+        final String encoded = new String(destination.array());
+        assertThat(encoded, containsString("[tomas;10.0.0.0:10000;tcp;client;]"));
+        assertThat(encoded, containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+    }
+
+    @Test
+    public void testEncodeNoHeader() throws Exception {
+        final NetconfMessage msg = new NetconfHelloMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+        final ByteBuf destination = Unpooled.buffer();
+        new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, destination);
+
+        final String encoded = new String(destination.array());
+        assertThat(encoded, not(containsString("[tomas;10.0.0.0:10000;tcp;client;]")));
+        assertThat(encoded, containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testEncodeNotHello() throws Exception {
+        final NetconfMessage msg = new NetconfMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+        new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, null);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToHelloMessageDecoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToHelloMessageDecoderTest.java
new file mode 100644 (file)
index 0000000..f0c0d63
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+public class NetconfXMLToHelloMessageDecoderTest {
+
+    @Test
+    public void testDecodeWithHeader() throws Exception {
+        final ByteBuf src = Unpooled.wrappedBuffer(String.format("%s\n%s",
+                "[tomas;10.0.0.0:10000;tcp;client;]", "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>").getBytes());
+        final List<Object> out = Lists.newArrayList();
+        new NetconfXMLToHelloMessageDecoder().decode(null, src, out);
+
+        assertEquals(1, out.size());
+        assertThat(out.get(0), CoreMatchers.instanceOf(NetconfHelloMessage.class));
+        final NetconfHelloMessage hello = (NetconfHelloMessage) out.get(0);
+        assertTrue(hello.getAdditionalHeader().isPresent());
+        assertEquals("[tomas;10.0.0.0:10000;tcp;client;]\n", hello.getAdditionalHeader().get().toFormattedString());
+        assertThat(XmlUtil.toString(hello.getDocument()), CoreMatchers.containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\""));
+    }
+
+    @Test
+    public void testDecodeNoHeader() throws Exception {
+        final ByteBuf src = Unpooled.wrappedBuffer("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+        final List<Object> out = Lists.newArrayList();
+        new NetconfXMLToHelloMessageDecoder().decode(null, src, out);
+
+        assertEquals(1, out.size());
+        assertThat(out.get(0), CoreMatchers.instanceOf(NetconfHelloMessage.class));
+        final NetconfHelloMessage hello = (NetconfHelloMessage) out.get(0);
+        assertFalse(hello.getAdditionalHeader().isPresent());
+    }
+
+    @Test
+    public void testDecodeCaching() throws Exception {
+        final ByteBuf msg1 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+        final ByteBuf msg2 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+        final ByteBuf src = Unpooled.wrappedBuffer("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+        final List<Object> out = Lists.newArrayList();
+        final NetconfXMLToHelloMessageDecoder decoder = new NetconfXMLToHelloMessageDecoder();
+        decoder.decode(null, src, out);
+        decoder.decode(null, msg1, out);
+        decoder.decode(null, msg2, out);
+
+        assertEquals(1, out.size());
+
+        assertEquals(2, Iterables.size(decoder.getPostHelloNetconfMessages()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDecodeNotHelloReceived() throws Exception {
+        final ByteBuf msg1 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+        final List<Object> out = Lists.newArrayList();
+        NetconfXMLToHelloMessageDecoder decoder = new NetconfXMLToHelloMessageDecoder();
+        decoder.decode(null, msg1, out);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToMessageDecoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfXMLToMessageDecoderTest.java
new file mode 100644 (file)
index 0000000..f85a387
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import org.junit.Test;
+
+public class NetconfXMLToMessageDecoderTest {
+
+    @Test
+    public void testDecodeNoMoreContent() throws Exception {
+        final ArrayList<Object> out = Lists.newArrayList();
+        new NetconfXMLToMessageDecoder().decode(null, Unpooled.buffer(), out);
+        assertEquals(0, out.size());
+    }
+
+    @Test
+    public void testDecode() throws Exception {
+        final ArrayList<Object> out = Lists.newArrayList();
+        new NetconfXMLToMessageDecoder().decode(null, Unpooled.wrappedBuffer("<msg/>".getBytes()), out);
+        assertEquals(1, out.size());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/LoginPasswordTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/LoginPasswordTest.java
new file mode 100644 (file)
index 0000000..01df1e3
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.client.future.AuthFuture;
+import org.junit.Test;
+
+public class LoginPasswordTest {
+
+    @Test
+    public void testLoginPassword() throws Exception {
+        final LoginPassword loginPassword = new LoginPassword("user", "pwd");
+        assertEquals("user", loginPassword.getUsername());
+
+        final ClientSession session = mock(ClientSession.class);
+        doNothing().when(session).addPasswordIdentity("pwd");
+        doReturn(mock(AuthFuture.class)).when(session).auth();
+        loginPassword.authenticate(session);
+
+        verify(session).addPasswordIdentity("pwd");
+        verify(session).auth();
+    }
+}
\ No newline at end of file
index 1b2201170a2426dabce3174515bfe3784e374ced..b32e880537e06d44571875a3dda56c4a36dddf6f 100644 (file)
@@ -118,7 +118,7 @@ public class SSHTest {
             Thread.sleep(100);
         }
         assertFalse(echoClientHandler.isConnected());
-        assertEquals(State.FAILED_TO_CONNECT, echoClientHandler.getState());
+        assertEquals(State.CONNECTION_CLOSED, echoClientHandler.getState());
     }
 
 }
index 0a1da7760412edb0308b64d26ee83d92c51a8bee..69a57748ddabc88212411cf6b4ecf3d088a80381 100644 (file)
@@ -75,5 +75,5 @@ public interface INeutronLoadBalancerPoolMemberAware {
      *            instance of deleted LoadBalancerPool object
      * @return void
      */
-    public void NeutronLoadBalancerPoolMemberDeleted(NeutronLoadBalancerPoolMember loadBalancerPoolMember);
+    public void neutronLoadBalancerPoolMemberDeleted(NeutronLoadBalancerPoolMember loadBalancerPoolMember);
 }