Bug-1376 Add caching for premature notifications in netconf connector 50/9050/5
authorMaros Marsalek <mmarsale@cisco.com>
Wed, 16 Jul 2014 08:29:47 +0000 (10:29 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Thu, 17 Jul 2014 14:28:08 +0000 (14:28 +0000)
If a notification is received before schema context is fully built for device, it is cached and forwarded when schema is present.

Change-Id: Ib21fc0fab9cf83a1102ccbb5c9dc98330aa95577
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java

index dca8fcafef4a88e31ecc6898dc52bc12b7aa5936..de4ac7ac18a106f88f530ac4dc16ea047976b091 100644 (file)
@@ -7,15 +7,19 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
@@ -36,9 +40,6 @@ import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
@@ -53,6 +54,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final SchemaContextProviderFactory schemaContextProviderFactory;
     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+    private final NotificationHandler notificationHandler;
 
     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
@@ -79,6 +81,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         this.sourceProviderFactory = sourceProviderFactory;
         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
         this.schemaContextProviderFactory = schemaContextProviderFactory;
+        this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
     }
 
     @Override
@@ -99,6 +102,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
                 updateMessageTransformer(schemaContextProvider);
                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+                notificationHandler.onRemoteSchemaUp();
             }
         });
 
@@ -142,7 +146,63 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
 
     @Override
     public void onNotification(final NetconfMessage notification) {
-        final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
-        salFacade.onNotification(parsedNotification);
+        notificationHandler.handleNotification(notification);
+    }
+
+    /**
+     * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+     */
+    private final static class NotificationHandler {
+
+        private final RemoteDeviceHandler<?> salFacade;
+        private final List<NetconfMessage> cache = new LinkedList<>();
+        private final MessageTransformer<NetconfMessage> messageTransformer;
+        private boolean passNotifications = false;
+        private final RemoteDeviceId id;
+
+        NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+            this.salFacade = salFacade;
+            this.messageTransformer = messageTransformer;
+            this.id = id;
+        }
+
+        synchronized void handleNotification(final NetconfMessage notification) {
+            if(passNotifications) {
+                passNotification(messageTransformer.toNotification(notification));
+            } else {
+                cacheNotification(notification);
+            }
+        }
+
+        /**
+         * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+         */
+        synchronized void onRemoteSchemaUp() {
+            passNotifications = true;
+
+            for (final NetconfMessage cachedNotification : cache) {
+                passNotification(messageTransformer.toNotification(cachedNotification));
+            }
+
+            cache.clear();
+        }
+
+        private void cacheNotification(final NetconfMessage notification) {
+            Preconditions.checkState(passNotifications == false);
+
+            logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+            if(logger.isTraceEnabled()) {
+                logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+            }
+
+            cache.add(notification);
+        }
+
+        private void passNotification(final CompositeNode parsedNotification) {
+            logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+            Preconditions.checkNotNull(parsedNotification);
+            salFacade.onNotification(parsedNotification);
+        }
+
     }
 }
index c1b9f7b47ba8b8009b620655a729703e1809346f..cd81a869f1cf0cdfe073250276fe93367507d9fc 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.InputStream;
@@ -83,6 +84,32 @@ public class NetconfDeviceTest {
         Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
     }
 
+    @Test
+    public void testNotificationBeforeSchema() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+        final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+        device.onNotification(netconfMessage);
+        device.onNotification(netconfMessage);
+
+        verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+                Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
+
+        device.onRemoteSessionUp(sessionCaps, listener);
+
+        verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+        verify(facade, times(2)).onNotification(compositeNode);
+
+        device.onNotification(netconfMessage);
+        verify(messageTransformer, times(3)).toNotification(netconfMessage);
+        verify(facade, times(3)).onNotification(compositeNode);
+    }
+
     @Test
     public void testNetconfDeviceReconnect() throws Exception {
         final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
@@ -137,6 +164,7 @@ public class NetconfDeviceTest {
         final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
         doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
         doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+        doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
         return remoteDeviceHandler;
     }
 
@@ -174,6 +202,7 @@ public class NetconfDeviceTest {
         final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
         doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
         doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+        doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
         doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
         return messageTransformer;
     }