X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=350132cf99a5dfb05681b0732912e7a4220335d2;hp=dca8fcafef4a88e31ecc6898dc52bc12b7aa5936;hb=c74d5c2399e500fe3e690edc8cee497b1cb6f867;hpb=83e1c610eeefba667a19c243fbc1098072a8079d diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index dca8fcafef..350132cf99 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,15 +7,13 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import java.io.InputStream; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; @@ -38,6 +36,11 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade @@ -53,10 +56,20 @@ public final class NetconfDevice implements RemoteDevice messageTransformer; private final SchemaContextProviderFactory schemaContextProviderFactory; private final SchemaSourceProviderFactory sourceProviderFactory; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; + private final NotificationHandler notificationHandler; public static NetconfDevice createNetconfDevice(final RemoteDeviceId id, final AbstractCachingSchemaSourceProvider schemaSourceProvider, final ExecutorService executor, final RemoteDeviceHandler salFacade) { + return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl()); + } + + @VisibleForTesting + protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id, + final AbstractCachingSchemaSourceProvider schemaSourceProvider, + final ExecutorService executor, final RemoteDeviceHandler salFacade, + final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(), new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory() { @@ -65,20 +78,23 @@ public final class NetconfDevice implements RemoteDevice salFacade, - final ExecutorService processingExecutor, final MessageTransformer messageTransformer, - final SchemaContextProviderFactory schemaContextProviderFactory, - final SchemaSourceProviderFactory sourceProviderFactory) { + final ExecutorService processingExecutor, final MessageTransformer messageTransformer, + final SchemaContextProviderFactory schemaContextProviderFactory, + final SchemaSourceProviderFactory sourceProviderFactory, + final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { this.id = id; this.messageTransformer = messageTransformer; this.salFacade = salFacade; this.sourceProviderFactory = sourceProviderFactory; + this.stateSchemasResolver = stateSchemasResolver; this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor); this.schemaContextProviderFactory = schemaContextProviderFactory; + this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); } @Override @@ -95,10 +111,16 @@ public final class NetconfDevice implements RemoteDevice delegate = sourceProviderFactory.createSourceProvider(deviceRpc); final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities); updateMessageTransformer(schemaContextProvider); salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(); } }); @@ -114,6 +136,7 @@ public final class NetconfDevice implements RemoteDevice salFacade; + private final List cache = new LinkedList<>(); + private final MessageTransformer messageTransformer; + private boolean passNotifications = false; + private final RemoteDeviceId id; + + NotificationHandler(final RemoteDeviceHandler salFacade, final MessageTransformer messageTransformer, final RemoteDeviceId id) { + this.salFacade = salFacade; + this.messageTransformer = messageTransformer; + this.id = id; + } + + synchronized void handleNotification(final NetconfMessage notification) { + if(passNotifications) { + passNotification(messageTransformer.toNotification(notification)); + } else { + cacheNotification(notification); + } + } + + /** + * Forward all cached notifications and pass all notifications from this point directly to sal facade. + */ + synchronized void onRemoteSchemaUp() { + passNotifications = true; + + for (final NetconfMessage cachedNotification : cache) { + passNotification(messageTransformer.toNotification(cachedNotification)); + } + + cache.clear(); + } + + private void cacheNotification(final NetconfMessage notification) { + Preconditions.checkState(passNotifications == false); + + logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification); + if(logger.isTraceEnabled()) { + logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument())); + } + + cache.add(notification); + } + + private void passNotification(final CompositeNode parsedNotification) { + logger.debug("{}: Forwarding notification {}", id, parsedNotification); + Preconditions.checkNotNull(parsedNotification); + salFacade.onNotification(parsedNotification); + } + } + }