*/
package org.opendaylight.controller.sal.connect.netconf;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
private final MessageTransformer<NetconfMessage> messageTransformer;
private final SchemaContextProviderFactory schemaContextProviderFactory;
private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+ private final NotificationHandler notificationHandler;
public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+ return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+ }
+
+ @VisibleForTesting
+ protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+ final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+ final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+ final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
deviceRpc));
}
- });
+ }, stateSchemasResolver);
}
@VisibleForTesting
protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
- final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
- final SchemaContextProviderFactory schemaContextProviderFactory,
- final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
+ final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
+ final SchemaContextProviderFactory schemaContextProviderFactory,
+ final SchemaSourceProviderFactory<InputStream> sourceProviderFactory,
+ final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
this.id = id;
this.messageTransformer = messageTransformer;
this.salFacade = salFacade;
this.sourceProviderFactory = sourceProviderFactory;
+ this.stateSchemasResolver = stateSchemasResolver;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
@Override
@Override
public void run() {
final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
+
+ final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+ logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
+ // TODO use this for shared schema context
+
final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
updateMessageTransformer(schemaContextProvider);
salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
}
});
// Unable to initialize device, set as disconnected
logger.error("{}: Initialization failed", id, t);
salFacade.onDeviceDisconnected();
+ // TODO ssh connection is still open if sal initialization fails
}
});
}
@Override
public void onNotification(final NetconfMessage notification) {
- final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
- salFacade.onNotification(parsedNotification);
+ notificationHandler.handleNotification(notification);
}
+
+ /**
+ * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ */
+ private final static class NotificationHandler {
+
+ private final RemoteDeviceHandler<?> salFacade;
+ private final List<NetconfMessage> cache = new LinkedList<>();
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private boolean passNotifications = false;
+ private final RemoteDeviceId id;
+
+ NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+ this.salFacade = salFacade;
+ this.messageTransformer = messageTransformer;
+ this.id = id;
+ }
+
+ synchronized void handleNotification(final NetconfMessage notification) {
+ if(passNotifications) {
+ passNotification(messageTransformer.toNotification(notification));
+ } else {
+ cacheNotification(notification);
+ }
+ }
+
+ /**
+ * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ */
+ synchronized void onRemoteSchemaUp() {
+ passNotifications = true;
+
+ for (final NetconfMessage cachedNotification : cache) {
+ passNotification(messageTransformer.toNotification(cachedNotification));
+ }
+
+ cache.clear();
+ }
+
+ private void cacheNotification(final NetconfMessage notification) {
+ Preconditions.checkState(passNotifications == false);
+
+ logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+ }
+
+ cache.add(notification);
+ }
+
+ private void passNotification(final CompositeNode parsedNotification) {
+ logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+ Preconditions.checkNotNull(parsedNotification);
+ salFacade.onNotification(parsedNotification);
+ }
+ }
+
}