From 1989d4582af76979d85d9dd82787494001cfda7a Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 16 Jul 2014 10:29:47 +0200 Subject: [PATCH] Bug-1376 Add caching for premature notifications in netconf connector If a notification is received before schema context is fully built for device, it is cached and forwarded when schema is present. Change-Id: Ib21fc0fab9cf83a1102ccbb5c9dc98330aa95577 Signed-off-by: Maros Marsalek --- .../sal/connect/netconf/NetconfDevice.java | 72 +++++++++++++++++-- .../connect/netconf/NetconfDeviceTest.java | 29 ++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index dca8fcafef..de4ac7ac18 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,15 +7,19 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.InputStream; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; - import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; @@ -36,9 +40,6 @@ import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ @@ -53,6 +54,7 @@ public final class NetconfDevice implements RemoteDevice messageTransformer; private final SchemaContextProviderFactory schemaContextProviderFactory; private final SchemaSourceProviderFactory sourceProviderFactory; + private final NotificationHandler notificationHandler; public static NetconfDevice createNetconfDevice(final RemoteDeviceId id, final AbstractCachingSchemaSourceProvider schemaSourceProvider, @@ -79,6 +81,7 @@ public final class NetconfDevice implements RemoteDevice salFacade; + private final List cache = new LinkedList<>(); + private final MessageTransformer messageTransformer; + private boolean passNotifications = false; + private final RemoteDeviceId id; + + NotificationHandler(final RemoteDeviceHandler salFacade, final MessageTransformer messageTransformer, final RemoteDeviceId id) { + this.salFacade = salFacade; + this.messageTransformer = messageTransformer; + this.id = id; + } + + synchronized void handleNotification(final NetconfMessage notification) { + if(passNotifications) { + passNotification(messageTransformer.toNotification(notification)); + } else { + cacheNotification(notification); + } + } + + /** + * Forward all cached notifications and pass all notifications from this point directly to sal facade. + */ + synchronized void onRemoteSchemaUp() { + passNotifications = true; + + for (final NetconfMessage cachedNotification : cache) { + passNotification(messageTransformer.toNotification(cachedNotification)); + } + + cache.clear(); + } + + private void cacheNotification(final NetconfMessage notification) { + Preconditions.checkState(passNotifications == false); + + logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification); + if(logger.isTraceEnabled()) { + logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument())); + } + + cache.add(notification); + } + + private void passNotification(final CompositeNode parsedNotification) { + logger.debug("{}: Forwarding notification {}", id, parsedNotification); + Preconditions.checkNotNull(parsedNotification); + salFacade.onNotification(parsedNotification); + } + } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java index c1b9f7b47b..cd81a869f1 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java @@ -12,6 +12,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.InputStream; @@ -83,6 +84,32 @@ public class NetconfDeviceTest { Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected(); } + @Test + public void testNotificationBeforeSchema() throws Exception { + final RemoteDeviceHandler facade = getFacade(); + final RemoteDeviceCommunicator listener = getListener(); + + final MessageTransformer messageTransformer = getMessageTransformer(); + final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory()); + + device.onNotification(netconfMessage); + device.onNotification(netconfMessage); + + verify(facade, times(0)).onNotification(any(CompositeNode.class)); + + final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, + Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION)); + + device.onRemoteSessionUp(sessionCaps, listener); + + verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage); + verify(facade, times(2)).onNotification(compositeNode); + + device.onNotification(netconfMessage); + verify(messageTransformer, times(3)).toNotification(netconfMessage); + verify(facade, times(3)).onNotification(compositeNode); + } + @Test public void testNetconfDeviceReconnect() throws Exception { final RemoteDeviceHandler facade = getFacade(); @@ -137,6 +164,7 @@ public class NetconfDeviceTest { final RemoteDeviceHandler remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class); doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class)); doNothing().when(remoteDeviceHandler).onDeviceDisconnected(); + doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class)); return remoteDeviceHandler; } @@ -174,6 +202,7 @@ public class NetconfDeviceTest { final MessageTransformer messageTransformer = mockClass(MessageTransformer.class); doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class)); doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class)); + doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class)); doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class)); return messageTransformer; } -- 2.36.6