Merge "Flow request containing an invalid action should be rejected"
authorGiovanni Meo <gmeo@cisco.com>
Tue, 22 Jul 2014 06:28:34 +0000 (06:28 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 22 Jul 2014 06:28:34 +0000 (06:28 +0000)
76 files changed:
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java
opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/WriteTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataWriteTransactionImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/ForwardedBackwardsCompatibleDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomCommitHandler.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingCommitHandler.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/Bug1125RegressionTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/ListInsertionDataChangeListenerTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/WriteTransactionTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/AbstractDataBrokerTest.java
opendaylight/md-sal/sal-binding-util/src/main/java/org/opendaylight/controller/md/sal/binding/util/TypeSafeDataReader.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/AsyncReadOnlyTransaction.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/AsyncReadTransaction.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/AsyncWriteTransaction.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataValidationFailedException.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/OptimisticLockFailedException.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataWriteTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/compat/BackwardsCompatibleTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/NotificationModule.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTwoPhaseCommitTransaction.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/util/NormalizedNodeXmlConverterTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestCodec.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestGetOperationTest.java
opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/sample/toaster/provider/OpendaylightToaster.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreServiceImpl.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Disconnect.java
opendaylight/netconf/netconf-cli/src/test/java/org/opendaylight/controller/netconf/cli/io/IOUtilTest.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/test/TestingNetconfClient.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/EXIParameters.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/NetconfStartExiMessage.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/authentication/LoginPassword.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/Invoker.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClient.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshHandler.java [moved from opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/SshHandler.java with 83% similarity]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocket.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocketException.java [deleted file]
opendaylight/netconf/netconf-ssh/pom.xml
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/NetconfSSHServer.java
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/EchoClient.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/EchoClientHandler.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java

index 3dfa6e2bc756419b18f3224b32b127255e00d35d..5036399828a539e5ec5deb451881f3e4e9218b68 100644 (file)
@@ -10,15 +10,20 @@ package org.opendaylight.controller.config.threadpool.util;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 
+import com.google.common.base.Optional;
+
 /**
  * Implementation of {@link ThreadPool} using flexible number of threads wraps
  * {@link ExecutorService}.
@@ -28,12 +33,33 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
 
     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
             ThreadFactory threadFactory) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+    }
+
+    public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+    }
+
+    private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
 
         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
-                new SynchronousQueue<Runnable>(), threadFactory);
+                queue, threadFactory, new FlexibleRejectionHandler());
         executor.prestartAllCoreThreads();
     }
 
+    /**
+     * Overriding the queue:
+     * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+     * occurs in RejectedExecutionHandler.
+     * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+     */
+    private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+        final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+        return new ForwardingBlockingQueue(delegate);
+    }
+
     @Override
     public ExecutorService getExecutor() {
         return Executors.unconfigurableExecutorService(executor);
@@ -77,4 +103,37 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         executor.shutdown();
     }
 
+    /**
+     * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+     */
+    private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+            }
+        }
+    }
+
+    private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+        private final BlockingQueue<Runnable> delegate;
+
+        public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected BlockingQueue<Runnable> delegate() {
+            return delegate;
+        }
+
+        @Override
+        public boolean offer(final Runnable r) {
+            // ThreadPoolExecutor will spawn a new thread after core size is reached only
+            // if the queue.offer returns false.
+            return false;
+        }
+    }
 }
index 94639d43c0248787c736a2d7df7738597a4ef975..d6abe168fbedfa9a7d2db6fc16d1151e621ca874 100644 (file)
@@ -17,6 +17,7 @@
 */
 package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
 
+import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
@@ -50,11 +51,15 @@ public final class FlexibleThreadPoolModule extends org.opendaylight.controller.
         JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
         JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
                 maxThreadCountJmxAttribute);
+
+        if(getQueueCapacity() != null) {
+            JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+        }
     }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
         return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
-                TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+                TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
     }
 }
index be275ef4870b3797e5195d61380a8c5935f3ff83..c124f6388fb2f5f41b1c48a86f55231ea4c023b4 100644 (file)
@@ -46,6 +46,12 @@ module threadpool-impl-flexible {
                 type uint32;
             }
 
+            leaf queueCapacity {
+                type uint16;
+                mandatory false;
+                description "Capacity of queue that holds waiting tasks";
+            }
+
             container threadFactory {
                 uses config:service-ref {
                     refine type {
index 8fedbe4d4c1c1b35d7404fa9f5c3daf4ce3997f0..f81a332ab6e8a2832b62e0e9a7ff164b1488ab46 100644 (file)
@@ -12,7 +12,7 @@
     <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
       <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
 
-        <!-- Netconf dispatcher to be used by all netconf-connectors --> 
+        <!-- Netconf dispatcher to be used by all netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
           <name>global-netconf-dispatcher</name>
           </timer>
         </module>
 
-        <!-- Thread factory to be used by all threadpools in netconf-connectors --> 
+        <!-- Thread factory to be used by all threadpools in netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
           <name>global-netconf-processing-executor-threadfactory</name>
           <name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
-        </module> 
-        <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 --> 
+        </module>
+        <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4.  -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
           <name>global-netconf-processing-executor</name>
           <minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
           <max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
           <keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
           <threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
             <name>global-netconf-processing-executor-threadfactory</name>
           </threadFactory>
-        </module>  
+        </module>
       </modules>
 
       <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
index a7b5f3295778c4f758e1c23785f3ef9fef9426bd..cc85d4337b93d5b1a2322af009052b7c26091bd8 100644 (file)
@@ -15,7 +15,32 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 
+/**
+ * A transaction that provides read access to a logical data store.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncReadTransaction}.
+ */
 public interface ReadTransaction extends AsyncReadTransaction<InstanceIdentifier<?>, DataObject> {
-    @Override
-    ListenableFuture<Optional<DataObject>> read(LogicalDatastoreType store, InstanceIdentifier<?> path);
+
+    /**
+     * Reads data from the provided logical data store located at the provided path.
+     *<p>
+     * If the target is a subtree, then the whole subtree is read (and will be
+     * accessible from the returned data object).
+     *
+     * @param store
+     *            Logical data store from which read should occur.
+     * @param path
+     *            Path which uniquely identifies subtree which client want to
+     *            read
+     * @return Listenable Future which contains read result
+     *         <ul>
+     *         <li>If data at supplied path exists the
+     *         {@link ListeblaFuture#get()} returns Optional object containing
+     *         data once read is done.
+     *         <li>If data at supplied path does not exists the
+     *         {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+     *         </ul>
+     */
+    <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store, InstanceIdentifier<T> path);
 }
index 0cef81d35922650357ee4bd7e502b5c79f11a3f8..044a700d84f26b968cb29f6fe3cd6a80e0dbf0ec 100644 (file)
@@ -18,8 +18,46 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
  */
 public interface WriteTransaction extends AsyncWriteTransaction<InstanceIdentifier<?>, DataObject> {
-    @Override
-    void put(LogicalDatastoreType store, InstanceIdentifier<?> path, DataObject data);
+
+    /**
+     * Stores a piece of data at the specified path. This acts as an add / replace
+     * operation, which is to say that whole subtree will be replaced by the specified data.
+     * <p>
+     * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+     * <p>
+     * If you need to make sure that a parent object exists but you do not want modify
+     * its pre-existing state by using put, consider using {@link #merge} instead.
+     *
+     * @param store
+     *            the logical data store which should be modified
+     * @param path
+     *            the data object path
+     * @param data
+     *            the data object to be written to the specified path
+     * @throws IllegalStateException
+     *             if the transaction has already been submitted
+     */
+    <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data);
+
+    /**
+     * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+     * which is not explicitly overwritten will be preserved. This means that if you store a container,
+     * its child lists will be merged.
+     * <p>
+     * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+     *<p>
+     * If you require an explicit replace operation, use {@link #put} instead.
+     *
+     * @param store
+     *            the logical data store which should be modified
+     * @param path
+     *            the data object path
+     * @param data
+     *            the data object to be merged to the specified path
+     * @throws IllegalStateException
+     *             if the transaction has already been submitted
+     */
+    <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data);
 
     @Override
     void delete(LogicalDatastoreType store, InstanceIdentifier<?> path);
index e5e1e300c17437e52962218f168da911b3f54526..a6d20c5c34cb06457ccaf8727c40980164fdefe4 100644 (file)
@@ -54,8 +54,8 @@ abstract class AbstractForwardedTransaction<T extends AsyncTransaction<InstanceI
         return codec;
     }
 
-    protected final ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTx,
-            final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> path) {
+    protected final <T extends DataObject> ListenableFuture<Optional<T>> doRead(final DOMDataReadTransaction readTx,
+            final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
         return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
     }
 }
index 9eceeb1e436b96fd1e063e8b79e3a96bcaf54c94..a8eef5a3cae2687f6cc552454407e0bb8bf9eb97 100644 (file)
@@ -10,13 +10,12 @@ package org.opendaylight.controller.md.sal.binding.impl;
 import java.util.Collections;
 import java.util.Map.Entry;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Identifiable;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -24,7 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  *
@@ -122,8 +121,8 @@ public class AbstractWriteTransaction<T extends DOMDataWriteTransaction> extends
         getDelegate().delete(store, normalized);
     }
 
-    protected final ListenableFuture<RpcResult<TransactionStatus>> doCommit() {
-        return getDelegate().commit();
+    protected final CheckedFuture<Void,TransactionCommitFailedException> doSubmit() {
+        return getDelegate().submit();
     }
 
     protected final boolean doCancel() {
index e71404dfa454fb018febe0d0876cf832673abec5..bb942047f23d807aa7b000dd4bcd03862a9f7683 100644 (file)
@@ -25,8 +25,8 @@ class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDat
     }
 
     @Override
-    public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path) {
+    public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier<T> path) {
         return doRead(getDelegate(),store, path);
     }
 
index 5a89cc70b80982662c7750e57c3b7485b802c372..c8b9d9347a2c79e099f90f72b372482dfdc20c10 100644 (file)
@@ -25,8 +25,8 @@ class BindingDataReadWriteTransactionImpl extends
     }
 
     @Override
-    public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path) {
+    public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier<T> path) {
         return doRead(getDelegate(), store, path);
     }
 }
\ No newline at end of file
index a62319be22cc881612922a18f7de2e149d3713c0..29790fb60a1a22209047acd44d1057087b707194 100644 (file)
@@ -10,11 +10,13 @@ package org.opendaylight.controller.md.sal.binding.impl;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
-
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
 class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
@@ -24,15 +26,15 @@ class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
         super(delegateTx, codec);
     }
 
-
-
     @Override
-    public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+    public <T extends DataObject> void put(final LogicalDatastoreType store, final InstanceIdentifier<T> path,
+                                           final T data) {
         doPut(store, path, data);
     }
 
     @Override
-    public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+    public <T extends DataObject> void merge(final LogicalDatastoreType store, final InstanceIdentifier<T> path,
+                                             final T data) {
         doMerge(store, path, data);
     }
 
@@ -43,7 +45,12 @@ class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
 
     @Override
     public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-        return doCommit();
+        return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+    }
+
+    @Override
+    public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+        return doSubmit();
     }
 
     @Override
index d275c838f25adacc554898b36db32aa47b7cfaa6..f8c1cf6b99f4dfd7664251db168b0f59eddce557 100644 (file)
@@ -123,7 +123,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
      */
     public Optional<InstanceIdentifier<? extends DataObject>> toBinding(
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
-            throws DeserializationException {
+                    throws DeserializationException {
 
         PathArgument lastArgument = Iterables.getLast(normalized.getPathArguments());
         // Used instance-identifier codec do not support serialization of last
@@ -140,7 +140,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     private Optional<InstanceIdentifier<? extends DataObject>> toBindingAugmented(
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
-            throws DeserializationException {
+                    throws DeserializationException {
         Optional<InstanceIdentifier<? extends DataObject>> potential = toBindingImpl(normalized);
         // Shorthand check, if codec already supports deserialization
         // of AugmentationIdentifier we will return
@@ -190,7 +190,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     private Optional<InstanceIdentifier<? extends DataObject>> toBindingImpl(
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
-            throws DeserializationException {
+                    throws DeserializationException {
         org.opendaylight.yangtools.yang.data.api.InstanceIdentifier legacyPath;
 
         try {
@@ -220,7 +220,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     private DataNormalizationOperation<?> findNormalizationOperation(
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
-            throws DataNormalizationException {
+                    throws DataNormalizationException {
         DataNormalizationOperation<?> current = legacyToNormalized.getRootOperation();
         for (PathArgument arg : normalized.getPathArguments()) {
             current = current.getChild(arg);
@@ -264,7 +264,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     public Optional<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
             final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
-            throws DeserializationException {
+                    throws DeserializationException {
         Optional<InstanceIdentifier<? extends DataObject>> potentialPath = toBinding(normalized.getKey());
         if (potentialPath.isPresent()) {
             InstanceIdentifier<? extends DataObject> bindingPath = potentialPath.get();
@@ -378,7 +378,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
         return Optional.absent();
     }
 
-    private Optional<AugmentationSchema> findAugmentation(final Class targetType,
+    private Optional<AugmentationSchema> findAugmentation(final Class<?> targetType,
             final Set<AugmentationSchema> augmentations) {
         YangModuleInfo moduleInfo;
         try {
@@ -495,6 +495,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             if (isAugmentation(arg.getType())) {
                 count++;
             }
+
         }
         return count;
     }
@@ -509,12 +510,12 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
         return count;
     }
 
-    public Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> deserializeFunction(
-            final InstanceIdentifier<?> path) {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public <T extends DataObject> Function<Optional<NormalizedNode<?, ?>>, Optional<T>>  deserializeFunction(final InstanceIdentifier<T> path) {
         return new DeserializeFunction(this, path);
     }
 
-    private static class DeserializeFunction implements Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> {
+    private static class DeserializeFunction<T extends DataObject> implements Function<Optional<NormalizedNode<?, ?>>, Optional<T>> {
 
         private final BindingToNormalizedNodeCodec codec;
         private final InstanceIdentifier<?> path;
@@ -525,9 +526,10 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             this.path = Preconditions.checkNotNull(path, "Path must not be null");
         }
 
+        @SuppressWarnings("rawtypes")
         @Nullable
         @Override
-        public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
+        public Optional apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
             if (normalizedNode.isPresent()) {
                 final DataObject dataObject;
                 try {
@@ -548,8 +550,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
     /**
      * Returns an default object according to YANG schema for supplied path.
      *
-     * @param path
-     *            DOM Path
+     * @param path DOM Path
      * @return Node with defaults set on.
      */
     public NormalizedNode<?, ?> getDefaultNodeFor(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path) {
index b45450ae3a54554243fe6a3c5097f362fb82779b..12f26b09bb2cf1645d645438302e705c3c0489ce 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.Data
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@@ -160,7 +161,7 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
             @Override
             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
                 if(requestCommitSuccess) {
-                    return tx.getDelegate().commit();
+                    return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
                 }
                 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
             }
index 952d84d885c41e73fe2312bf22c723f62cc44b35..c61ec4926a65f58f31fd03657162b0af70b23c0e 100644 (file)
@@ -9,6 +9,10 @@ package org.opendaylight.controller.sal.binding.impl;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import java.util.EventListener;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,14 +41,21 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcProviderRegistryImpl implements //
-        RpcProviderRegistry, //
-        RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
 
     private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
 
-    // publicProxies is a cache of proxy objects where each value in the map corresponds to a specific RpcService
-    private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
+    // cache of proxy objects where each value in the map corresponds to a specific RpcService
+    private final LoadingCache<Class<? extends RpcService>, RpcService> publicProxies = CacheBuilder.newBuilder().weakKeys().
+            build(new CacheLoader<Class<? extends RpcService>, RpcService>() {
+                @Override
+                public RpcService load(final Class<? extends RpcService> type) {
+                    final RpcService proxy = rpcFactory.getDirectProxyFor(type);
+                    LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+                    return proxy;
+                }
+            });
+
     private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
     private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
             .create();
@@ -93,26 +104,7 @@ public class RpcProviderRegistryImpl implements //
     @SuppressWarnings("unchecked")
     @Override
     public final <T extends RpcService> T getRpcService(final Class<T> type) {
-
-        T potentialProxy = (T) publicProxies.get(type);
-        if (potentialProxy != null) {
-            return potentialProxy;
-        }
-        synchronized (this) {
-            /**
-             * Potential proxy could be instantiated by other thread while we
-             * were waiting for the lock.
-             */
-
-            potentialProxy = (T) publicProxies.get(type);
-            if (potentialProxy != null) {
-                return potentialProxy;
-            }
-            T proxy = rpcFactory.getDirectProxyFor(type);
-            LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
-            publicProxies.put(type, proxy);
-            return proxy;
-        }
+        return (T) publicProxies.getUnchecked(type);
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -205,8 +197,7 @@ public class RpcProviderRegistryImpl implements //
 
     }
 
-    private class RouteChangeForwarder<T extends RpcService> implements
-            RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+    private class RouteChangeForwarder<T extends RpcService> implements RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
 
         private final Class<T> type;
 
@@ -240,8 +231,7 @@ public class RpcProviderRegistryImpl implements //
         }
     }
 
-    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
-            RpcRegistration<T> {
+    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
 
         private final Class<T> serviceType;
         private RpcProviderRegistryImpl registry;
index ee0628308e627dcde8ca0c960dd1c13b3378a32b..743430938735624d1640800ee44059b02f40ab3d 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
@@ -14,6 +15,10 @@ import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMapping
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * @deprecated This is part of the legacy DataBrokerService
+ */
+@Deprecated
 class BindingToDomCommitHandler implements
     DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
 
@@ -34,7 +39,7 @@ class BindingToDomCommitHandler implements
         this.biDataService = biDataService;
     }
 
-    public void setMappingService(BindingIndependentMappingService mappingService) {
+    public void setMappingService(final BindingIndependentMappingService mappingService) {
         this.mappingService = mappingService;
     }
 
index 395af8f4870d1a485bddc594641ef4095b188360..88024b9af2ce9e478352e2feba43baa8dfa5557b 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+
 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
@@ -16,6 +17,10 @@ import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * @deprecated This is part of the legacy DataBrokerService
+ */
+@Deprecated
 class DomToBindingCommitHandler implements //
     RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
     DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
@@ -34,11 +39,11 @@ class DomToBindingCommitHandler implements //
     private DataProviderService baDataService;
     private BindingIndependentMappingService mappingService;
 
-    public void setBindingAwareDataService(DataProviderService baDataService) {
+    public void setBindingAwareDataService(final DataProviderService baDataService) {
         this.baDataService = baDataService;
     }
 
-    public void setMappingService(BindingIndependentMappingService mappingService) {
+    public void setMappingService(final BindingIndependentMappingService mappingService) {
         this.mappingService = mappingService;
     }
 
index 9f1ae3845749e13e2adeb216d516a182379f4494..fb115350612cfe894e2cf85a29c6af2f0c53741c 100644 (file)
@@ -49,13 +49,13 @@ public class Bug1125RegressionTest extends AbstractDataChangeListenerTest {
                                 "foo").build()).build();
         initialTx.put(LogicalDatastoreType.OPERATIONAL, path(TOP_FOO_KEY),
                 topLevelList(TOP_FOO_KEY, fooAugment));
-        assertCommit(initialTx.commit());
+        assertCommit(initialTx.submit());
     }
 
     private void delete(final InstanceIdentifier<?> path) {
         WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.delete(LogicalDatastoreType.OPERATIONAL, path);
-        assertCommit(tx.commit());
+        assertCommit(tx.submit());
     }
 
     private void verifyRemoved(
index 05bc857969005bfab5a35f9627970a1434eb1bc6..54493300e342df6d2f3f2428a53f597677089948 100644 (file)
@@ -48,7 +48,7 @@ public class ListInsertionDataChangeListenerTest extends AbstractDataChangeListe
     protected void setupWithDataBroker(final DataBroker dataBroker) {
         WriteTransaction initialTx = dataBroker.newWriteOnlyTransaction();
         initialTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_FOO_KEY)));
-        assertCommit(initialTx.commit());
+        assertCommit(initialTx.submit());
     }
 
     @Test
@@ -60,7 +60,7 @@ public class ListInsertionDataChangeListenerTest extends AbstractDataChangeListe
 
         ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
         writeTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
-        assertCommit(writeTx.commit());
+        assertCommit(writeTx.submit());
         AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> top = topListener.event();
         AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> all = allListener.event();
         AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> foo = fooListener.event();
@@ -115,7 +115,7 @@ public class ListInsertionDataChangeListenerTest extends AbstractDataChangeListe
 
         ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
         writeTx.merge(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
-        assertCommit(writeTx.commit());
+        assertCommit(writeTx.submit());
 
         verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
     }
@@ -129,7 +129,7 @@ public class ListInsertionDataChangeListenerTest extends AbstractDataChangeListe
 
         ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
         writeTx.put(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
-        assertCommit(writeTx.commit());
+        assertCommit(writeTx.submit());
 
         verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
     }
@@ -143,7 +143,7 @@ public class ListInsertionDataChangeListenerTest extends AbstractDataChangeListe
 
         ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
         writeTx.merge(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
-        assertCommit(writeTx.commit());
+        assertCommit(writeTx.submit());
 
         verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
     }
index 43e951423c0a2bfdecef87969abd01c51816d28e..b577e2af1f2c845586615985fa77ec1169abd291 100644 (file)
@@ -7,14 +7,11 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TopBuilder;
@@ -35,7 +32,7 @@ public class WriteTransactionTest extends AbstractDataBrokerTest {
         WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
         writeTx.put(LogicalDatastoreType.OPERATIONAL, TOP_PATH, new TopBuilder().build());
         writeTx.put(LogicalDatastoreType.OPERATIONAL, NODE_PATH, new TopLevelListBuilder().setKey(TOP_LIST_KEY).build());
-        assertEquals(TransactionStatus.COMMITED, writeTx.commit().get().getResult());
+        writeTx.submit().get();
     }
 
 }
index 7f23ac26b6edc521557a412e25c2bca1a500ff71..5789270dee4e60953e5d7e4805bacffcbb96b6e4 100644 (file)
@@ -7,16 +7,12 @@
  */
 package org.opendaylight.controller.md.sal.binding.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -53,9 +49,9 @@ public class AbstractDataBrokerTest extends AbstractSchemaAwareTest {
         return domBroker;
     }
 
-    protected static final void assertCommit(final ListenableFuture<RpcResult<TransactionStatus>> commit) {
+    protected static final void assertCommit(final ListenableFuture<Void> commit) {
         try {
-            assertEquals(TransactionStatus.COMMITED,commit.get(500, TimeUnit.MILLISECONDS).getResult());
+            commit.get(500, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new IllegalStateException(e);
         }
index 3df33ba377376182e3485f6506af70d4a5e932aa..49ac59fe95da5071aa7f9b59332ae2aef0b97113 100644 (file)
@@ -11,6 +11,14 @@ import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
+/**
+ *
+ *
+ * @deprecated Use
+ *             {@link org.opendaylight.controller.md.sal.binding.api.ReadTransaction#read(org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType, InstanceIdentifier)}
+ *             instead.
+ */
+@Deprecated
 public final class TypeSafeDataReader {
 
     private final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate;
@@ -19,21 +27,25 @@ public final class TypeSafeDataReader {
         return delegate;
     }
 
-    public TypeSafeDataReader(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
+    public TypeSafeDataReader(
+            final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
         this.delegate = delegate;
     }
 
     @SuppressWarnings("unchecked")
-    public <D extends DataObject> D readConfigurationData(InstanceIdentifier<D> path) {
+    public <D extends DataObject> D readConfigurationData(
+            final InstanceIdentifier<D> path) {
         return (D) delegate.readConfigurationData(path);
     }
 
     @SuppressWarnings("unchecked")
-    public <D extends DataObject> D readOperationalData(InstanceIdentifier<D> path) {
+    public <D extends DataObject> D readOperationalData(
+            final InstanceIdentifier<D> path) {
         return (D) delegate.readOperationalData(path);
     }
 
-    public static TypeSafeDataReader forReader(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
+    public static TypeSafeDataReader forReader(
+            final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
         return new TypeSafeDataReader(delegate);
     }
 }
index 4beb5c62e335dcd2af46af1493a7ea8180d32f48..46c90b97560d440763d8b66f77ab10f998b6be7f 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.md.sal.common.api.data;
 import org.opendaylight.yangtools.concepts.Path;
 
 /**
- * Read-only transaction, which provides stable view of data
- * and is {@link AutoCloseable} resource.
+ * Marker interface for a read-only view of the data tree.
  *
  * @see AsyncReadTransaction
  *
-* @param <P>
+ * @param <P>
  *            Type of path (subtree identifier), which represents location in
  *            tree
  * @param <D>
index e1cd4a712a0d9804ac1846a8e8bf4ad6f23dc8ca..afa86704ff7de0e17c12b3a15f401f9c88f06232 100644 (file)
@@ -9,12 +9,9 @@ package org.opendaylight.controller.md.sal.common.api.data;
 
 import org.opendaylight.yangtools.concepts.Path;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  *
- * Provides a stateful read view of the data tree.
+ * Marker interface for stateful read view of the data tree.
  *
  * <p>
  * View of the data tree is a stable point-in-time snapshot of the current data tree state when
@@ -47,9 +44,9 @@ import com.google.common.util.concurrent.ListenableFuture;
  * <p>
  * <b>Note:</b> example contains blocking calls on future only to illustrate
  * that action happened after other asynchronous action. Use of blocking call
- * {@link ListenableFuture#get()} is discouraged for most uses and you should
- * use
- * {@link com.google.common.util.concurrent.Futures#addCallback(ListenableFuture, com.google.common.util.concurrent.FutureCallback)}
+ * {@link com.google.common.util.concurrent.ListenableFuture#get()} is discouraged for most
+ * uses and you should use
+ * {@link com.google.common.util.concurrent.Futures#addCallback(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.FutureCallback)}
  * or other functions from {@link com.google.common.util.concurrent.Futures} to
  * register more specific listeners.
  *
@@ -58,30 +55,10 @@ import com.google.common.util.concurrent.ListenableFuture;
  *            tree
  * @param <D>
  *            Type of data (payload), which represents data payload
+ *
+ * @see org.opendaylight.controller.md.sal.binding.api.ReadTransaction
+ * @see org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction
  */
 public interface AsyncReadTransaction<P extends Path<P>, D> extends AsyncTransaction<P, D> {
 
-    /**
-     *
-     * Reads data from provided logical data store located at the provided path.
-     *<p>
-     * If the target is a subtree, then the whole subtree is read (and will be
-     * accessible from the returned data object).
-     *
-     * @param store
-     *            Logical data store from which read should occur.
-     * @param path
-     *            Path which uniquely identifies subtree which client want to
-     *            read
-     * @return Listenable Future which contains read result
-     *         <ul>
-     *         <li>If data at supplied path exists the
-     *         {@link ListeblaFuture#get()} returns Optional object containing
-     *         data once read is done.
-     *         <li>If data at supplied path does not exists the
-     *         {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
-     *         </ul>
-     */
-    ListenableFuture<Optional<D>> read(LogicalDatastoreType store, P path);
-
 }
index f7eae27320107ef6a90696ff73f2cf40b3c99fde..e47b54a0a1e6e138292530994a57b539501fc1bb 100644 (file)
@@ -11,6 +11,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.yangtools.concepts.Path;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
 /**
@@ -26,15 +27,57 @@ import com.google.common.util.concurrent.ListenableFuture;
  * change for the data tree and it is not visible to any other concurrently running
  * transaction.
  * <p>
- * Applications publish the changes proposed in the transaction by calling {@link #commit}
- * on the transaction. This seals the transaction
+ * Applications make changes to the local data tree in the transaction by via the
+ * <b>put</b>, <b>merge</b>, and <b>delete</b> operations.
+ *
+ * <h2>Put operation</h2>
+ * Stores a piece of data at a specified path. This acts as an add / replace
+ * operation, which is to say that whole subtree will be replaced by the
+ * specified data.
+ * <p>
+ * Performing the following put operations:
+ *
+ * <pre>
+ * 1) container { list [ a ] }
+ * 2) container { list [ b ] }
+ * </pre>
+ *
+ * will result in the following data being present:
+ *
+ * <pre>
+ * container { list [ b ] }
+ * </pre>
+ * <h2>Merge operation</h2>
+ * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+ * which is not explicitly overwritten will be preserved. This means that if you store a container,
+ * its child lists will be merged.
+ * <p>
+ * Performing the following merge operations:
+ *
+ * <pre>
+ * 1) container { list [ a ] }
+ * 2) container { list [ b ] }
+ * </pre>
+ *
+ * will result in the following data being present:
+ *
+ * <pre>
+ * container { list [ a, b ] }
+ * </pre>
+ *
+ * This also means that storing the container will preserve any
+ * augmentations which have been attached to it.
+ *
+ * <h2>Delete operation</h2>
+ * Removes a piece of data from a specified path.
+ * <p>
+ * After applying changes to the local data tree, applications publish the changes proposed in the
+ * transaction by calling {@link #submit} on the transaction. This seals the transaction
  * (preventing any further writes using this transaction) and submits it to be
  * processed and applied to global conceptual data tree.
  * <p>
  * The transaction commit may fail due to a concurrent transaction modifying and committing data in
- * an incompatible way. See {@link #commit()} for more concrete commit failure examples.
- *
- *
+ * an incompatible way. See {@link #submit} for more concrete commit failure examples.
  * <p>
  * <b>Implementation Note:</b> This interface is not intended to be implemented
  * by users of MD-SAL, but only to be consumed by them.
@@ -56,7 +99,7 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * {@link TransactionStatus#CANCELED} will have no effect, and transaction
      * is considered cancelled.
      *
-     * Invoking cancel() on finished transaction  (future returned by {@link #commit()}
+     * Invoking cancel() on finished transaction  (future returned by {@link #submit()}
      * already completed with {@link TransactionStatus#COMMITED}) will always
      * fail (return false).
      *
@@ -65,77 +108,10 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * <tt>true</tt> otherwise
      *
      */
-    public boolean cancel();
-
-    /**
-     * Store a piece of data at specified path. This acts as an add / replace
-     * operation, which is to say that whole subtree will be replaced by
-     * specified path. Performing the following put operations:
-     *
-     * <pre>
-     * 1) container { list [ a ] }
-     * 2) container { list [ b ] }
-     * </pre>
-     *
-     * will result in the following data being present:
-     *
-     * <pre>
-     * container { list [ b ] }
-     * </pre>
-     *
-     *
-     * If you need to make sure that a parent object exists, but you do not want modify
-     * its preexisting state by using put, consider using
-     * {@link #merge(LogicalDatastoreType, Path, Object)}
-     *
-     * @param store
-     *            Logical data store which should be modified
-     * @param path
-     *            Data object path
-     * @param data
-     *            Data object to be written to specified path
-     * @throws IllegalStateException
-     *             if the transaction is no longer {@link TransactionStatus#NEW}
-     */
-    public void put(LogicalDatastoreType store, P path, D data);
-
-    /**
-     * Store a piece of data at the specified path. This acts as a merge operation,
-     * which is to say that any pre-existing data which is not explicitly
-     * overwritten will be preserved. This means that if you store a container,
-     * its child lists will be merged. Performing the following merge
-     * operations:
-     *
-     * <pre>
-     * 1) container { list [ a ] }
-     * 2) container { list [ b ] }
-     * </pre>
-     *
-     * will result in the following data being present:
-     *
-     * <pre>
-     * container { list [ a, b ] }
-     * </pre>
-     *
-     * This also means that storing the container will preserve any
-     * augmentations which have been attached to it.
-     *<p>
-     * If you require an explicit replace operation, use
-     * {@link #put(LogicalDatastoreType, Path, Object)} instead.
-     *
-     * @param store
-     *            Logical data store which should be modified
-     * @param path
-     *            Data object path
-     * @param data
-     *            Data object to be written to specified path
-     * @throws IllegalStateException
-     *             if the transaction is no longer {@link TransactionStatus#NEW}
-     */
-    public void merge(LogicalDatastoreType store, P path, D data);
+    boolean cancel();
 
     /**
-     * Remove a piece of data from specified path. This operation does not fail
+     * Removes a piece of data from specified path. This operation does not fail
      * if the specified path does not exist.
      *
      * @param store
@@ -145,10 +121,14 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * @throws IllegalStateException
      *             if the transaction is no longer {@link TransactionStatus#NEW}
      */
-    public void delete(LogicalDatastoreType store, P path);
+    void delete(LogicalDatastoreType store, P path);
 
     /**
-     * Submits transaction to be applied to update logical data tree.
+     * Submits this transaction to be asynchronously applied to update the logical data tree.
+     * The returned CheckedFuture conveys the result of applying the data changes.
+     * <p>
+     * <b>Note:</b> It is strongly recommended to process the CheckedFuture result in an asynchronous
+     * manner rather than using the blocking get() method. See example usage below.
      * <p>
      * This call logically seals the transaction, which prevents the client from
      * further changing data tree using this transaction. Any subsequent calls to
@@ -158,31 +138,65 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * {@link IllegalStateException}.
      *
      * The transaction is marked as {@link TransactionStatus#SUBMITED} and
-     * enqueued into the data store backed for processing.
+     * enqueued into the data store back-end for processing.
      *
      * <p>
      * Whether or not the commit is successful is determined by versioning
-     * of data tree and validation of registered commit participants
-     * {@link AsyncConfigurationCommitHandler}
-     * if transaction changes {@link LogicalDatastoreType#CONFIGURATION} data tree.
-     *<p>
-     * The effects of successful commit of data depends on
-     * other data change listeners {@link AsyncDataChangeListener} and
-     * {@link AsyncConfigurationCommitHandler}, which was registered to the
-     * same {@link AsyncDataBroker}, to which this transaction belongs.
-     *
+     * of the data tree and validation of registered commit participants
+     * ({@link AsyncConfigurationCommitHandler})
+     * if the transaction changes the data tree.
+     * <p>
+     * The effects of a successful commit of data depends on data change listeners
+     * ({@link AsyncDataChangeListener}) and commit participants
+     * ({@link AsyncConfigurationCommitHandler}) that are registered with the data broker.
+     * <p>
+     * <h3>Example usage:</h3>
+     * <pre>
+     *  private void doWrite( final int tries ) {
+     *      WriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
+     *
+     *      MyDataObject data = ...;
+     *      InstanceIdentifier<MyDataObject> path = ...;
+     *      writeTx.put( LogicalDatastoreType.OPERATIONAL, path, data );
+     *
+     *      Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
+     *          public void onSuccess( Void result ) {
+     *              // succeeded
+     *          }
+     *
+     *          public void onFailure( Throwable t ) {
+     *              if( t instanceof OptimisticLockFailedException ) {
+     *                  if( ( tries - 1 ) > 0 ) {
+     *                      // do retry
+     *                      doWrite( tries - 1 );
+     *                  } else {
+     *                      // out of retries
+     *                  }
+     *              } else {
+     *                  // failed due to another type of TransactionCommitFailedException.
+     *              }
+     *          } );
+     * }
+     * ...
+     * doWrite( 2 );
+     * </pre>
      * <h2>Failure scenarios</h2>
      * <p>
      * Transaction may fail because of multiple reasons, such as
      * <ul>
-     * <li>Another transaction finished earlier and modified the same node in
-     * non-compatible way (see below). In this case the returned future will fail with
+     * <li>Another transaction finished earlier and modified the same node in a
+     * non-compatible way (see below). In this case the returned future will fail with an
      * {@link OptimisticLockFailedException}. It is the responsibility of the
      * caller to create a new transaction and submit the same modification again in
-     * order to update data tree.</li>
+     * order to update data tree. <i><b>Warning</b>: In most cases, retrying after an
+     * OptimisticLockFailedException will result in a high probability of success.
+     * However, there are scenarios, albeit unusual, where any number of retries will
+     * not succeed. Therefore it is strongly recommended to limit the number of retries (2 or 3)
+     * to avoid an endless loop.</i>
+     * </li>
      * <li>Data change introduced by this transaction did not pass validation by
      * commit handlers or data was incorrectly structured. Returned future will
-     * fail with {@link DataValidationFailedException}. User should not retry to
+     * fail with {@link DataValidationFailedException}. User should not retry to
      * create new transaction with same data, since it probably will fail again.
      * </li>
      * </ul>
@@ -273,8 +287,8 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * txA.put(CONFIGURATION, PATH, A);    // writes to PATH value A
      * txB.put(CONFIGURATION, PATH, B)     // writes to PATH value B
      *
-     * ListenableFuture futureA = txA.commit(); // transaction A is sealed and committed
-     * ListenebleFuture futureB = txB.commit(); // transaction B is sealed and committed
+     * ListenableFuture futureA = txA.submit(); // transaction A is sealed and submitted
+     * ListenebleFuture futureB = txB.submit(); // transaction B is sealed and submitted
      * </pre>
      *
      * Commit of transaction A will be processed asynchronously and data tree
@@ -288,19 +302,21 @@ public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransa
      * with {@link OptimisticLockFailedException} exception, which indicates to
      * client that concurrent transaction prevented the submitted transaction from being
      * applied.
-     *
-     * @return Result of the Commit, containing success information or list of
-     *         encountered errors, if commit was not successful. The Future
-     *         blocks until {@link TransactionStatus#COMMITED} is reached.
-     *         Future will fail with {@link TransactionCommitFailedException} if
-     *         Commit of this transaction failed. TODO: Usability: Consider
-     *         change from ListenableFuture to
-     *         {@link com.google.common.util.concurrent.CheckedFuture} which
-     *         will throw {@link TransactionCommitFailedException}.
+     * <br>
+     * @return a CheckFuture containing the result of the commit. The Future blocks until the
+     *         commit operation is complete. A successful commit returns nothing. On failure,
+     *         the Future will fail with a {@link TransactionCommitFailedException} or an exception
+     *         derived from TransactionCommitFailedException.
      *
      * @throws IllegalStateException
      *             if the transaction is not {@link TransactionStatus#NEW}
      */
-    public ListenableFuture<RpcResult<TransactionStatus>> commit();
+    CheckedFuture<Void,TransactionCommitFailedException> submit();
+
+    /**
+     * @deprecated Use {@link #submit()} instead.
+     */
+    @Deprecated
+    ListenableFuture<RpcResult<TransactionStatus>> commit();
 
 }
index c59c12ec5c3c59314d94094d421ab45c96deb4cb..d48bfc79fe8941935bbfa2b762d254d93f1da2d0 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.md.sal.common.api.data;
 
 import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 
 import com.google.common.base.Preconditions;
 
@@ -31,14 +33,17 @@ public class DataValidationFailedException extends TransactionCommitFailedExcept
 
     private Class<? extends Path<?>> pathType;
 
-    public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path, final String message, final Throwable cause) {
-        super(message, cause);
+    public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+                                                             final String message, final Throwable cause) {
+        super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "invalid-value", message, null,
+                                                        path != null ? path.toString() : null, cause));
         this.pathType = Preconditions.checkNotNull(pathType, "path type must not be null");
         this.path = Preconditions.checkNotNull(path,"path must not be null.");
     }
 
-    public  <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,final String message) {
-        this(pathType,path,message,null);
+    public  <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+                                                              final String message) {
+        this(pathType, path, message, null);
     }
 
     public final Path<?> getPath() {
index 222289ab6cb967396478a67869dbb11cf6689903..5ddec6b1edfcec9c76425d8587bc989abf3d897f 100644 (file)
@@ -1,5 +1,8 @@
 package org.opendaylight.controller.md.sal.common.api.data;
 
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
 /**
 *
 * Failure of asynchronous transaction commit caused by failure
@@ -18,17 +21,13 @@ public class OptimisticLockFailedException extends TransactionCommitFailedExcept
 
     private static final long serialVersionUID = 1L;
 
-    protected OptimisticLockFailedException(final String message, final Throwable cause, final boolean enableSuppression,
-            final boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-
     public OptimisticLockFailedException(final String message, final Throwable cause) {
-        super(message, cause);
+        super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "resource-denied",
+                                                        message, null, null, cause));
     }
 
     public OptimisticLockFailedException(final String message) {
-        super(message);
+        this(message, null);
     }
 
 }
index f3c2e1093cfbdacea1b2019f5df64f64ac54aeb9..18a857e1d5c4bd0816674a6af99070c2a9dc1e76 100644 (file)
@@ -7,6 +7,15 @@
  */
 package org.opendaylight.controller.md.sal.common.api.data;
 
+import java.util.Arrays;
+import java.util.List;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
+import com.google.common.collect.ImmutableList;
+
 /**
  *
  * Failed commit of asynchronous transaction
@@ -17,18 +26,39 @@ package org.opendaylight.controller.md.sal.common.api.data;
  */
 public class TransactionCommitFailedException extends Exception {
 
-    private static final long serialVersionUID = -6138306275373237068L;
+    private static final long serialVersionUID = 1L;
 
-    protected TransactionCommitFailedException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
+    private final List<RpcError> errorList;
+
+    public TransactionCommitFailedException(final String message, final RpcError... errors) {
+        this(message, null, errors);
     }
 
-    public TransactionCommitFailedException(final String message, final Throwable cause) {
+    public TransactionCommitFailedException(final String message, final Throwable cause,
+                                            final RpcError... errors) {
         super(message, cause);
+
+        if( errors != null && errors.length > 0 ) {
+            errorList = ImmutableList.<RpcError>builder().addAll( Arrays.asList( errors ) ).build();
+        }
+        else {
+            // Add a default RpcError.
+            errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null,
+                    getMessage(), null, null, getCause()));
+        }
     }
 
-    public TransactionCommitFailedException(final String message) {
-        super(message);
+    /**
+     * Returns additional error information about this exception.
+     *
+     * @return a List of RpcErrors. There is always at least one RpcError.
+     */
+    public List<RpcError> getErrorList() {
+        return errorList;
     }
 
+    @Override
+    public String getMessage() {
+        return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString();
+    }
 }
index b030e6cb5f84b8da49d1780ab51ec2e7ab425ede..d544c4b3710b06a12dac1bba3b11b5e13ced4f89 100644 (file)
@@ -11,13 +11,19 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
 import org.opendaylight.yangtools.concepts.Path;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
         AbstractDataModification<P, D> {
@@ -83,18 +89,23 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D extends Objec
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         AbstractDataTransaction<?, ?> other = (AbstractDataTransaction<?, ?>) obj;
         if (identifier == null) {
-            if (other.identifier != null)
+            if (other.identifier != null) {
                 return false;
-        } else if (!identifier.equals(other.identifier))
+            }
+        } else if (!identifier.equals(other.identifier)) {
             return false;
+        }
         return true;
     }
 
@@ -122,4 +133,15 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D extends Objec
         this.status = status;
         this.onStatusChange(status);
     }
+
+    public static ListenableFuture<RpcResult<TransactionStatus>> convertToLegacyCommitFuture(
+                                        CheckedFuture<Void,TransactionCommitFailedException> from ) {
+        return Futures.transform(from, new AsyncFunction<Void, RpcResult<TransactionStatus>>() {
+            @Override
+            public ListenableFuture<RpcResult<TransactionStatus>> apply(Void input) throws Exception {
+                return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>
+                                                              success(TransactionStatus.COMMITED).build());
+            }
+        } );
+    }
 }
index 5baa5e72d31e8fe0bbd8444c9d86444ca79e594c..0d5e47faee198b95f453a07c99bcfaa3181ded3d 100644 (file)
@@ -8,9 +8,39 @@
 package org.opendaylight.controller.md.sal.dom.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A transaction that provides read access to a logical data store.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncReadTransaction}.
+ */
 public interface DOMDataReadTransaction extends AsyncReadTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
 
+    /**
+     * Reads data from provided logical data store located at the provided path.
+     *<p>
+     * If the target is a subtree, then the whole subtree is read (and will be
+     * accessible from the returned data object).
+     *
+     * @param store
+     *            Logical data store from which read should occur.
+     * @param path
+     *            Path which uniquely identifies subtree which client want to
+     *            read
+     * @return Listenable Future which contains read result
+     *         <ul>
+     *         <li>If data at supplied path exists the
+     *         {@link ListeblaFuture#get()} returns Optional object containing
+     *         data once read is done.
+     *         <li>If data at supplied path does not exists the
+     *         {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+     *         </ul>
+     */
+    ListenableFuture<Optional<NormalizedNode<?,?>>> read(LogicalDatastoreType store,InstanceIdentifier path);
 }
index 9415973de5af3e0ec49d144ab3d5e541948580ac..6a8977154c792084bf53e020ffd94586ccb6017d 100644 (file)
@@ -8,9 +8,54 @@
 package org.opendaylight.controller.md.sal.dom.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+/**
+ * A transaction that provides mutation capabilities on a data tree.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ */
 public interface DOMDataWriteTransaction extends AsyncWriteTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
 
+    /**
+     * Stores a piece of data at the specified path. This acts as an add / replace
+     * operation, which is to say that whole subtree will be replaced by the specified data.
+     * <p>
+     * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+     * <p>
+     * If you need to make sure that a parent object exists but you do not want modify
+     * its pre-existing state by using put, consider using {@link #merge} instead.
+     *
+     * @param store
+     *            the logical data store which should be modified
+     * @param path
+     *            the data object path
+     * @param data
+     *            the data object to be written to the specified path
+     * @throws IllegalStateException
+     *             if the transaction has already been submitted
+     */
+    void put(LogicalDatastoreType store, InstanceIdentifier path, NormalizedNode<?, ?> data);
+
+    /**
+     * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+     * which is not explicitly overwritten will be preserved. This means that if you store a container,
+     * its child lists will be merged.
+     * <p>
+     * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+     *<p>
+     * If you require an explicit replace operation, use {@link #put} instead.
+     *
+     * @param store
+     *            the logical data store which should be modified
+     * @param path
+     *            the data object path
+     * @param data
+     *            the data object to be merged to the specified path
+     * @throws IllegalStateException
+     *             if the transaction has already been submitted
+     */
+    void merge(LogicalDatastoreType store, InstanceIdentifier path, NormalizedNode<?, ?> data);
 }
index 5694d0bca9a68fccd5f31557418a8824b35d7a5a..d354cca005974c332c2814c76e696f774b772bc4 100644 (file)
@@ -114,7 +114,7 @@ public abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreT
      * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
      * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
      * and then invoking finalized implementation callback
-     * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+     * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
      * was commited and gathered results.
      * </ul>
      *
@@ -167,7 +167,7 @@ public abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreT
      * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
      * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
      * and then invoking finalized implementation callback
-     * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+     * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
      * was commited and gathered results.
      * <li>
      * </ul>
index 7e37a1e3a3467837b16963a8026236738535f599..7731646a57d874348f0192be357a16f8de040212 100644 (file)
@@ -12,9 +12,9 @@ import static com.google.common.base.Preconditions.checkState;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -23,14 +23,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
@@ -78,7 +77,7 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DO
     }
 
     @Override
-    public ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+    public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
         return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
index b9f096aafc251400b01ee6b7319395cc8ec739fd..227693ca4df5015f79d8f88cf02e666be7f25e39 100644 (file)
@@ -10,21 +10,20 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * NormalizedNode implementation of {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChain} which is backed
@@ -73,7 +72,7 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans
     }
 
     @Override
-    public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit(
+    public synchronized CheckedFuture<Void,TransactionCommitFailedException> submit(
             final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
     }
index 13a0093d340b7ef3bc496cf6950d1118ad309ab5..8b9eb445fd45b4d19f5a3f6523a7ce2a711ef310 100644 (file)
@@ -12,12 +12,9 @@ import java.util.concurrent.ExecutionException;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,18 +79,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
     }
 
     @Override
-    public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+    public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
         Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
-        ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
+        ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
                 transaction, cohorts, listener));
         if (listener.isPresent()) {
             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
         }
-        return commitFuture;
+
+        return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
 
     /**
@@ -139,7 +137,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
      * support of cancelation.
      *
      */
-    private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+    private static class CommitCoordinationTask implements Callable<Void> {
 
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
@@ -156,12 +154,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         }
 
         @Override
-        public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+        public Void call() throws TransactionCommitFailedException {
 
             try {
                 canCommitBlocking();
                 preCommitBlocking();
-                return commitBlocking();
+                commitBlocking();
+                return null;
             } catch (TransactionCommitFailedException e) {
                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
                 abortBlocking(e);
@@ -217,9 +216,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *             If one of cohorts failed preCommit
          *
          */
-        private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+        private void commitBlocking() throws TransactionCommitFailedException {
             commitAll().checkedGet();
-            return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
         }
 
         /**
index 811d4d88394fa3a51da6facbd1fc00dff6e61cfe..5ce9241dd2943399ddac9a9d3d9cb7d9bb2c6c1d 100644 (file)
@@ -6,10 +6,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 
@@ -22,7 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
  * callback is invoked with associated transaction and throwable is invoked on listener.
  *
  */
-class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionStatus>> {
+class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
 
     private final DOMDataWriteTransaction tx;
     private final DOMDataCommitErrorListener listener;
@@ -46,7 +43,7 @@ class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionS
     }
 
     @Override
-    public void onSuccess(RpcResult<TransactionStatus> result) {
+    public void onSuccess(Void result) {
         // NOOP
     }
 }
\ No newline at end of file
index 2050d148a8a9638bd562b81d1b2f2a5374f8f4c9..234758ca75413e3381cf5b71fd67c2e2f41e815a 100644 (file)
@@ -7,13 +7,11 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * Executor of Three Phase Commit coordination for
@@ -40,12 +38,12 @@ interface DOMDataCommitExecutor {
      *            subtransactoins.
      * @param listener
      *            Error listener which should be notified if transaction failed.
-     * @return ListenableFuture which contains RpcResult with
-     *         {@link TransactionStatus#COMMITED} if commit coordination on
-     *         cohorts finished successfully.
+     * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
+     *         nothing is returned from the Future, On failure,
+     *         the Future fails with a {@link TransactionCommitFailedException}.
      *
      */
-    ListenableFuture<RpcResult<TransactionStatus>> submit(DOMDataWriteTransaction tx,
+    CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
             Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
 
 }
index 4906b6e84df9c9b84516c9e958d146f15d9766af..2f2b6e508a198f013f94892ce2c446cc1bb27955 100644 (file)
@@ -7,12 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  *
@@ -23,10 +21,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 public interface DOMDataCommitImplementation {
 
     /**
-     * User-supplied implementation of {@link DOMDataWriteTransaction#commit()}
+     * User-supplied implementation of {@link DOMDataWriteTransaction#submit()}
      * for transaction.
      *
-     * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked
+     * Callback invoked when {@link DOMDataWriteTransaction#submit()} is invoked
      * on transaction created by this factory.
      *
      * @param transaction
@@ -37,7 +35,7 @@ public interface DOMDataCommitImplementation {
      *            commited transaction.
      *
      */
-    ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+    CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
 }
 
index f5b96e27f5724be29a7bf844b508cbe6d51117cb..e1d27e453bd5a2e0652ec5ab70161eded553025b 100644 (file)
@@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
  * </ul>
  * {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
  * invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
  * transactions.
  *
index f791522a2a77214e9a233f966801fac096948618..5bddd763fb396506702ec834f912245e2ec2177a 100644 (file)
@@ -13,6 +13,8 @@ import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -23,6 +25,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
 /**
@@ -39,7 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * </ul>
  * <p>
  * {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
  * invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
  * transactions.
  *
@@ -74,7 +77,7 @@ class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
      *
      */
     @GuardedBy("this")
-    private volatile ListenableFuture<RpcResult<TransactionStatus>> commitFuture;
+    private volatile CheckedFuture<Void, TransactionCommitFailedException> commitFuture;
 
     protected DOMForwardedWriteTransaction(final Object identifier,
             final ImmutableMap<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
@@ -119,6 +122,11 @@ class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
 
     @Override
     public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+    }
+
+    @Override
+    public CheckedFuture<Void,TransactionCommitFailedException> submit() {
         checkNotReady();
 
         ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
@@ -126,7 +134,7 @@ class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
             cohortsBuilder.add(subTx.ready());
         }
         ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts = cohortsBuilder.build();
-        commitFuture = commitImpl.commit(this, cohorts);
+        commitFuture = commitImpl.submit(this, cohorts);
 
         /*
          *We remove reference to Commit Implementation in order
@@ -148,5 +156,4 @@ class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
     private void checkNotCommited() {
         checkState(commitFuture == null, "Transaction was already submited.");
     }
-
 }
\ No newline at end of file
index 27e322f23baf265a046d04317f2bfbf78ab2f74c..f57579858cf559404510c897a9f2fe76b6cfed36 100644 (file)
@@ -21,6 +21,7 @@ import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
@@ -204,7 +205,7 @@ DataModificationTransaction, Delegator<T> {
         public Future<RpcResult<TransactionStatus>> commit() {
             Preconditions.checkState(status == TransactionStatus.NEW);
             status = TransactionStatus.SUBMITED;
-            return getDelegate().commit();
+            return AbstractDataTransaction.convertToLegacyCommitFuture(getDelegate().submit());
         }
 
         @Override
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/NotificationModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/NotificationModule.java
deleted file mode 100644 (file)
index b298a02..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.dom.broker;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
-import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
-import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationService;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-public class NotificationModule implements BrokerModule {
-    private static Logger log = LoggerFactory
-            .getLogger(NotificationModule.class);
-
-    private final Multimap<QName, NotificationListener> listeners = HashMultimap
-            .create();
-
-    private static final Set<Class<? extends BrokerService>> PROVIDED_SERVICE_TYPE = ImmutableSet
-            .<Class<? extends BrokerService>>of(NotificationService.class,
-                    NotificationPublishService.class);
-
-    private static final Set<Class<? extends ConsumerFunctionality>> SUPPORTED_CONSUMER_FUNCTIONALITY = ImmutableSet
-            .of((Class<? extends ConsumerFunctionality>) NotificationListener.class,
-                    NotificationListener.class); // Workaround: if we use the
-                                                 // version of method with only
-                                                 // one argument, the generics
-                                                 // inference will not work
-
-    @Override
-    public Set<Class<? extends BrokerService>> getProvidedServices() {
-        return PROVIDED_SERVICE_TYPE;
-    }
-
-    @Override
-    public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
-        return SUPPORTED_CONSUMER_FUNCTIONALITY;
-    }
-
-    @Override
-    public <T extends BrokerService> T getServiceForSession(Class<T> service,
-            ConsumerSession session) {
-        if (NotificationPublishService.class.equals(service)
-                && session instanceof ProviderSession) {
-            @SuppressWarnings("unchecked")
-            T ret = (T) newNotificationPublishService(session);
-            return ret;
-        } else if (NotificationService.class.equals(service)) {
-
-            @SuppressWarnings("unchecked")
-            T ret = (T) newNotificationConsumerService(session);
-            return ret;
-        }
-
-        throw new IllegalArgumentException(
-                "The requested session-specific service is not provided by this module.");
-    }
-
-    private void sendNotification(CompositeNode notification) {
-        QName type = notification.getNodeType();
-        Collection<NotificationListener> toNotify = listeners.get(type);
-        log.trace("Publishing notification " + type);
-
-        if (toNotify == null) {
-            // No listeners were registered - returns.
-            return;
-        }
-
-        for (NotificationListener listener : toNotify) {
-            try {
-                // FIXME: ensure that notification is immutable
-                listener.onNotification(notification);
-            } catch (Exception e) {
-                log.error("Uncaught exception in NotificationListener", e);
-            }
-        }
-
-    }
-
-    private NotificationService newNotificationConsumerService(
-            ConsumerSession session) {
-        return new NotificationConsumerSessionImpl();
-    }
-
-    private NotificationPublishService newNotificationPublishService(
-            ConsumerSession session) {
-        return new NotificationProviderSessionImpl();
-    }
-
-    private class NotificationConsumerSessionImpl implements
-            NotificationService {
-
-        private final Multimap<QName, NotificationListener> consumerListeners = HashMultimap
-                .create();
-        private boolean closed = false;
-
-
-        @Override
-        public Registration<NotificationListener> addNotificationListener(QName notification,
-                NotificationListener listener) {
-            checkSessionState();
-            if (notification == null) {
-                throw new IllegalArgumentException(
-                        "Notification type must not be null.");
-            }
-            if (listener == null) {
-                throw new IllegalArgumentException("Listener must not be null.");
-            }
-
-            consumerListeners.put(notification, listener);
-            listeners.put(notification, listener);
-            log.trace("Registered listener for notification: " + notification);
-            return null; // Return registration Object.
-        }
-
-        public void removeNotificationListener(QName notification,
-                NotificationListener listener) {
-            checkSessionState();
-            if (notification == null) {
-                throw new IllegalArgumentException(
-                        "Notification type must not be null.");
-            }
-            if (listener == null) {
-                throw new IllegalArgumentException("Listener must not be null.");
-            }
-            consumerListeners.remove(notification, listener);
-            listeners.remove(notification, listener);
-        }
-
-        public void closeSession() {
-            closed = true;
-            Map<QName, Collection<NotificationListener>> toRemove = consumerListeners
-                    .asMap();
-            for (Entry<QName, Collection<NotificationListener>> entry : toRemove
-                    .entrySet()) {
-                listeners.remove(entry.getKey(), entry.getValue());
-            }
-        }
-
-        protected void checkSessionState() {
-            if (closed)
-                throw new IllegalStateException("Session is closed");
-        }
-    }
-
-    private class NotificationProviderSessionImpl extends
-            NotificationConsumerSessionImpl implements
-            NotificationPublishService {
-
-        @Override
-        public void publish(CompositeNode notification) {
-            checkSessionState();
-            if (notification == null)
-                throw new IllegalArgumentException(
-                        "Notification must not be null.");
-            NotificationModule.this.sendNotification(notification);
-        }
-    }
-
-    @Override
-    public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
-        return Collections.emptySet();
-    }
-}
index 29e078918e65eeae080f62c61d6ce5bae4fc3a9f..2dec6f2e4d1bdf773001c66343c8cf1ad4549160 100644 (file)
@@ -169,7 +169,7 @@ public class DOMBrokerPerformanceTest {
                             public List<ListenableFuture<?>> call() throws Exception {
                                 List<ListenableFuture<?>> builder = new ArrayList<>(txNum);
                                 for (DOMDataReadWriteTransaction tx :transactions) {
-                                    builder.add(tx.commit());
+                                    builder.add(tx.submit());
                                 }
                                 return builder;
                             }
@@ -267,7 +267,7 @@ public class DOMBrokerPerformanceTest {
                 measure("Txs:1 Submit", new Callable<ListenableFuture<?>>() {
                     @Override
                     public ListenableFuture<?> call() throws Exception {
-                        return writeTx.commit();
+                        return writeTx.submit();
                     }
                 }).get();
                 return null;
index fec73d665b90763a30ac7cc400d2bf364ffdd358..b006ca94e5d1387bfd84e7a76eccba3700756905 100644 (file)
@@ -107,7 +107,7 @@ public class DOMBrokerTest {
                 TestModel.TEST_PATH);
         assertTrue(writeTxContainer.get().isPresent());
 
-        writeTx.commit().get();
+        writeTx.submit().get();
 
         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
index 38f08b30f94ea37d4e73e3537c5c7bd798eb12ed..3ea0bcefa5bab97ea12a9ead64e40cce49b78277 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -21,7 +20,6 @@ import java.util.concurrent.TimeoutException;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -29,7 +27,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -80,7 +77,7 @@ public class DOMTransactionChainTest {
          * First transaction is marked as ready, we are able to allocate chained
          * transactions
          */
-        ListenableFuture<RpcResult<TransactionStatus>> firstWriteTxFuture = firstTx.commit();
+        ListenableFuture<Void> firstWriteTxFuture = firstTx.submit();
 
         /**
          * We alocate chained transaction - read transaction.
@@ -126,7 +123,7 @@ public class DOMTransactionChainTest {
         /**
          * third transaction is sealed and commited
          */
-        ListenableFuture<RpcResult<TransactionStatus>> thirdDeleteTxFuture = thirdDeleteTx.commit();
+        ListenableFuture<Void> thirdDeleteTxFuture = thirdDeleteTx.submit();
         assertCommitSuccessful(thirdDeleteTxFuture);
 
         /**
@@ -188,11 +185,9 @@ public class DOMTransactionChainTest {
         return tx;
     }
 
-    private static void assertCommitSuccessful(final ListenableFuture<RpcResult<TransactionStatus>> future)
+    private static void assertCommitSuccessful(final ListenableFuture<Void> future)
             throws InterruptedException, ExecutionException {
-        RpcResult<TransactionStatus> rpcResult = future.get();
-        assertTrue(rpcResult.isSuccessful());
-        assertEquals(TransactionStatus.COMMITED, rpcResult.getResult());
+        future.get();
     }
 
     private static void assertTestContainerExists(final DOMDataReadTransaction readTx) throws InterruptedException,
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java
new file mode 100644 (file)
index 0000000..ee62c0b
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.rpcservice.rev140701.RockTheHouseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class SchemaUpdateForTransactionTest {
+
+    private static final InstanceIdentifier TOP_PATH = InstanceIdentifier.of(Top.QNAME);
+    private SchemaContext schemaContext;
+    private InMemoryDOMDataStore domStore;
+
+    @Before
+    public void setupStore() {
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+        loadSchemas(RockTheHouseInput.class);
+    }
+
+    public void loadSchemas(final Class<?>... classes) {
+        YangModuleInfo moduleInfo;
+        try {
+            ModuleInfoBackedContext context = ModuleInfoBackedContext.create();
+            for (Class<?> clz : classes) {
+                moduleInfo = BindingReflections.getModuleInfo(clz);
+
+                context.registerModuleInfo(moduleInfo);
+            }
+            schemaContext = context.tryToCreateSchemaContext().get();
+            domStore.onGlobalContextUpdated(schemaContext);
+        } catch (Exception e) {
+            Throwables.propagateIfPossible(e);
+        }
+    }
+
+    /**
+     * Test suite tests allocating transaction when schema context
+     * does not contain module necessary for client write,
+     * then triggering update of global schema context
+     * and then performing write (according to new module).
+     *
+     * If transaction between allocation and schema context was
+     * unmodified, it is safe to change its schema context
+     * to new one (e.g. it will be same as if allocated after
+     * schema context update.)
+     *
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Test
+    public void testTransactionSchemaUpdate() throws InterruptedException, ExecutionException {
+
+        assertNotNull(domStore);
+
+        // We allocate transaction, initial schema context does not
+        // contain Lists model
+        DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+        assertNotNull(writeTx);
+
+        // we trigger schema context update to contain Lists model
+        loadSchemas(RockTheHouseInput.class, Top.class);
+
+        /**
+         *
+         * Writes /test in writeTx, this write should not fail
+         * with IllegalArgumentException since /test is in
+         * schema context.
+         *
+         */
+        writeTx.write(TOP_PATH, ImmutableNodes.containerNode(Top.QNAME));
+
+    }
+
+}
index dca8fcafef4a88e31ecc6898dc52bc12b7aa5936..de4ac7ac18a106f88f530ac4dc16ea047976b091 100644 (file)
@@ -7,15 +7,19 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
@@ -36,9 +40,6 @@ import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
@@ -53,6 +54,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final SchemaContextProviderFactory schemaContextProviderFactory;
     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+    private final NotificationHandler notificationHandler;
 
     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
@@ -79,6 +81,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         this.sourceProviderFactory = sourceProviderFactory;
         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
         this.schemaContextProviderFactory = schemaContextProviderFactory;
+        this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
     }
 
     @Override
@@ -99,6 +102,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
                 updateMessageTransformer(schemaContextProvider);
                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+                notificationHandler.onRemoteSchemaUp();
             }
         });
 
@@ -142,7 +146,63 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
 
     @Override
     public void onNotification(final NetconfMessage notification) {
-        final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
-        salFacade.onNotification(parsedNotification);
+        notificationHandler.handleNotification(notification);
+    }
+
+    /**
+     * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+     */
+    private final static class NotificationHandler {
+
+        private final RemoteDeviceHandler<?> salFacade;
+        private final List<NetconfMessage> cache = new LinkedList<>();
+        private final MessageTransformer<NetconfMessage> messageTransformer;
+        private boolean passNotifications = false;
+        private final RemoteDeviceId id;
+
+        NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+            this.salFacade = salFacade;
+            this.messageTransformer = messageTransformer;
+            this.id = id;
+        }
+
+        synchronized void handleNotification(final NetconfMessage notification) {
+            if(passNotifications) {
+                passNotification(messageTransformer.toNotification(notification));
+            } else {
+                cacheNotification(notification);
+            }
+        }
+
+        /**
+         * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+         */
+        synchronized void onRemoteSchemaUp() {
+            passNotifications = true;
+
+            for (final NetconfMessage cachedNotification : cache) {
+                passNotification(messageTransformer.toNotification(cachedNotification));
+            }
+
+            cache.clear();
+        }
+
+        private void cacheNotification(final NetconfMessage notification) {
+            Preconditions.checkState(passNotifications == false);
+
+            logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+            if(logger.isTraceEnabled()) {
+                logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+            }
+
+            cache.add(notification);
+        }
+
+        private void passNotification(final CompositeNode parsedNotification) {
+            logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+            Preconditions.checkNotNull(parsedNotification);
+            salFacade.onNotification(parsedNotification);
+        }
+
     }
 }
index 0ef76d330dbab15d908401a695e61ba25b0d1802..960f2ef2e80f094c3155e9d8a3632008ce7f13d6 100644 (file)
@@ -44,6 +44,7 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
@@ -90,16 +91,14 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
     }
 
     private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
-        sendEditRpc(createEditConfigStructure(key, Optional.<String>absent(), Optional.of(value)), Optional.<String>absent());
+        sendEditRpc(createEditConfigStructure(key, Optional.<ModifyAction>absent(), Optional.of(value)), Optional.<ModifyAction>absent());
     }
 
     private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
-        // FIXME use org.opendaylight.yangtools.yang.data.api.ModifyAction instead of strings
-        // TODO add string lowercase value to ModifyAction enum entries
-        sendEditRpc(createEditConfigStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode>absent()), Optional.of("none"));
+        sendEditRpc(createEditConfigStructure(toDelete, Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
     }
 
-    private void sendEditRpc(final CompositeNode editStructure, final Optional<String> defaultOperation) throws InterruptedException, ExecutionException {
+    private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws InterruptedException, ExecutionException {
         final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
         final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
 
@@ -110,7 +109,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         }
     }
 
-    private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<String> defaultOperation) {
+    private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
         final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
 
         // Target
@@ -119,7 +118,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
 
         // Default operation
         if(defaultOperation.isPresent()) {
-            SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, defaultOperation.get());
+            final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
             ret.add(defOp);
         }
 
@@ -134,7 +133,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return ret.toInstance();
     }
 
-    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
+    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
             final Optional<CompositeNode> lastChildOverride) {
         Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
 
@@ -174,7 +173,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return predicates;
     }
 
-    private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<String> operation, final Optional<CompositeNode> lastChildOverride) {
+    private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
         final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
         builder.setQName(arg.getNodeType());
 
@@ -182,7 +181,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         addPredicatesToCompositeNodeBuilder(predicates, builder);
 
         if (operation.isPresent()) {
-            builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
+            builder.setAttribute(NETCONF_OPERATION_QNAME, modifyOperationToXmlString(operation.get()));
         }
         if (lastChildOverride.isPresent()) {
             final List<Node<?>> children = lastChildOverride.get().getValue();
@@ -196,6 +195,10 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return builder.toInstance();
     }
 
+    private String modifyOperationToXmlString(final ModifyAction operation) {
+        return operation.name().toLowerCase();
+    }
+
     /**
      * Send commit rpc to finish the transaction
      * In case of failure or unexpected error response, ExecutionException is thrown
index ec2a820daab098618461d617155fa6804a369326..defaab629f3684de2be6a5b1d7a578749cd493e0 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.InputStream;
@@ -82,6 +83,32 @@ public class NetconfDeviceTest {
         Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
     }
 
+    @Test
+    public void testNotificationBeforeSchema() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+        final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+        device.onNotification(netconfMessage);
+        device.onNotification(netconfMessage);
+
+        verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+                Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
+
+        device.onRemoteSessionUp(sessionCaps, listener);
+
+        verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+        verify(facade, times(2)).onNotification(compositeNode);
+
+        device.onNotification(netconfMessage);
+        verify(messageTransformer, times(3)).toNotification(netconfMessage);
+        verify(facade, times(3)).onNotification(compositeNode);
+    }
+
     @Test
     public void testNetconfDeviceReconnect() throws Exception {
         final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
@@ -136,6 +163,7 @@ public class NetconfDeviceTest {
         final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
         doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
         doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+        doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
         return remoteDeviceHandler;
     }
 
@@ -173,6 +201,7 @@ public class NetconfDeviceTest {
         final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
         doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
         doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+        doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
         doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
         return messageTransformer;
     }
@@ -197,4 +226,4 @@ public class NetconfDeviceTest {
         doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
         return remoteDeviceCommunicator;
     }
-}
\ No newline at end of file
+}
index 02deb5a815a4b52b34dda1d7f5d7c9b2453142ec..28e358a06acb99a8c05d54b208fd476d826164f3 100644 (file)
@@ -7,10 +7,31 @@
  */
 package org.opendaylight.controller.cluster.datastore.util;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.URI;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
 import org.custommonkey.xmlunit.Diff;
 import org.custommonkey.xmlunit.XMLUnit;
 import org.junit.Test;
@@ -44,29 +65,10 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 
 /**
@@ -96,8 +98,8 @@ public class NormalizedNodeXmlConverterTest {
     }
   }
 
-  public static DataSchemaNode getSchemaNode(SchemaContext context,
-      String moduleName, String childNodeName) {
+  public static DataSchemaNode getSchemaNode(final SchemaContext context,
+      final String moduleName, final String childNodeName) {
     for (Module module : context.getModules()) {
       if (module.getName().equals(moduleName)) {
         DataSchemaNode found =
@@ -111,12 +113,13 @@ public class NormalizedNodeXmlConverterTest {
         + childNodeName);
   }
 
-  static DataSchemaNode findChildNode(Set<DataSchemaNode> children, String name) {
+  static DataSchemaNode findChildNode(final Set<DataSchemaNode> children, final String name) {
     List<DataNodeContainer> containers = Lists.newArrayList();
 
     for (DataSchemaNode dataSchemaNode : children) {
-      if (dataSchemaNode.getQName().getLocalName().equals(name))
+      if (dataSchemaNode.getQName().getLocalName().equals(name)) {
         return dataSchemaNode;
+    }
       if (dataSchemaNode instanceof DataNodeContainer) {
         containers.add((DataNodeContainer) dataSchemaNode);
       } else if (dataSchemaNode instanceof ChoiceNode) {
@@ -135,13 +138,13 @@ public class NormalizedNodeXmlConverterTest {
   }
 
   private static InstanceIdentifier.NodeIdentifier getNodeIdentifier(
-      String localName) {
-    return new InstanceIdentifier.NodeIdentifier(new QName(
+      final String localName) {
+    return new InstanceIdentifier.NodeIdentifier(QName.create(
         URI.create(NAMESPACE), revision, localName));
   }
 
   public static InstanceIdentifier.AugmentationIdentifier getAugmentIdentifier(
-      String... childNames) {
+      final String... childNames) {
     Set<QName> qn = Sets.newHashSet();
 
     for (String childName : childNames) {
@@ -233,7 +236,7 @@ public class NormalizedNodeXmlConverterTest {
 
 
 
-  public void init(String yangPath, String xmlPath, ContainerNode expectedNode)
+  public void init(final String yangPath, final String xmlPath, final ContainerNode expectedNode)
       throws Exception {
     SchemaContext schema = parseTestSchema(yangPath);
     this.xmlPath = xmlPath;
@@ -242,7 +245,7 @@ public class NormalizedNodeXmlConverterTest {
     this.expectedNode = expectedNode;
   }
 
-  SchemaContext parseTestSchema(String yangPath) throws Exception {
+  SchemaContext parseTestSchema(final String yangPath) throws Exception {
 
     YangParserImpl yangParserImpl = new YangParserImpl();
     InputStream stream =
@@ -268,8 +271,9 @@ public class NormalizedNodeXmlConverterTest {
             .parse(Collections.singletonList(doc.getDocumentElement()),
                 containerNode);
 
-    if (expectedNode != null)
-      junit.framework.Assert.assertEquals(expectedNode, built);
+    if (expectedNode != null) {
+        junit.framework.Assert.assertEquals(expectedNode, built);
+    }
 
     logger.info("{}", built);
 
@@ -287,10 +291,9 @@ public class NormalizedNodeXmlConverterTest {
     System.out.println(toString(doc.getDocumentElement()));
     System.out.println(toString(el));
 
-    boolean diff =
-        new Diff(
-            XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
-            XMLUnit.buildTestDocument(toString(el))).similar();
+    new Diff(
+        XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
+        XMLUnit.buildTestDocument(toString(el))).similar();
   }
 
   private static ContainerNode listLeafListWithAttributes() {
@@ -353,8 +356,9 @@ public class NormalizedNodeXmlConverterTest {
             .parse(Collections.singletonList(doc.getDocumentElement()),
                 containerNode);
 
-    if (expectedNode != null)
-      junit.framework.Assert.assertEquals(expectedNode, built);
+    if (expectedNode != null) {
+        junit.framework.Assert.assertEquals(expectedNode, built);
+    }
 
     logger.info("{}", built);
 
@@ -372,14 +376,13 @@ public class NormalizedNodeXmlConverterTest {
     System.out.println(toString(doc.getDocumentElement()));
     System.out.println(toString(el));
 
-    boolean diff =
-        new Diff(
-            XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
-            XMLUnit.buildTestDocument(toString(el))).similar();
+    new Diff(
+        XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
+        XMLUnit.buildTestDocument(toString(el))).similar();
   }
 
 
-  private Document loadDocument(String xmlPath) throws Exception {
+  private Document loadDocument(final String xmlPath) throws Exception {
     InputStream resourceAsStream =
         NormalizedNodeXmlConverterTest.class.getResourceAsStream(xmlPath);
 
@@ -399,7 +402,7 @@ public class NormalizedNodeXmlConverterTest {
     BUILDERFACTORY = factory;
   }
 
-  private Document readXmlToDocument(InputStream xmlContent)
+  private Document readXmlToDocument(final InputStream xmlContent)
       throws IOException, SAXException {
     DocumentBuilder dBuilder;
     try {
@@ -413,7 +416,7 @@ public class NormalizedNodeXmlConverterTest {
     return doc;
   }
 
-  public static String toString(Element xml) {
+  public static String toString(final Element xml) {
     try {
       Transformer transformer =
           TransformerFactory.newInstance().newTransformer();
@@ -442,11 +445,10 @@ public class NormalizedNodeXmlConverterTest {
     System.out.println(toString(convertedDoc.getDocumentElement()));
     XMLUnit.setIgnoreWhitespace(true);
     XMLUnit.setIgnoreComments(true);
-    boolean diff =
-        new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
-            .getDocumentElement())),
-            XMLUnit.buildTestDocument(toString(convertedDoc
-                .getDocumentElement()))).similar();
+    new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
+        .getDocumentElement())),
+        XMLUnit.buildTestDocument(toString(convertedDoc
+            .getDocumentElement()))).similar();
     System.out.println(toString(expectedDoc.getDocumentElement()));
 
   }
@@ -464,11 +466,10 @@ public class NormalizedNodeXmlConverterTest {
     System.out.println(toString(convertedDoc.getDocumentElement()));
     XMLUnit.setIgnoreWhitespace(true);
     XMLUnit.setIgnoreComments(true);
-    boolean diff =
-        new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
-            .getDocumentElement())),
-            XMLUnit.buildTestDocument(toString(convertedDoc
-                .getDocumentElement()))).similar();
+    new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
+        .getDocumentElement())),
+        XMLUnit.buildTestDocument(toString(convertedDoc
+            .getDocumentElement()))).similar();
     System.out.println(toString(expectedDoc.getDocumentElement()));
 
     // now we will try to convert xml back to normalize node.
index 92a7014438cff009299fdd3a2cf37ae8959d0d6b..611fb490907584d4e47bee30686b7244af049140 100644 (file)
@@ -84,7 +84,7 @@ public class RestCodec {
                     if (input instanceof IdentityValuesDTO) {
                         return identityrefCodec.deserialize(input);
                     }
-                    logger.info(
+                    logger.debug(
                             "Value is not instance of IdentityrefTypeDefinition but is {}. Therefore NULL is used as translation of  - {}",
                             input == null ? "null" : input.getClass(), String.valueOf(input));
                     return null;
index 536d140cf1b78e05b7a3e77d38e43a32d8a1f8f4..ff4678d36fc507dfdf21794c452308fb71dbc3a1 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -17,8 +18,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
@@ -28,16 +27,20 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
+
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.JerseyTest;
 import org.junit.BeforeClass;
@@ -64,11 +67,16 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 public class RestGetOperationTest extends JerseyTest {
 
     static class NodeData {
@@ -90,6 +98,8 @@ public class RestGetOperationTest extends JerseyTest {
     private static SchemaContext schemaContextModules;
     private static SchemaContext schemaContextBehindMountPoint;
 
+    private static final String RESTCONF_NS = "urn:ietf:params:xml:ns:yang:ietf-restconf";
+
     @BeforeClass
     public static void init() throws FileNotFoundException {
         schemaContextYangsIetf = TestUtils.loadSchemaContext("/full-versions/yangs");
@@ -241,7 +251,7 @@ public class RestGetOperationTest extends JerseyTest {
         validateModulesResponseJson(response);
 
         response = target(uri).request("application/yang.api+xml").get();
-        validateModulesResponseXml(response);
+        validateModulesResponseXml(response,schemaContextModules);
     }
 
     // /streams/
@@ -257,9 +267,12 @@ public class RestGetOperationTest extends JerseyTest {
         assertTrue(responseBody.contains("streams"));
 
         response = target(uri).request("application/yang.api+xml").get();
-        responseBody = response.readEntity(String.class);
-        assertNotNull(responseBody);
-        assertTrue(responseBody.contains("<streams xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf\""));
+        Document responseXmlBody = response.readEntity(Document.class);
+        assertNotNull(responseXmlBody);
+        Element rootNode = responseXmlBody.getDocumentElement();
+
+        assertEquals("streams", rootNode.getLocalName());
+        assertEquals(RESTCONF_NS, rootNode.getNamespaceURI());
     }
 
     // /modules/module
@@ -271,18 +284,23 @@ public class RestGetOperationTest extends JerseyTest {
 
         Response response = target(uri).request("application/yang.api+xml").get();
         assertEquals(200, response.getStatus());
-        String responseBody = response.readEntity(String.class);
-        assertTrue("Module2 in xml wasn't found", prepareXmlRegex("module2", "2014-01-02", "module:2", responseBody)
-                .find());
-        String[] split = responseBody.split("<module");
-        assertEquals("<module element is returned more then once", 2, split.length);
+        Document responseXml = response.readEntity(Document.class);
+
+
+
+        QName qname = assertedModuleXmlToModuleQName(responseXml.getDocumentElement());
+        assertNotNull(qname);
+
+        assertEquals("module2", qname.getLocalName());
+        assertEquals("module:2", qname.getNamespace().toString());
+        assertEquals("2014-01-02", qname.getFormattedRevision());
 
         response = target(uri).request("application/yang.api+json").get();
         assertEquals(200, response.getStatus());
-        responseBody = response.readEntity(String.class);
+        String responseBody = response.readEntity(String.class);
         assertTrue("Module2 in json wasn't found", prepareJsonRegex("module2", "2014-01-02", "module:2", responseBody)
                 .find());
-        split = responseBody.split("\"module\"");
+        String[] split = responseBody.split("\"module\"");
         assertEquals("\"module\" element is returned more then once", 2, split.length);
 
     }
@@ -296,19 +314,12 @@ public class RestGetOperationTest extends JerseyTest {
 
         Response response = target(uri).request("application/yang.api+xml").get();
         assertEquals(200, response.getStatus());
-        String responseBody = response.readEntity(String.class);
-        assertTrue("Xml response for /operations dummy-rpc1-module1 is incorrect",
-                validateOperationsResponseXml(responseBody, "dummy-rpc1-module1", "module:1").find());
-        assertTrue("Xml response for /operations dummy-rpc2-module1 is incorrect",
-                validateOperationsResponseXml(responseBody, "dummy-rpc2-module1", "module:1").find());
-        assertTrue("Xml response for /operations dummy-rpc1-module2 is incorrect",
-                validateOperationsResponseXml(responseBody, "dummy-rpc1-module2", "module:2").find());
-        assertTrue("Xml response for /operations dummy-rpc2-module2 is incorrect",
-                validateOperationsResponseXml(responseBody, "dummy-rpc2-module2", "module:2").find());
+        Document responseDoc = response.readEntity(Document.class);
+        validateOperationsResponseXml(responseDoc, schemaContextModules);
 
         response = target(uri).request("application/yang.api+json").get();
         assertEquals(200, response.getStatus());
-        responseBody = response.readEntity(String.class);
+        String responseBody = response.readEntity(String.class);
         assertTrue("Json response for /operations dummy-rpc1-module1 is incorrect",
                 validateOperationsResponseJson(responseBody, "dummy-rpc1-module1", "module1").find());
         assertTrue("Json response for /operations dummy-rpc2-module1 is incorrect",
@@ -320,6 +331,30 @@ public class RestGetOperationTest extends JerseyTest {
 
     }
 
+    private void validateOperationsResponseXml(final Document responseDoc, final SchemaContext schemaContext) {
+        Element operationsElem = responseDoc.getDocumentElement();
+        assertEquals(RESTCONF_NS, operationsElem.getNamespaceURI());
+        assertEquals("operations", operationsElem.getLocalName());
+
+
+        HashSet<QName> foundOperations = new HashSet<>();
+
+        NodeList operationsList = operationsElem.getChildNodes();
+        for(int i = 0;i < operationsList.getLength();i++) {
+            org.w3c.dom.Node operation = operationsList.item(i);
+
+            String namespace = operation.getNamespaceURI();
+            String name = operation.getLocalName();
+            QName opQName = QName.create(URI.create(namespace), null, name);
+            foundOperations.add(opQName);
+        }
+
+        for(RpcDefinition schemaOp : schemaContext.getOperations()) {
+            assertTrue(foundOperations.contains(schemaOp.getQName().withoutRevision()));
+        }
+
+    }
+
     // /operations/pathToMountPoint/yang-ext:mount
     @Test
     public void getOperationsBehindMountPointTest() throws FileNotFoundException, UnsupportedEncodingException {
@@ -337,15 +372,13 @@ public class RestGetOperationTest extends JerseyTest {
 
         Response response = target(uri).request("application/yang.api+xml").get();
         assertEquals(200, response.getStatus());
-        String responseBody = response.readEntity(String.class);
-        assertTrue("Xml response for /operations/mount_point rpc-behind-module1 is incorrect",
-                validateOperationsResponseXml(responseBody, "rpc-behind-module1", "module:1:behind:mount:point").find());
-        assertTrue("Xml response for /operations/mount_point rpc-behind-module2 is incorrect",
-                validateOperationsResponseXml(responseBody, "rpc-behind-module2", "module:2:behind:mount:point").find());
+
+        Document responseDoc = response.readEntity(Document.class);
+        validateOperationsResponseXml(responseDoc, schemaContextBehindMountPoint);
 
         response = target(uri).request("application/yang.api+json").get();
         assertEquals(200, response.getStatus());
-        responseBody = response.readEntity(String.class);
+        String responseBody = response.readEntity(String.class);
         assertTrue("Json response for /operations/mount_point rpc-behind-module1 is incorrect",
                 validateOperationsResponseJson(responseBody, "rpc-behind-module1", "module1-behind-mount-point").find());
         assertTrue("Json response for /operations/mount_point rpc-behind-module2 is incorrect",
@@ -441,15 +474,7 @@ public class RestGetOperationTest extends JerseyTest {
 
         response = target(uri).request("application/yang.api+xml").get();
         assertEquals(200, response.getStatus());
-        responseBody = response.readEntity(String.class);
-        assertTrue(
-                "module1-behind-mount-point in json wasn't found",
-                prepareXmlRegex("module1-behind-mount-point", "2014-02-03", "module:1:behind:mount:point", responseBody)
-                        .find());
-        assertTrue(
-                "module2-behind-mount-point in json wasn't found",
-                prepareXmlRegex("module2-behind-mount-point", "2014-02-04", "module:2:behind:mount:point", responseBody)
-                        .find());
+        validateModulesResponseXml(response, schemaContextBehindMountPoint);
 
     }
 
@@ -481,26 +506,83 @@ public class RestGetOperationTest extends JerseyTest {
 
         response = target(uri).request("application/yang.api+xml").get();
         assertEquals(200, response.getStatus());
-        responseBody = response.readEntity(String.class);
-        assertTrue(
-                "module1-behind-mount-point in json wasn't found",
-                prepareXmlRegex("module1-behind-mount-point", "2014-02-03", "module:1:behind:mount:point", responseBody)
-                        .find());
-        split = responseBody.split("<module");
-        assertEquals("<module element is returned more then once", 2, split.length);
+        Document responseXml = response.readEntity(Document.class);
+
+        QName module = assertedModuleXmlToModuleQName(responseXml.getDocumentElement());
+
+        assertEquals("module1-behind-mount-point", module.getLocalName());
+        assertEquals("2014-02-03", module.getFormattedRevision());
+        assertEquals("module:1:behind:mount:point", module.getNamespace().toString());
+
 
     }
 
-    private void validateModulesResponseXml(final Response response) {
+    private void validateModulesResponseXml(final Response response, final SchemaContext schemaContext) {
         assertEquals(200, response.getStatus());
-        String responseBody = response.readEntity(String.class);
+        Document responseBody = response.readEntity(Document.class);
+        NodeList moduleNodes = responseBody.getDocumentElement().getElementsByTagNameNS(RESTCONF_NS, "module");
 
-        assertTrue("Module1 in xml wasn't found", prepareXmlRegex("module1", "2014-01-01", "module:1", responseBody)
-                .find());
-        assertTrue("Module2 in xml wasn't found", prepareXmlRegex("module2", "2014-01-02", "module:2", responseBody)
-                .find());
-        assertTrue("Module3 in xml wasn't found", prepareXmlRegex("module3", "2014-01-03", "module:3", responseBody)
-                .find());
+        assertTrue(moduleNodes.getLength() > 0);
+
+        HashSet<QName> foundModules = new HashSet<>();
+
+        for(int i=0;i < moduleNodes.getLength();i++) {
+            org.w3c.dom.Node module = moduleNodes.item(i);
+
+            QName name = assertedModuleXmlToModuleQName(module);
+            foundModules.add(name);
+        }
+
+        assertAllModules(foundModules,schemaContext);
+    }
+
+    private void assertAllModules(final Set<QName> foundModules, final SchemaContext schemaContext) {
+        for(Module module : schemaContext.getModules()) {
+            QName current = QName.create(module.getQNameModule(),module.getName());
+            assertTrue("Module not found in response.",foundModules.contains(current));
+        }
+
+    }
+
+    private QName assertedModuleXmlToModuleQName(final org.w3c.dom.Node module) {
+        assertEquals("module", module.getLocalName());
+        assertEquals(RESTCONF_NS, module.getNamespaceURI());
+        String revision = null;
+        String namespace = null;
+        String name = null;
+
+
+        NodeList childNodes = module.getChildNodes();
+
+        for(int i =0;i < childNodes.getLength(); i++) {
+            org.w3c.dom.Node child = childNodes.item(i);
+            assertEquals(RESTCONF_NS, child.getNamespaceURI());
+
+            switch(child.getLocalName()) {
+                case "name":
+                    assertNull("Name element appeared multiple times",name);
+                    name = child.getTextContent().trim();
+                    break;
+                case "revision":
+                    assertNull("Revision element appeared multiple times",revision);
+                    revision = child.getTextContent().trim();
+                    break;
+
+                case "namespace":
+                    assertNull("Namespace element appeared multiple times",namespace);
+                    namespace = child.getTextContent().trim();
+                    break;
+            }
+        }
+
+        assertNotNull("Revision was not part of xml",revision);
+        assertNotNull("Module namespace was not part of xml",namespace);
+        assertNotNull("Module identiffier was not part of xml",name);
+
+
+        // TODO Auto-generated method stub
+
+        return QName.create(namespace,revision,name);
     }
 
     private void validateModulesResponseJson(final Response response) {
@@ -542,34 +624,6 @@ public class RestGetOperationTest extends JerseyTest {
 
     }
 
-    private Matcher prepareXmlRegex(final String module, final String revision, final String namespace,
-            final String searchIn) {
-        StringBuilder regex = new StringBuilder();
-        regex.append("^");
-
-        regex.append(".*<module.*");
-        regex.append(".*>");
-
-        regex.append(".*<name>");
-        regex.append(".*" + module);
-        regex.append(".*<\\/name>");
-
-        regex.append(".*<revision>");
-        regex.append(".*" + revision);
-        regex.append(".*<\\/revision>");
-
-        regex.append(".*<namespace>");
-        regex.append(".*" + namespace);
-        regex.append(".*<\\/namespace>");
-
-        regex.append(".*<\\/module.*>");
-
-        regex.append(".*");
-        regex.append("$");
-
-        Pattern ptrn = Pattern.compile(regex.toString(), Pattern.DOTALL);
-        return ptrn.matcher(searchIn);
-    }
 
     private void prepareMockForModulesTest(final ControllerContext mockedControllerContext)
             throws FileNotFoundException {
@@ -626,7 +680,7 @@ public class RestGetOperationTest extends JerseyTest {
         getDataWithUriIncludeWhiteCharsParameter("operational");
     }
 
-    private void getDataWithUriIncludeWhiteCharsParameter(String target) throws UnsupportedEncodingException {
+    private void getDataWithUriIncludeWhiteCharsParameter(final String target) throws UnsupportedEncodingException {
         mockReadConfigurationDataMethod();
         String uri = "/" + target + "/ietf-interfaces:interfaces/interface/eth0";
         Response response = target(uri).queryParam("prettyPrint", "false").request("application/xml").get();
index de5ddd9a75298f61d5c4ce15c00b9df0e170cc92..b7518e094d0e61594a391c2fc4a78b181294e00a 100644 (file)
@@ -20,10 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
@@ -100,24 +100,23 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
         executor.shutdown();
 
         if (dataProvider != null) {
-            WriteTransaction t = dataProvider.newWriteOnlyTransaction();
-            t.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
-            ListenableFuture<RpcResult<TransactionStatus>> future = t.commit();
-            Futures.addCallback( future, new FutureCallback<RpcResult<TransactionStatus>>() {
+            WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+            tx.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
+            Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
                 @Override
-                public void onSuccess( RpcResult<TransactionStatus> result ) {
+                public void onSuccess( final Void result ) {
                     LOG.debug( "Delete Toaster commit result: " + result );
                 }
 
                 @Override
-                public void onFailure( Throwable t ) {
+                public void onFailure( final Throwable t ) {
                     LOG.error( "Delete of Toaster failed", t );
                 }
             } );
         }
     }
 
-    private Toaster buildToaster( ToasterStatus status ) {
+    private Toaster buildToaster( final ToasterStatus status ) {
 
         // note - we are simulating a device whose manufacture and model are
         // fixed (embedded) into the hardware.
@@ -170,7 +169,7 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
 
         final SettableFuture<RpcResult<Void>> futureResult = SettableFuture.create();
 
-        checkStatusAndMakeToast( input, futureResult );
+        checkStatusAndMakeToast( input, futureResult, 2 );
 
         return futureResult;
     }
@@ -186,27 +185,27 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
     }
 
     private void checkStatusAndMakeToast( final MakeToastInput input,
-                                          final SettableFuture<RpcResult<Void>> futureResult ) {
+                                          final SettableFuture<RpcResult<Void>> futureResult,
+                                          final int tries ) {
 
         // Read the ToasterStatus and, if currently Up, try to write the status to Down.
         // If that succeeds, then we essentially have an exclusive lock and can proceed
         // to make toast.
 
         final ReadWriteTransaction tx = dataProvider.newReadWriteTransaction();
-        ListenableFuture<Optional<DataObject>> readFuture =
+        ListenableFuture<Optional<Toaster>> readFuture =
                                           tx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID );
 
-        final ListenableFuture<RpcResult<TransactionStatus>> commitFuture =
-            Futures.transform( readFuture, new AsyncFunction<Optional<DataObject>,
-                                                                   RpcResult<TransactionStatus>>() {
+        final ListenableFuture<Void> commitFuture =
+            Futures.transform( readFuture, new AsyncFunction<Optional<Toaster>,Void>() {
 
                 @Override
-                public ListenableFuture<RpcResult<TransactionStatus>> apply(
-                        Optional<DataObject> toasterData ) throws Exception {
+                public ListenableFuture<Void> apply(
+                        final Optional<Toaster> toasterData ) throws Exception {
 
                     ToasterStatus toasterStatus = ToasterStatus.Up;
                     if( toasterData.isPresent() ) {
-                        toasterStatus = ((Toaster)toasterData.get()).getToasterStatus();
+                        toasterStatus = toasterData.get().getToasterStatus();
                     }
 
                     LOG.debug( "Read toaster status: {}", toasterStatus );
@@ -216,8 +215,8 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
                         if( outOfBread() ) {
                             LOG.debug( "Toaster is out of bread" );
 
-                            return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
-                                    .withRpcError( makeToasterOutOfBreadError() ).build() );
+                            return Futures.immediateFailedCheckedFuture(
+                                    new TransactionCommitFailedException( "", makeToasterOutOfBreadError() ) );
                         }
 
                         LOG.debug( "Setting Toaster status to Down" );
@@ -227,7 +226,7 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
                         // concurrent toasting.
                         tx.put( LogicalDatastoreType.OPERATIONAL, TOASTER_IID,
                                 buildToaster( ToasterStatus.Down ) );
-                        return tx.commit();
+                        return tx.submit();
                     }
 
                     LOG.debug( "Oops - already making toast!" );
@@ -235,51 +234,44 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
                     // Return an error since we are already making toast. This will get
                     // propagated to the commitFuture below which will interpret the null
                     // TransactionStatus in the RpcResult as an error condition.
-                    return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
-                            .withRpcError( makeToasterInUseError() ).build() );
+                    return Futures.immediateFailedCheckedFuture(
+                            new TransactionCommitFailedException( "", makeToasterInUseError() ) );
                 }
         } );
 
-        Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback( commitFuture, new FutureCallback<Void>() {
             @Override
-            public void onSuccess( RpcResult<TransactionStatus> result ) {
-                if( result.getResult() == TransactionStatus.COMMITED  ) {
-
-                    // OK to make toast
-                    currentMakeToastTask.set( executor.submit(
-                                                    new MakeToastTask( input, futureResult ) ) );
-                } else {
-
-                    LOG.debug( "Setting error result" );
-
-                    // Either the transaction failed to commit for some reason or, more likely,
-                    // the read above returned ToasterStatus.Down. Either way, fail the
-                    // futureResult and copy the errors.
-
-                    futureResult.set( RpcResultBuilder.<Void>failed().withRpcErrors(
-                                                                     result.getErrors() ).build() );
-                }
+            public void onSuccess( final Void result ) {
+                // OK to make toast
+                currentMakeToastTask.set( executor.submit( new MakeToastTask( input, futureResult ) ) );
             }
 
             @Override
-            public void onFailure( Throwable ex ) {
+            public void onFailure( final Throwable ex ) {
                 if( ex instanceof OptimisticLockFailedException ) {
 
                     // Another thread is likely trying to make toast simultaneously and updated the
                     // status before us. Try reading the status again - if another make toast is
                     // now in progress, we should get ToasterStatus.Down and fail.
 
-                    LOG.debug( "Got OptimisticLockFailedException - trying again" );
+                    if( ( tries - 1 ) > 0 ) {
+                        LOG.debug( "Got OptimisticLockFailedException - trying again" );
 
-                    checkStatusAndMakeToast( input, futureResult );
+                        checkStatusAndMakeToast( input, futureResult, tries - 1 );
+                    }
+                    else {
+                        futureResult.set( RpcResultBuilder.<Void> failed()
+                                .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+                    }
 
                 } else {
 
-                    LOG.error( "Failed to commit Toaster status", ex );
+                    LOG.debug( "Failed to commit Toaster status", ex );
 
-                    // Got some unexpected error so fail.
+                    // Probably already making toast.
                     futureResult.set( RpcResultBuilder.<Void> failed()
-                                        .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+                            .withRpcErrors( ((TransactionCommitFailedException)ex).getErrorList() )
+                            .build() );
                 }
             }
         } );
@@ -327,20 +319,14 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
         WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
         tx.put( LogicalDatastoreType.OPERATIONAL,TOASTER_IID, buildToaster( ToasterStatus.Up ) );
 
-        ListenableFuture<RpcResult<TransactionStatus>> commitFuture = tx.commit();
-
-        Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess( RpcResult<TransactionStatus> result ) {
-                if( result.getResult() != TransactionStatus.COMMITED ) {
-                    LOG.error( "Failed to update toaster status: " + result.getErrors() );
-                }
-
-                notifyCallback( result.getResult() == TransactionStatus.COMMITED );
+            public void onSuccess( final Void result ) {
+                notifyCallback( true );
             }
 
             @Override
-            public void onFailure( Throwable t ) {
+            public void onFailure( final Throwable t ) {
                 // We shouldn't get an OptimisticLockFailedException (or any ex) as no
                 // other component should be updating the operational state.
                 LOG.error( "Failed to update toaster status", t );
@@ -348,7 +334,7 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
                 notifyCallback( false );
             }
 
-            void notifyCallback( boolean result ) {
+            void notifyCallback( final boolean result ) {
                 if( resultCallback != null ) {
                     resultCallback.apply( result );
                 }
@@ -400,7 +386,7 @@ public class OpendaylightToaster implements ToasterService, ToasterProviderRunti
 
             setToasterStatusUp( new Function<Boolean,Void>() {
                 @Override
-                public Void apply( Boolean result ) {
+                public Void apply( final Boolean result ) {
 
                     currentMakeToastTask.set( null );
 
index 5840c9dbd14d1aa3b3fe050d079e426d8ec21f20..85cdf0335fecc793fc29e0c357f41cf4ee41c1e6 100644 (file)
@@ -9,33 +9,65 @@
 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
 
 import java.lang.ref.SoftReference;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import java.util.concurrent.atomic.AtomicReference;
 
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class YangStoreServiceImpl implements YangStoreService {
+    private static final Logger LOG = LoggerFactory.getLogger(YangStoreServiceImpl.class);
+
+    /**
+     * This is a rather interesting locking model. We need to guard against both the
+     * cache expiring from GC and being invalidated by schema context change. The
+     * context can change while we are doing processing, so we do not want to block
+     * it, so no synchronization can happen on the methods.
+     *
+     * So what we are doing is the following:
+     *
+     * We synchronize with GC as usual, using a SoftReference.
+     *
+     * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
+     * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
+     * that may happen while the getter is already busy acting on the old schema context,
+     * so it needs to understand that a refresh has happened and retry. To do that, it
+     * attempts a CAS operation -- if it fails, in knows that the SoftReference has
+     * been replaced and thus it needs to retry.
+     *
+     * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
+     * to stop multiple threads doing the same work.
+     */
+    private final AtomicReference<SoftReference<YangStoreSnapshotImpl>> ref = new AtomicReference<>(new SoftReference<YangStoreSnapshotImpl>(null));
     private final SchemaContextProvider service;
-    @GuardedBy("this")
-    private SoftReference<YangStoreSnapshotImpl> cache = new SoftReference<>(null);
 
-    public YangStoreServiceImpl(SchemaContextProvider service) {
+    public YangStoreServiceImpl(final SchemaContextProvider service) {
         this.service = service;
     }
 
     @Override
     public synchronized YangStoreSnapshotImpl getYangStoreSnapshot() throws YangStoreException {
-        YangStoreSnapshotImpl yangStoreSnapshot = cache.get();
-        if (yangStoreSnapshot == null) {
-            yangStoreSnapshot = new YangStoreSnapshotImpl(service.getSchemaContext());
-            cache = new SoftReference<>(yangStoreSnapshot);
+        SoftReference<YangStoreSnapshotImpl> r = ref.get();
+        YangStoreSnapshotImpl ret = r.get();
+
+        while (ret == null) {
+            // We need to be compute a new value
+            ret = new YangStoreSnapshotImpl(service.getSchemaContext());
+
+            if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
+                LOG.debug("Concurrent refresh detected, recomputing snapshot");
+                r = ref.get();
+                ret = null;
+            }
         }
-        return yangStoreSnapshot;
+
+        return ret;
     }
 
     /**
      * Called when schema context changes, invalidates cache.
      */
-    public synchronized void refresh() {
-        cache.clear();
+    public void refresh() {
+        ref.set(new SoftReference<YangStoreSnapshotImpl>(null));
     }
 }
index 723e484c9c456eee6dfed86a8805cd1fcf624781..e9383886eaac79f2462ff262bf6097789142205d 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.netconf.cli.commands.local;
 
-import com.google.common.collect.Lists;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionManager;
 import org.opendaylight.controller.netconf.cli.commands.AbstractCommand;
 import org.opendaylight.controller.netconf.cli.commands.Command;
@@ -21,6 +20,8 @@ import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 
+import com.google.common.collect.Lists;
+
 /**
  * Local disconnect command
  */
@@ -40,7 +41,7 @@ public class Disconnect extends AbstractCommand {
         connectionManager.disconnect();
 
         return new Output(new CompositeNodeTOImpl(getCommandId(), null,
-                Lists.<Node<?>> newArrayList(new SimpleNodeTOImpl<>(new QName(getCommandId(), "status"), null,
+                Lists.<Node<?>> newArrayList(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null,
                         "Connection disconnected"))));
     }
 
index 2021bf722c5390560e2860f334563c34323d6815..5a2445279b4df24214d0c998f8fa78ba6f17d3a2 100644 (file)
@@ -7,20 +7,23 @@
  */
 package org.opendaylight.controller.netconf.cli.io;
 
-import com.google.common.collect.Maps;
 import java.util.Map;
+
 import junit.framework.Assert;
+
 import org.junit.Test;
 import org.opendaylight.controller.netconf.cli.commands.CommandConstants;
 import org.opendaylight.yangtools.yang.common.QName;
 
+import com.google.common.collect.Maps;
+
 public class IOUtilTest {
 
     @Test
     public void testQNameFromKeyStringNew() throws Exception {
         final String s = IOUtil.qNameToKeyString(CommandConstants.HELP_QNAME, "module");
         final Map<String, QName> modulesMap = Maps.newHashMap();
-        modulesMap.put("module", new QName(CommandConstants.HELP_QNAME, "module"));
+        modulesMap.put("module", QName.create(CommandConstants.HELP_QNAME, "module"));
         final QName qName = IOUtil.qNameFromKeyString(s, modulesMap);
         Assert.assertEquals(CommandConstants.HELP_QNAME, qName);
     }
index 829ac304bd667f680d813e32ff58ed9f9b0b6f37..87b3f837e8c43a25f74f78cfbdd0b425bd8d94e9 100644 (file)
@@ -11,9 +11,8 @@ import io.netty.channel.Channel;
 import io.netty.util.concurrent.Promise;
 import java.io.IOException;
 import org.opendaylight.controller.netconf.nettyutil.AbstractChannelInitializer;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.SshHandler;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.Invoker;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
 final class SshClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
@@ -33,8 +32,7 @@ final class SshClientChannelInitializer extends AbstractChannelInitializer<Netco
     @Override
     public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
         try {
-            final Invoker invoker = Invoker.subsystem("netconf");
-            ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker));
+            ch.pipeline().addFirst(SshHandler.createForNetconfSubsystem(authenticationHandler));
             super.initialize(ch,promise);
         } catch (final IOException e) {
             throw new RuntimeException(e);
index afa17532d55ce6628b1659db90e666acf1e97250..18ed18e4ae2d4e89e658995193e8102b8c17b49c 100644 (file)
@@ -56,7 +56,7 @@ public class TestingNetconfClient implements Closeable {
         this.label = clientLabel;
         sessionListener = config.getSessionListener();
         Future<NetconfClientSession> clientFuture = netconfClientDispatcher.createClient(config);
-        clientSession = get(clientFuture);
+        clientSession = get(clientFuture);//TODO: make static
         this.sessionId = clientSession.getSessionId();
     }
 
index 1360a54d6fbaf2609067f1617337e8af76ec7852..de3f732b25763fa19d6b481a64bfbcf4d8bcf87c 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -104,7 +105,7 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
 
     private void start() {
         final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
-        logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+        logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
 
         channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
 
@@ -125,12 +126,20 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
                         // Do not fail negotiation if promise is done or canceled
                         // It would result in setting result of the promise second time and that throws exception
                         if (isPromiseFinished() == false) {
-                            // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
-                            // Loopback connection initiation might
                             negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+                            changeState(State.FAILED);
+
+                            channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    if(future.isSuccess()) {
+                                        logger.debug("Channel {} closed: success", future.channel());
+                                    } else {
+                                        logger.warn("Channel {} closed: fail", future.channel());
+                                    }
+                                }
+                            });
                         }
-
-                        changeState(State.FAILED);
                     } else if(channel.isOpen()) {
                         channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
                     }
@@ -214,9 +223,9 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
     protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
 
     private synchronized void changeState(final State newState) {
-        logger.debug("Changing state from : {} to : {}", state, newState);
-        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
-                newState);
+        logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+                newState, channel);
         this.state = newState;
     }
 
index f260bcbcefbf0fff5baa41bda46b8b6c4efc617a..a87a08ded72ea97db3396c24697b780be501911d 100644 (file)
@@ -9,56 +9,15 @@
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
 
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
-    private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+    public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
 
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
-        int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
-        if (index == -1) {
-            logger.debug("Message is not complete, read again.");
-            if (logger.isTraceEnabled()) {
-                String str = in.toString(Charsets.UTF_8);
-                logger.trace("Message read so far: {}", str);
-            }
-            ctx.read();
-        } else {
-            ByteBuf msg = in.readBytes(index);
-            in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
-            in.discardReadBytes();
-            logger.debug("Message is complete.");
-            out.add(msg);
-        }
+    public NetconfEOMAggregator() {
+        super(Integer.MAX_VALUE, DELIMITER);
     }
-
-    private int indexOfSequence(ByteBuf in, byte[] sequence) {
-        int index = -1;
-        for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
-            if (in.getByte(i) == sequence[0]) {
-                index = i;
-                for (int j = 1; j < sequence.length; j++) {
-                    if (in.getByte(i + j) != sequence[j]) {
-                        index = -1;
-                        break;
-                    }
-                }
-                if (index != -1) {
-                    return index;
-                }
-            }
-        }
-        return index;
-    }
-
 }
index fae2000bb53c7b0873f90cc677464ef04be6666a..d810a870ff29f68b59b19788bf3064e0218cb148 100644 (file)
@@ -12,6 +12,7 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -61,7 +62,8 @@ public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMess
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
 
-            StreamResult result = new StreamResult(new OutputStreamWriter(os));
+            // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+            StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
             DOMSource source = new DOMSource(msg.getDocument());
             transformer.transform(source, result);
         }
index 84353a4646d4500a4fa612d23ad71545e0f70bd4..993709258a3410b80a8a6fbdde47f0374ca15619 100644 (file)
@@ -7,14 +7,12 @@
  */
 package org.opendaylight.controller.netconf.nettyutil.handler.exi;
 
-import org.opendaylight.controller.netconf.api.NetconfMessage;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.openexi.proc.common.AlignmentType;
 import org.openexi.proc.common.EXIOptions;
 import org.openexi.proc.common.EXIOptionsException;
 
-import com.google.common.base.Preconditions;
-
 public final class EXIParameters {
     private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
     private static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
@@ -29,20 +27,12 @@ public final class EXIParameters {
     private static final String EXI_FIDELITY_PIS = "pis";
     private static final String EXI_FIDELITY_PREFIXES = "prefixes";
 
-    private static final String EXI_PARAMETER_SCHEMA = "schema";
-    private static final String EXI_PARAMETER_SCHEMA_NONE = "none";
-    private static final String EXI_PARAMETER_SCHEMA_BUILT_IN = "builtin";
-    private static final String EXI_PARAMETER_SCHEMA_BASE_1_1 = "base:1.1";
-
     private final EXIOptions options;
 
     private EXIParameters(final EXIOptions options) {
         this.options = Preconditions.checkNotNull(options);
     }
 
-    public static EXIParameters fromNetconfMessage(final NetconfMessage root) throws EXIOptionsException {
-        return fromXmlElement(XmlElement.fromDomDocument(root.getDocument()));
-    }
 
     public static EXIParameters fromXmlElement(final XmlElement root) throws EXIOptionsException {
         final EXIOptions options =  new EXIOptions();
@@ -77,30 +67,6 @@ public final class EXIParameters {
                 options.setPreserveNS(true);
             }
         }
-
-        if (root.getElementsByTagName(EXI_PARAMETER_SCHEMA).getLength() > 0) {
-/*
-                        GrammarFactory grammarFactory = GrammarFactory.newInstance();
-                        if (operationElement
-                                .getElementsByTagName(EXI_PARAMETER_SCHEMA_NONE)
-                                .getLength() > 0) {
-                            this.grammars = grammarFactory.createSchemaLessGrammars();
-                        }
-
-                        if (operationElement.getElementsByTagName(
-                                EXI_PARAMETER_SCHEMA_BUILT_IN).getLength() > 0) {
-                            this.grammars = grammarFactory.createXSDTypesOnlyGrammars();
-                        }
-
-                        if (operationElement.getElementsByTagName(
-                                EXI_PARAMETER_SCHEMA_BASE_1_1).getLength() > 0) {
-                            this.grammars = grammarFactory
-                                    .createGrammars(NETCONF_XSD_LOCATION);
-                        }
-*/
-
-        }
-
         return new EXIParameters(options);
     }
 
index 1b0a34d7e0fa7d0b6ab32c6c3600de52a5f1543f..72eb774b5303efb14769d7fe1da644ea34456d82 100644 (file)
@@ -48,7 +48,7 @@ public final class NetconfStartExiMessage extends NetconfMessage {
         Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
                 START_EXI);
 
-        addAlignemnt(exiOptions, doc, startExiElement);
+        addAlignment(exiOptions, doc, startExiElement);
         addFidelity(exiOptions, doc, startExiElement);
 
         rpcElement.appendChild(startExiElement);
@@ -75,7 +75,7 @@ public final class NetconfStartExiMessage extends NetconfMessage {
         }
     }
 
-    private static void addAlignemnt(EXIOptions exiOptions, Document doc, Element startExiElement) {
+    private static void addAlignment(EXIOptions exiOptions, Document doc, Element startExiElement) {
         Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
                 ALIGNMENT_KEY);
         alignmentElement.setTextContent(exiOptions.getAlignmentType().toString());
index 67027d8014881f7b1286a2eb6229ae3258a32a8a..b67aa0f96dcd53a084965fc3766399a26f2a5869 100644 (file)
@@ -14,7 +14,7 @@ import java.io.IOException;
 
 /**
  * Class Providing username/password authentication option to
- * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.SshHandler}
+ * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler}
  */
 public class LoginPassword extends AuthenticationHandler {
     private final String username;
index d542e1952a5e847e09ad771828b1ab57df7b1989..eab2546d6e430d64a185a9ae2e44afd83375259b 100644 (file)
@@ -13,33 +13,38 @@ import java.io.IOException;
  * Abstract class providing mechanism of invoking various SSH level services.
  * Class is not allowed to be extended, as it provides its own implementations via instance initiators.
  */
-public abstract class Invoker {
+abstract class Invoker {
     private boolean invoked = false;
 
-    private Invoker(){}
+    private Invoker() {
+    }
 
     protected boolean isInvoked() {
-        // TODO invoked is always false
         return invoked;
     }
 
+    public void setInvoked() {
+        this.invoked = true;
+    }
+
     abstract void invoke(SshSession session) throws IOException;
 
-    /**
-     * Invoker implementation to invokes subsystem SSH service.
-     *
-     * @param subsystem
-     * @return
-     */
+    public static Invoker netconfSubsystem(){
+        return subsystem("netconf");
+    }
+
     public static Invoker subsystem(final String subsystem) {
         return new Invoker() {
             @Override
-            void invoke(SshSession session) throws IOException {
+            synchronized void invoke(SshSession session) throws IOException {
                 if (isInvoked()) {
                     throw new IllegalStateException("Already invoked.");
                 }
-
-                session.startSubSystem(subsystem);
+                try {
+                    session.startSubSystem(subsystem);
+                } finally {
+                    setInvoked();
+                }
             }
         };
     }
index 3520fe029d41cbf01218cb6c8df9dd4500522be3..271b781b99aa2ebd1154c8a4786180a574d0594b 100644 (file)
@@ -10,18 +10,16 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.channel.Channel;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
 
 /**
  * Wrapper class around GANYMED SSH java library.
  */
-public class SshClient {
+class SshClient {
     private final VirtualSocket socket;
     private final Map<Integer, SshSession> openSessions = new HashMap<>();
     private final AuthenticationHandler authenticationHandler;
@@ -51,15 +49,10 @@ public class SshClient {
         authenticationHandler.authenticate(connection);
     }
 
-    public void closeSession(SshSession session) {
-        if (session.getState() == Channel.STATE_OPEN || session.getState() == Channel.STATE_OPENING) {
-            session.close();
-        }
-    }
 
     public void close() {
         for (SshSession session : openSessions.values()){
-            closeSession(session);
+            session.close();
         }
 
         openSessions.clear();
@@ -68,4 +61,11 @@ public class SshClient {
             connection.close();
         }
     }
+
+    @Override
+    public String toString() {
+        return "SshClient{" +
+                "socket=" + socket +
+                '}';
+    }
 }
index ad8b25ff2156d8e937d65d054b41b1e3f34c159e..1a2eb3f1ab43d188179351341264c0e46bc52350 100644 (file)
@@ -8,8 +8,13 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -18,7 +23,6 @@ import java.io.OutputStream;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +31,7 @@ import org.slf4j.LoggerFactory;
  * Worker thread class. Handles all downstream and upstream events in SSH Netty
  * pipeline.
  */
-public class SshClientAdapter implements Runnable {
+class SshClientAdapter implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
 
     private static final int BUFFER_SIZE = 1024;
@@ -51,6 +55,7 @@ public class SshClientAdapter implements Runnable {
         this.invoker = invoker;
     }
 
+    // TODO: refactor
     public void run() {
         try {
             SshSession session = sshClient.openSession();
@@ -80,12 +85,6 @@ public class SshClientAdapter implements Runnable {
                 byteBuf.writeBytes(tranBuff);
                 ctx.fireChannelRead(byteBuf);
             }
-
-        } catch (VirtualSocketException e) {
-            // Netty closed connection prematurely.
-            // Or maybe tried to open ganymed connection without having initialized session
-            // (ctx.channel().remoteAddress() is null)
-            // Just pass and move on.
         } catch (Exception e) {
             logger.error("Unexpected exception", e);
         } finally {
@@ -123,12 +122,23 @@ public class SshClientAdapter implements Runnable {
         }
     }
 
-    public void start(ChannelHandlerContext ctx) {
-        if (this.ctx != null) {
-            // context is already associated.
-            return;
+    public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+        checkArgument(channelFuture.isSuccess());
+        checkNotNull(ctx.channel().remoteAddress());
+        synchronized (this) {
+            checkState(this.ctx == null);
+            this.ctx = ctx;
         }
-        this.ctx = ctx;
-        new Thread(this).start();
+        String threadName = toString();
+        Thread thread = new Thread(this, threadName);
+        thread.start();
+        return thread;
+    }
+
+    @Override
+    public String toString() {
+        return "SshClientAdapter{" +
+                "sshClient=" + sshClient +
+                '}';
     }
 }
@@ -6,7 +6,7 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh;
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
@@ -14,26 +14,30 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
-
 import java.io.IOException;
 import java.net.SocketAddress;
-
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClient;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.Invoker;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClientAdapter;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Netty SSH handler class. Acts as interface between Netty and SSH library. All standard Netty message handling
  * stops at instance of this class. All downstream events are handed of to wrapped {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClientAdapter};
  */
 public class SshHandler extends ChannelOutboundHandlerAdapter {
+    private static final Logger logger = LoggerFactory.getLogger(SshHandler.class);
     private static final String SOCKET = "socket";
 
     private final VirtualSocket virtualSocket = new VirtualSocket();
     private final SshClientAdapter sshClientAdapter;
 
+
+    public static SshHandler createForNetconfSubsystem(AuthenticationHandler authenticationHandler) throws IOException {
+        return new SshHandler(authenticationHandler, Invoker.netconfSubsystem());
+    }
+
+
     public SshHandler(AuthenticationHandler authenticationHandler, Invoker invoker) throws IOException {
         SshClient sshClient = new SshClient(virtualSocket, authenticationHandler);
         this.sshClientAdapter = new SshClientAdapter(sshClient, invoker);
@@ -67,7 +71,11 @@ public class SshHandler extends ChannelOutboundHandlerAdapter {
 
         promise.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture channelFuture) {
-                sshClientAdapter.start(ctx);
+                if (channelFuture.isSuccess()) {
+                    sshClientAdapter.start(ctx, channelFuture);
+                } else {
+                    logger.debug("Failed to connect to remote host");
+                }
             }}
         );
     }
index 8311554cdafde19ca8a2e994d22a0f2b2971b12c..44893b879431fe738e40e91ea26c88c7b9c6009d 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 import ch.ethz.ssh2.Session;
 import ch.ethz.ssh2.StreamGobbler;
 
+import ch.ethz.ssh2.channel.Channel;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -19,33 +20,18 @@ import java.io.OutputStream;
 /**
  * Wrapper class for proprietary SSH sessions implementations
  */
-public class SshSession implements Closeable {
+class SshSession implements Closeable {
     private final Session session;
 
     public SshSession(Session session) {
         this.session = session;
     }
 
-    public void execCommand(String cmd) throws IOException {
-        session.execCommand(cmd);
-    }
-
-    public void execCommand(String cmd, String charsetName) throws IOException {
-        session.execCommand(cmd, charsetName);
-    }
-
-    public void startShell() throws IOException {
-        session.startShell();
-    }
 
     public void startSubSystem(String name) throws IOException {
         session.startSubSystem(name);
     }
 
-    public int getState() {
-        return session.getState();
-    }
-
     public InputStream getStdout() {
         return new StreamGobbler(session.getStdout());
     }
@@ -58,24 +44,10 @@ public class SshSession implements Closeable {
         return session.getStdin();
     }
 
-    public int waitUntilDataAvailable(long timeout) throws IOException {
-        return session.waitUntilDataAvailable(timeout);
-    }
-
-    public int waitForCondition(int conditionSet, long timeout) {
-        return session.waitForCondition(conditionSet, timeout);
-    }
-
-    public Integer getExitStatus() {
-        return session.getExitStatus();
-    }
-
-    public String getExitSignal() {
-        return session.getExitSignal();
-    }
-
     @Override
     public void close() {
-        session.close();
+        if (session.getState() == Channel.STATE_OPEN || session.getState() == Channel.STATE_OPENING) {
+            session.close();
+        }
     }
 }
index 6debeba97e3fbee1e6a1a96cdba65d90d038ecda..69cce8057ebf13deac5c9234cfcca1638ba8b95b 100644 (file)
@@ -25,32 +25,33 @@ import java.nio.channels.SocketChannel;
  * use OIO application in asynchronous environment and NIO EventLoop. Using VirtualSocket OIO applications
  * are able to use full potential of NIO environment.
  */
+//TODO: refactor - socket should be created when connection is established
 public class VirtualSocket extends Socket implements ChannelHandler {
     private static final String INPUT_STREAM = "inputStream";
     private static final String OUTPUT_STREAM = "outputStream";
 
-    private final ChannelInputStream chis = new ChannelInputStream();
-    private final ChannelOutputStream chos = new ChannelOutputStream();
+    private final ChannelInputStream chais = new ChannelInputStream();
+    private final ChannelOutputStream chaos = new ChannelOutputStream();
     private ChannelHandlerContext ctx;
 
 
     public InputStream getInputStream() {
-        return this.chis;
+        return this.chais;
     }
 
     public OutputStream getOutputStream() {
-        return this.chos;
+        return this.chaos;
     }
 
     public void handlerAdded(ChannelHandlerContext ctx) {
         this.ctx = ctx;
 
         if (ctx.channel().pipeline().get(OUTPUT_STREAM) == null) {
-            ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chos);
+            ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chaos);
         }
 
         if (ctx.channel().pipeline().get(INPUT_STREAM) == null) {
-            ctx.channel().pipeline().addFirst(INPUT_STREAM, chis);
+            ctx.channel().pipeline().addFirst(INPUT_STREAM, chais);
         }
     }
 
@@ -69,7 +70,6 @@ public class VirtualSocket extends Socket implements ChannelHandler {
         ctx.fireExceptionCaught(throwable);
     }
 
-    public VirtualSocket() {super();}
 
     @Override
     public void connect(SocketAddress endpoint) throws IOException {}
@@ -83,12 +83,7 @@ public class VirtualSocket extends Socket implements ChannelHandler {
     @Override
     public InetAddress getInetAddress() {
         InetSocketAddress isa = getInetSocketAddress();
-
-        if (isa == null) {
-            throw new VirtualSocketException();
-        }
-
-        return getInetSocketAddress().getAddress();
+        return isa.getAddress();
     }
 
     @Override
@@ -187,7 +182,7 @@ public class VirtualSocket extends Socket implements ChannelHandler {
 
     @Override
     public String toString() {
-        return "Virtual socket InetAdress["+getInetAddress()+"], Port["+getPort()+"]";
+        return "VirtualSocket{" + getInetAddress() + ":" + getPort() + "}";
     }
 
     @Override
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocketException.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/virtualsocket/VirtualSocketException.java
deleted file mode 100644 (file)
index 626ebe9..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-/**
- * Exception class which provides notification about exceptional situations at the virtual socket layer.
- */
-// FIXME: Switch to checked exception, create a runtime exception to workaround Socket API
-public class VirtualSocketException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
-}
index 8a2387d2c1d450df3e0a9bc964c435e84688844a..febf3abf8e6fcbb6b035dfafffebb784e6392f44 100644 (file)
       <artifactId>mockito-configuration</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-netty-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-client</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 08bf9836b22135a295f2e54d68fa6c73ffeadf57..670f50ddd09f9b3b80d855ce244f4a4cce47da14 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.controller.netconf.ssh;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.local.LocalAddress;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.concurrent.ExecutorService;
@@ -68,6 +70,11 @@ public final class NetconfSSHServer extends Thread implements AutoCloseable {
         logger.trace("SSH server socket closed.");
     }
 
+    @VisibleForTesting
+    public InetSocketAddress getLocalSocketAddress() {
+        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
+    }
+
     @Override
     public void run() {
         while (up) {
index 8045d32a5038400c650da7ec23fce8e5527ec10e..6300c56e72d80a774891cea2409052946bfab867 100644 (file)
@@ -32,6 +32,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.local.LocalAddress;
 import io.netty.channel.local.LocalChannel;
 import io.netty.handler.stream.ChunkedStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -119,14 +120,14 @@ public class Handshaker implements Runnable {
 class SSHClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
     private final AutoCloseable remoteConnection;
-    private final OutputStream remoteOutputStream;
+    private final BufferedOutputStream remoteOutputStream;
     private final String session;
     private ChannelHandlerContext channelHandlerContext;
 
     public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
                             String session) {
         this.remoteConnection = remoteConnection;
-        this.remoteOutputStream = remoteOutputStream;
+        this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
         this.session = session;
     }
 
@@ -137,7 +138,7 @@ class SSHClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
         ByteBuf bb = (ByteBuf) msg;
         // we can block the server here so that slow client does not cause memory pressure
         try {
index 5d0c71aa62e0e05d57768c320367b690a50395a3..b768e2b1d1ce08d8cef9585c538837157405bdc6 100644 (file)
@@ -25,29 +25,36 @@ import org.slf4j.LoggerFactory;
  * traffic between the echo client and server by sending the first message to
  * the server.
  */
-public class EchoClient implements Runnable {
+public class EchoClient extends Thread {
     private static final Logger logger = LoggerFactory.getLogger(EchoClient.class);
 
-    private final ChannelHandler clientHandler;
 
+    private final ChannelInitializer<LocalChannel> channelInitializer;
 
-    public EchoClient(ChannelHandler clientHandler) {
-        this.clientHandler = clientHandler;
+
+    public EchoClient(final ChannelHandler clientHandler) {
+        channelInitializer = new ChannelInitializer<LocalChannel>() {
+            @Override
+            public void initChannel(LocalChannel ch) throws Exception {
+                ch.pipeline().addLast(clientHandler);
+            }
+        };
+    }
+
+    public EchoClient(ChannelInitializer<LocalChannel> channelInitializer) {
+        this.channelInitializer = channelInitializer;
     }
 
+    @Override
     public void run() {
         // Configure the client.
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap b = new Bootstrap();
+
             b.group(group)
                     .channel(LocalChannel.class)
-                    .handler(new ChannelInitializer<LocalChannel>() {
-                        @Override
-                        public void initChannel(LocalChannel ch) throws Exception {
-                            ch.pipeline().addLast(clientHandler);
-                        }
-                    });
+                    .handler(channelInitializer);
 
             // Start the client.
             LocalAddress localAddress = new LocalAddress("foo");
index 81182a580eff12b59a77872d416c19f6feba9a30..2a5791710a34cd7869ca4250cab4717c33b32f05 100644 (file)
@@ -13,6 +13,8 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
@@ -23,31 +25,41 @@ import org.slf4j.LoggerFactory;
  * traffic between the echo client and server by sending the first message to
  * the server.
  */
-public class EchoClientHandler extends ChannelInboundHandlerAdapter {
+public class EchoClientHandler extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
     private static final Logger logger = LoggerFactory.getLogger(EchoClientHandler.class);
 
     private ChannelHandlerContext ctx;
+    private final StringBuilder fromServer = new StringBuilder();
+
+    public static enum State {CONNECTING, CONNECTED, FAILED_TO_CONNECT, CONNECTION_CLOSED}
+
+
+    private State state = State.CONNECTING;
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public synchronized void channelActive(ChannelHandlerContext ctx) {
         checkState(this.ctx == null);
-        logger.info("client active");
+        logger.info("channelActive");
         this.ctx = ctx;
+        state = State.CONNECTED;
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf bb = (ByteBuf) msg;
-        logger.info(">{}", bb.toString(Charsets.UTF_8));
-        bb.release();
+    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        state = State.CONNECTION_CLOSED;
     }
 
     @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ByteBuf bb = (ByteBuf) msg;
+        String string = bb.toString(Charsets.UTF_8);
+        fromServer.append(string);
+        logger.info(">{}", string);
+        bb.release();
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    public synchronized void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         // Close the connection when an exception is raised.
         logger.warn("Unexpected exception from downstream.", cause);
         checkState(this.ctx.equals(ctx));
@@ -55,8 +67,30 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter {
         this.ctx = null;
     }
 
-    public void write(String message) {
+    public synchronized void write(String message) {
         ByteBuf byteBuf = Unpooled.copiedBuffer(message.getBytes());
         ctx.writeAndFlush(byteBuf);
     }
+
+    public synchronized boolean isConnected() {
+        return state == State.CONNECTED;
+    }
+
+    public synchronized String read() {
+        return fromServer.toString();
+    }
+
+    @Override
+    public synchronized void operationComplete(ChannelFuture future) throws Exception {
+        checkState(state == State.CONNECTING);
+        if (future.isSuccess()) {
+            logger.trace("Successfully connected, state will be switched in channelActive");
+        } else {
+            state = State.FAILED_TO_CONNECT;
+        }
+    }
+
+    public State getState() {
+        return state;
+    }
 }
index 2bda51b495c3505741ff9c697712cb3f98a2b1ee..488c3701457039a022b1b0caed1a0d14e899641e 100644 (file)
@@ -8,12 +8,28 @@
 
 package org.opendaylight.controller.netconf.netty;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import com.google.common.base.Stopwatch;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.HashedWheelTimer;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.netconf.netty.EchoClientHandler.State;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler;
 import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
 import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
 import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
@@ -23,6 +39,21 @@ import org.slf4j.LoggerFactory;
 
 public class SSHTest {
     public static final Logger logger = LoggerFactory.getLogger(SSHTest.class);
+    public static final String AHOJ = "ahoj\n";
+    private EventLoopGroup nettyGroup;
+    HashedWheelTimer hashedWheelTimer;
+
+    @Before
+    public void setUp() throws Exception {
+        hashedWheelTimer = new HashedWheelTimer();
+        nettyGroup = new NioEventLoopGroup();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        hashedWheelTimer.stop();
+        nettyGroup.shutdownGracefully();
+    }
 
     @Test
     public void test() throws Exception {
@@ -30,10 +61,63 @@ public class SSHTest {
         AuthProvider authProvider = mock(AuthProvider.class);
         doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray();
         doReturn(true).when(authProvider).authenticated(anyString(), anyString());
-        NetconfSSHServer thread = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(), authProvider, new NioEventLoopGroup());
-        Thread.sleep(2000);
-        logger.info("Closing socket");
-        thread.close();
-        thread.join();
+        NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
+                authProvider, new NioEventLoopGroup());
+
+        InetSocketAddress address = netconfSSHServer.getLocalSocketAddress();
+        final EchoClientHandler echoClientHandler = connectClient(address);
+        Stopwatch stopwatch = new Stopwatch().start();
+        while(echoClientHandler.isConnected() == false && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+            Thread.sleep(100);
+        }
+        assertTrue(echoClientHandler.isConnected());
+        logger.info("connected, writing to client");
+        echoClientHandler.write(AHOJ);
+        // check that server sent back the same string
+        stopwatch = stopwatch.reset().start();
+        while (echoClientHandler.read().endsWith(AHOJ) == false && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+            Thread.sleep(100);
+        }
+        try {
+            String read = echoClientHandler.read();
+            assertTrue(read + " should end with " + AHOJ, read.endsWith(AHOJ));
+        } finally {
+            logger.info("Closing socket");
+            netconfSSHServer.close();
+            netconfSSHServer.join();
+        }
     }
+
+    public EchoClientHandler connectClient(InetSocketAddress address) {
+        final EchoClientHandler echoClientHandler = new EchoClientHandler();
+        ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>() {
+            @Override
+            public void initChannel(NioSocketChannel ch) throws Exception {
+                ch.pipeline().addFirst(SshHandler.createForNetconfSubsystem(new LoginPassword("a", "a")));
+                ch.pipeline().addLast(echoClientHandler);
+            }
+        };
+        Bootstrap b = new Bootstrap();
+
+        b.group(nettyGroup)
+                .channel(NioSocketChannel.class)
+                .handler(channelInitializer);
+
+        // Start the client.
+        b.connect(address).addListener(echoClientHandler);
+        return echoClientHandler;
+    }
+
+    @Test
+    public void testClientWithoutServer() throws Exception {
+        InetSocketAddress address = new InetSocketAddress(12345);
+        final EchoClientHandler echoClientHandler = connectClient(address);
+        Stopwatch stopwatch = new Stopwatch().start();
+        while(echoClientHandler.getState() == State.CONNECTING && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+            Thread.sleep(100);
+        }
+        assertFalse(echoClientHandler.isConnected());
+        assertEquals(State.FAILED_TO_CONNECT, echoClientHandler.getState());
+    }
+
 }