*/
package org.opendaylight.controller.sal.connect.netconf;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
private final MessageTransformer<NetconfMessage> messageTransformer;
private final SchemaContextProviderFactory schemaContextProviderFactory;
private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NotificationHandler notificationHandler;
public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
this.sourceProviderFactory = sourceProviderFactory;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
@Override
final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
updateMessageTransformer(schemaContextProvider);
salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
}
});
@Override
public void onNotification(final NetconfMessage notification) {
- final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
- salFacade.onNotification(parsedNotification);
+ notificationHandler.handleNotification(notification);
+ }
+
+ /**
+ * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ */
+ private final static class NotificationHandler {
+
+ private final RemoteDeviceHandler<?> salFacade;
+ private final List<NetconfMessage> cache = new LinkedList<>();
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private boolean passNotifications = false;
+ private final RemoteDeviceId id;
+
+ NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+ this.salFacade = salFacade;
+ this.messageTransformer = messageTransformer;
+ this.id = id;
+ }
+
+ synchronized void handleNotification(final NetconfMessage notification) {
+ if(passNotifications) {
+ passNotification(messageTransformer.toNotification(notification));
+ } else {
+ cacheNotification(notification);
+ }
+ }
+
+ /**
+ * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ */
+ synchronized void onRemoteSchemaUp() {
+ passNotifications = true;
+
+ for (final NetconfMessage cachedNotification : cache) {
+ passNotification(messageTransformer.toNotification(cachedNotification));
+ }
+
+ cache.clear();
+ }
+
+ private void cacheNotification(final NetconfMessage notification) {
+ Preconditions.checkState(passNotifications == false);
+
+ logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+ }
+
+ cache.add(notification);
+ }
+
+ private void passNotification(final CompositeNode parsedNotification) {
+ logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+ Preconditions.checkNotNull(parsedNotification);
+ salFacade.onNotification(parsedNotification);
+ }
+
}
}
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.InputStream;
Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
}
+ @Test
+ public void testNotificationBeforeSchema() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+ final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+ final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+ device.onNotification(netconfMessage);
+ device.onNotification(netconfMessage);
+
+ verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+ final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
+
+ device.onRemoteSessionUp(sessionCaps, listener);
+
+ verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+ verify(facade, times(2)).onNotification(compositeNode);
+
+ device.onNotification(netconfMessage);
+ verify(messageTransformer, times(3)).toNotification(netconfMessage);
+ verify(facade, times(3)).onNotification(compositeNode);
+ }
+
@Test
public void testNetconfDeviceReconnect() throws Exception {
final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+ doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
return remoteDeviceHandler;
}
final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+ doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
return messageTransformer;
}