Merge "BUG-1384: YangStoreServiceImpl.refresh() should never block"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 18 Jul 2014 09:07:03 +0000 (09:07 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 18 Jul 2014 09:07:03 +0000 (09:07 +0000)
14 files changed:
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java
opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTwoPhaseCommitTransaction.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowComparator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java

index 3dfa6e2bc756419b18f3224b32b127255e00d35d..5036399828a539e5ec5deb451881f3e4e9218b68 100644 (file)
@@ -10,15 +10,20 @@ package org.opendaylight.controller.config.threadpool.util;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 
+import com.google.common.base.Optional;
+
 /**
  * Implementation of {@link ThreadPool} using flexible number of threads wraps
  * {@link ExecutorService}.
@@ -28,12 +33,33 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
 
     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
             ThreadFactory threadFactory) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+    }
+
+    public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+    }
+
+    private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
 
         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
-                new SynchronousQueue<Runnable>(), threadFactory);
+                queue, threadFactory, new FlexibleRejectionHandler());
         executor.prestartAllCoreThreads();
     }
 
+    /**
+     * Overriding the queue:
+     * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+     * occurs in RejectedExecutionHandler.
+     * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+     */
+    private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+        final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+        return new ForwardingBlockingQueue(delegate);
+    }
+
     @Override
     public ExecutorService getExecutor() {
         return Executors.unconfigurableExecutorService(executor);
@@ -77,4 +103,37 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         executor.shutdown();
     }
 
+    /**
+     * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+     */
+    private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+            }
+        }
+    }
+
+    private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+        private final BlockingQueue<Runnable> delegate;
+
+        public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected BlockingQueue<Runnable> delegate() {
+            return delegate;
+        }
+
+        @Override
+        public boolean offer(final Runnable r) {
+            // ThreadPoolExecutor will spawn a new thread after core size is reached only
+            // if the queue.offer returns false.
+            return false;
+        }
+    }
 }
index 94639d43c0248787c736a2d7df7738597a4ef975..d6abe168fbedfa9a7d2db6fc16d1151e621ca874 100644 (file)
@@ -17,6 +17,7 @@
 */
 package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
 
+import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
@@ -50,11 +51,15 @@ public final class FlexibleThreadPoolModule extends org.opendaylight.controller.
         JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
         JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
                 maxThreadCountJmxAttribute);
+
+        if(getQueueCapacity() != null) {
+            JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+        }
     }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
         return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
-                TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+                TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
     }
 }
index be275ef4870b3797e5195d61380a8c5935f3ff83..c124f6388fb2f5f41b1c48a86f55231ea4c023b4 100644 (file)
@@ -46,6 +46,12 @@ module threadpool-impl-flexible {
                 type uint32;
             }
 
+            leaf queueCapacity {
+                type uint16;
+                mandatory false;
+                description "Capacity of queue that holds waiting tasks";
+            }
+
             container threadFactory {
                 uses config:service-ref {
                     refine type {
index 8fedbe4d4c1c1b35d7404fa9f5c3daf4ce3997f0..f81a332ab6e8a2832b62e0e9a7ff164b1488ab46 100644 (file)
@@ -12,7 +12,7 @@
     <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
       <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
 
-        <!-- Netconf dispatcher to be used by all netconf-connectors --> 
+        <!-- Netconf dispatcher to be used by all netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
           <name>global-netconf-dispatcher</name>
           </timer>
         </module>
 
-        <!-- Thread factory to be used by all threadpools in netconf-connectors --> 
+        <!-- Thread factory to be used by all threadpools in netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
           <name>global-netconf-processing-executor-threadfactory</name>
           <name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
-        </module> 
-        <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 --> 
+        </module>
+        <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4.  -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
           <name>global-netconf-processing-executor</name>
           <minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
           <max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
           <keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
           <threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
             <name>global-netconf-processing-executor-threadfactory</name>
           </threadFactory>
-        </module>  
+        </module>
       </modules>
 
       <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
index 952d84d885c41e73fe2312bf22c723f62cc44b35..c61ec4926a65f58f31fd03657162b0af70b23c0e 100644 (file)
@@ -9,6 +9,10 @@ package org.opendaylight.controller.sal.binding.impl;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import java.util.EventListener;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,14 +41,21 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcProviderRegistryImpl implements //
-        RpcProviderRegistry, //
-        RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
 
     private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
 
-    // publicProxies is a cache of proxy objects where each value in the map corresponds to a specific RpcService
-    private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
+    // cache of proxy objects where each value in the map corresponds to a specific RpcService
+    private final LoadingCache<Class<? extends RpcService>, RpcService> publicProxies = CacheBuilder.newBuilder().weakKeys().
+            build(new CacheLoader<Class<? extends RpcService>, RpcService>() {
+                @Override
+                public RpcService load(final Class<? extends RpcService> type) {
+                    final RpcService proxy = rpcFactory.getDirectProxyFor(type);
+                    LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+                    return proxy;
+                }
+            });
+
     private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
     private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
             .create();
@@ -93,26 +104,7 @@ public class RpcProviderRegistryImpl implements //
     @SuppressWarnings("unchecked")
     @Override
     public final <T extends RpcService> T getRpcService(final Class<T> type) {
-
-        T potentialProxy = (T) publicProxies.get(type);
-        if (potentialProxy != null) {
-            return potentialProxy;
-        }
-        synchronized (this) {
-            /**
-             * Potential proxy could be instantiated by other thread while we
-             * were waiting for the lock.
-             */
-
-            potentialProxy = (T) publicProxies.get(type);
-            if (potentialProxy != null) {
-                return potentialProxy;
-            }
-            T proxy = rpcFactory.getDirectProxyFor(type);
-            LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
-            publicProxies.put(type, proxy);
-            return proxy;
-        }
+        return (T) publicProxies.getUnchecked(type);
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -205,8 +197,7 @@ public class RpcProviderRegistryImpl implements //
 
     }
 
-    private class RouteChangeForwarder<T extends RpcService> implements
-            RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+    private class RouteChangeForwarder<T extends RpcService> implements RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
 
         private final Class<T> type;
 
@@ -240,8 +231,7 @@ public class RpcProviderRegistryImpl implements //
         }
     }
 
-    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
-            RpcRegistration<T> {
+    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
 
         private final Class<T> serviceType;
         private RpcProviderRegistryImpl registry;
index 3ed02dfb411fcb26cda91cc8cf227cf9f2ef4a0e..01a5989dcd8a6b5afa67a50844e540cbd68c80c5 100644 (file)
@@ -1,14 +1,17 @@
 package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
 
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
+
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
-    public InMemoryConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public InMemoryConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryConfigDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryConfigDataStoreProviderModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -19,7 +22,7 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-      InMemoryDOMDataStore   ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.sameThreadExecutor());
+      InMemoryDOMDataStore   ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
       getSchemaServiceDependency().registerSchemaServiceListener(ids);
       return ids;
     }
index eea95990a1509a4231c2d5452ce661cb6c3270c5..172b0dbc01b0deb3e384a12a46722806db273bbc 100644 (file)
@@ -1,14 +1,17 @@
 package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
 
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
+
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
-    public InMemoryOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public InMemoryOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryOperationalDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryOperationalDataStoreProviderModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -19,7 +22,7 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-      InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.sameThreadExecutor());
+      InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
       getSchemaServiceDependency().registerSchemaServiceListener(ids);
       return ids;
     }
index dca8fcafef4a88e31ecc6898dc52bc12b7aa5936..de4ac7ac18a106f88f530ac4dc16ea047976b091 100644 (file)
@@ -7,15 +7,19 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
@@ -36,9 +40,6 @@ import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
@@ -53,6 +54,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final SchemaContextProviderFactory schemaContextProviderFactory;
     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+    private final NotificationHandler notificationHandler;
 
     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
@@ -79,6 +81,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         this.sourceProviderFactory = sourceProviderFactory;
         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
         this.schemaContextProviderFactory = schemaContextProviderFactory;
+        this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
     }
 
     @Override
@@ -99,6 +102,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
                 updateMessageTransformer(schemaContextProvider);
                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+                notificationHandler.onRemoteSchemaUp();
             }
         });
 
@@ -142,7 +146,63 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
 
     @Override
     public void onNotification(final NetconfMessage notification) {
-        final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
-        salFacade.onNotification(parsedNotification);
+        notificationHandler.handleNotification(notification);
+    }
+
+    /**
+     * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+     */
+    private final static class NotificationHandler {
+
+        private final RemoteDeviceHandler<?> salFacade;
+        private final List<NetconfMessage> cache = new LinkedList<>();
+        private final MessageTransformer<NetconfMessage> messageTransformer;
+        private boolean passNotifications = false;
+        private final RemoteDeviceId id;
+
+        NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+            this.salFacade = salFacade;
+            this.messageTransformer = messageTransformer;
+            this.id = id;
+        }
+
+        synchronized void handleNotification(final NetconfMessage notification) {
+            if(passNotifications) {
+                passNotification(messageTransformer.toNotification(notification));
+            } else {
+                cacheNotification(notification);
+            }
+        }
+
+        /**
+         * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+         */
+        synchronized void onRemoteSchemaUp() {
+            passNotifications = true;
+
+            for (final NetconfMessage cachedNotification : cache) {
+                passNotification(messageTransformer.toNotification(cachedNotification));
+            }
+
+            cache.clear();
+        }
+
+        private void cacheNotification(final NetconfMessage notification) {
+            Preconditions.checkState(passNotifications == false);
+
+            logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+            if(logger.isTraceEnabled()) {
+                logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+            }
+
+            cache.add(notification);
+        }
+
+        private void passNotification(final CompositeNode parsedNotification) {
+            logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+            Preconditions.checkNotNull(parsedNotification);
+            salFacade.onNotification(parsedNotification);
+        }
+
     }
 }
index 0ef76d330dbab15d908401a695e61ba25b0d1802..960f2ef2e80f094c3155e9d8a3632008ce7f13d6 100644 (file)
@@ -44,6 +44,7 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
@@ -90,16 +91,14 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
     }
 
     private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
-        sendEditRpc(createEditConfigStructure(key, Optional.<String>absent(), Optional.of(value)), Optional.<String>absent());
+        sendEditRpc(createEditConfigStructure(key, Optional.<ModifyAction>absent(), Optional.of(value)), Optional.<ModifyAction>absent());
     }
 
     private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
-        // FIXME use org.opendaylight.yangtools.yang.data.api.ModifyAction instead of strings
-        // TODO add string lowercase value to ModifyAction enum entries
-        sendEditRpc(createEditConfigStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode>absent()), Optional.of("none"));
+        sendEditRpc(createEditConfigStructure(toDelete, Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
     }
 
-    private void sendEditRpc(final CompositeNode editStructure, final Optional<String> defaultOperation) throws InterruptedException, ExecutionException {
+    private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws InterruptedException, ExecutionException {
         final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
         final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
 
@@ -110,7 +109,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         }
     }
 
-    private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<String> defaultOperation) {
+    private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
         final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
 
         // Target
@@ -119,7 +118,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
 
         // Default operation
         if(defaultOperation.isPresent()) {
-            SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, defaultOperation.get());
+            final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
             ret.add(defOp);
         }
 
@@ -134,7 +133,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return ret.toInstance();
     }
 
-    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
+    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
             final Optional<CompositeNode> lastChildOverride) {
         Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
 
@@ -174,7 +173,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return predicates;
     }
 
-    private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<String> operation, final Optional<CompositeNode> lastChildOverride) {
+    private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
         final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
         builder.setQName(arg.getNodeType());
 
@@ -182,7 +181,7 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         addPredicatesToCompositeNodeBuilder(predicates, builder);
 
         if (operation.isPresent()) {
-            builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
+            builder.setAttribute(NETCONF_OPERATION_QNAME, modifyOperationToXmlString(operation.get()));
         }
         if (lastChildOverride.isPresent()) {
             final List<Node<?>> children = lastChildOverride.get().getValue();
@@ -196,6 +195,10 @@ final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransact
         return builder.toInstance();
     }
 
+    private String modifyOperationToXmlString(final ModifyAction operation) {
+        return operation.name().toLowerCase();
+    }
+
     /**
      * Send commit rpc to finish the transaction
      * In case of failure or unexpected error response, ExecutionException is thrown
index ec2a820daab098618461d617155fa6804a369326..defaab629f3684de2be6a5b1d7a578749cd493e0 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.InputStream;
@@ -82,6 +83,32 @@ public class NetconfDeviceTest {
         Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
     }
 
+    @Test
+    public void testNotificationBeforeSchema() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+        final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+        device.onNotification(netconfMessage);
+        device.onNotification(netconfMessage);
+
+        verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+                Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
+
+        device.onRemoteSessionUp(sessionCaps, listener);
+
+        verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+        verify(facade, times(2)).onNotification(compositeNode);
+
+        device.onNotification(netconfMessage);
+        verify(messageTransformer, times(3)).toNotification(netconfMessage);
+        verify(facade, times(3)).onNotification(compositeNode);
+    }
+
     @Test
     public void testNetconfDeviceReconnect() throws Exception {
         final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
@@ -136,6 +163,7 @@ public class NetconfDeviceTest {
         final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
         doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
         doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+        doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
         return remoteDeviceHandler;
     }
 
@@ -173,6 +201,7 @@ public class NetconfDeviceTest {
         final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
         doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
         doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+        doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
         doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
         return messageTransformer;
     }
@@ -197,4 +226,4 @@ public class NetconfDeviceTest {
         doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
         return remoteDeviceCommunicator;
     }
-}
\ No newline at end of file
+}
index af61db1a8096874945fe92f7fc5478c948b0f5da..b1db280c2472802aed2e97ce525422308eca3b12 100644 (file)
@@ -95,6 +95,8 @@ final class FlowComparator {
         if (statsFlow == storedFlow) {
             return true;
         }
+        if (storedFlow == null && statsFlow != null) return false;
+        if (statsFlow == null && storedFlow != null) return false;
         if (storedFlow.getClass() != statsFlow.getClass()) {
             return false;
         }
index 1360a54d6fbaf2609067f1617337e8af76ec7852..de3f732b25763fa19d6b481a64bfbcf4d8bcf87c 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -104,7 +105,7 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
 
     private void start() {
         final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
-        logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+        logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
 
         channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
 
@@ -125,12 +126,20 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
                         // Do not fail negotiation if promise is done or canceled
                         // It would result in setting result of the promise second time and that throws exception
                         if (isPromiseFinished() == false) {
-                            // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
-                            // Loopback connection initiation might
                             negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+                            changeState(State.FAILED);
+
+                            channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    if(future.isSuccess()) {
+                                        logger.debug("Channel {} closed: success", future.channel());
+                                    } else {
+                                        logger.warn("Channel {} closed: fail", future.channel());
+                                    }
+                                }
+                            });
                         }
-
-                        changeState(State.FAILED);
                     } else if(channel.isOpen()) {
                         channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
                     }
@@ -214,9 +223,9 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
     protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
 
     private synchronized void changeState(final State newState) {
-        logger.debug("Changing state from : {} to : {}", state, newState);
-        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
-                newState);
+        logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+                newState, channel);
         this.state = newState;
     }
 
index f260bcbcefbf0fff5baa41bda46b8b6c4efc617a..a87a08ded72ea97db3396c24697b780be501911d 100644 (file)
@@ -9,56 +9,15 @@
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
 
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
-    private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+    public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
 
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
-        int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
-        if (index == -1) {
-            logger.debug("Message is not complete, read again.");
-            if (logger.isTraceEnabled()) {
-                String str = in.toString(Charsets.UTF_8);
-                logger.trace("Message read so far: {}", str);
-            }
-            ctx.read();
-        } else {
-            ByteBuf msg = in.readBytes(index);
-            in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
-            in.discardReadBytes();
-            logger.debug("Message is complete.");
-            out.add(msg);
-        }
+    public NetconfEOMAggregator() {
+        super(Integer.MAX_VALUE, DELIMITER);
     }
-
-    private int indexOfSequence(ByteBuf in, byte[] sequence) {
-        int index = -1;
-        for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
-            if (in.getByte(i) == sequence[0]) {
-                index = i;
-                for (int j = 1; j < sequence.length; j++) {
-                    if (in.getByte(i + j) != sequence[j]) {
-                        index = -1;
-                        break;
-                    }
-                }
-                if (index != -1) {
-                    return index;
-                }
-            }
-        }
-        return index;
-    }
-
 }
index fae2000bb53c7b0873f90cc677464ef04be6666a..d810a870ff29f68b59b19788bf3064e0218cb148 100644 (file)
@@ -12,6 +12,7 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -61,7 +62,8 @@ public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMess
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
 
-            StreamResult result = new StreamResult(new OutputStreamWriter(os));
+            // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+            StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
             DOMSource source = new DOMSource(msg.getDocument());
             transformer.transform(source, result);
         }