Merge "Fix config-manager activator"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
index dca8fcafef4a88e31ecc6898dc52bc12b7aa5936..350132cf99a5dfb05681b0732912e7a4220335d2 100644 (file)
@@ -7,15 +7,13 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
@@ -38,6 +36,11 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
@@ -53,10 +56,20 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final SchemaContextProviderFactory schemaContextProviderFactory;
     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+    private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+    private final NotificationHandler notificationHandler;
 
     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
             final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+        return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+    }
+
+    @VisibleForTesting
+    protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+            final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+            final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+            final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
 
         return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
                 new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
@@ -65,20 +78,23 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                         return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
                                 deviceRpc));
                     }
-                });
+                }, stateSchemasResolver);
     }
 
     @VisibleForTesting
     protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
-            final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
-            final SchemaContextProviderFactory schemaContextProviderFactory,
-            final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
+                            final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
+                            final SchemaContextProviderFactory schemaContextProviderFactory,
+                            final SchemaSourceProviderFactory<InputStream> sourceProviderFactory,
+                            final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
         this.id = id;
         this.messageTransformer = messageTransformer;
         this.salFacade = salFacade;
         this.sourceProviderFactory = sourceProviderFactory;
+        this.stateSchemasResolver = stateSchemasResolver;
         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
         this.schemaContextProviderFactory = schemaContextProviderFactory;
+        this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
     }
 
     @Override
@@ -95,10 +111,16 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
             @Override
             public void run() {
                 final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
+
+                final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+                logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
+                // TODO use this for shared schema context
+
                 final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
                 updateMessageTransformer(schemaContextProvider);
                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+                notificationHandler.onRemoteSchemaUp();
             }
         });
 
@@ -114,6 +136,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                 // Unable to initialize device, set as disconnected
                 logger.error("{}: Initialization failed", id, t);
                 salFacade.onDeviceDisconnected();
+                // TODO ssh connection is still open if sal initialization fails
             }
         });
     }
@@ -142,7 +165,63 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
 
     @Override
     public void onNotification(final NetconfMessage notification) {
-        final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
-        salFacade.onNotification(parsedNotification);
+        notificationHandler.handleNotification(notification);
     }
+
+    /**
+     * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+     */
+    private final static class NotificationHandler {
+
+        private final RemoteDeviceHandler<?> salFacade;
+        private final List<NetconfMessage> cache = new LinkedList<>();
+        private final MessageTransformer<NetconfMessage> messageTransformer;
+        private boolean passNotifications = false;
+        private final RemoteDeviceId id;
+
+        NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+            this.salFacade = salFacade;
+            this.messageTransformer = messageTransformer;
+            this.id = id;
+        }
+
+        synchronized void handleNotification(final NetconfMessage notification) {
+            if(passNotifications) {
+                passNotification(messageTransformer.toNotification(notification));
+            } else {
+                cacheNotification(notification);
+            }
+        }
+
+        /**
+         * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+         */
+        synchronized void onRemoteSchemaUp() {
+            passNotifications = true;
+
+            for (final NetconfMessage cachedNotification : cache) {
+                passNotification(messageTransformer.toNotification(cachedNotification));
+            }
+
+            cache.clear();
+        }
+
+        private void cacheNotification(final NetconfMessage notification) {
+            Preconditions.checkState(passNotifications == false);
+
+            logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+            if(logger.isTraceEnabled()) {
+                logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+            }
+
+            cache.add(notification);
+        }
+
+        private void passNotification(final CompositeNode parsedNotification) {
+            logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+            Preconditions.checkNotNull(parsedNotification);
+            salFacade.onNotification(parsedNotification);
+        }
+    }
+
 }