X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=de4ac7ac18a106f88f530ac4dc16ea047976b091;hp=94beaed0dfc61aeb0c5f260c24a1a689ec9fd7a1;hb=c222e37f2a0f0f3f6266242fbea2d3b018f4e6e3;hpb=478ce1fa1dc30974b7cf23fd5258f1af5366d547 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 94beaed0df..de4ac7ac18 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,541 +7,202 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import static com.google.common.base.Preconditions.checkState; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; +import java.util.LinkedList; import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; -import org.opendaylight.controller.md.sal.common.api.data.DataModification; -import org.opendaylight.controller.md.sal.common.api.data.DataReader; -import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; -import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; -import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.connect.api.MessageTransformer; +import org.opendaylight.controller.sal.connect.api.RemoteDevice; +import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; +import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory; +import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; +import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory; +import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.api.data.DataBrokerService; -import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; -import org.opendaylight.controller.sal.core.api.mount.MountProvisionService; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.api.SimpleNode; -import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; -import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl; -import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; -import org.opendaylight.yangtools.yang.model.api.Module; -import org.opendaylight.yangtools.yang.model.api.RpcDefinition; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider; import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; -import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; -import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.concurrent.EventExecutor; - -public class NetconfDevice implements Provider, // - DataReader, // - DataCommitHandler, // - RpcImplementation, // - AutoCloseable { - - InetSocketAddress socketAddress; - - MountProvisionInstance mountInstance; - - EventExecutor eventExecutor; - - ExecutorService processingExecutor; - - InstanceIdentifier path; - - ReconnectStrategy reconnectStrategy; - - AbstractCachingSchemaSourceProvider schemaSourceProvider; - - private NetconfDeviceSchemaContextProvider deviceContextProvider; - - protected Logger logger; - - Registration> operReaderReg; - Registration> confReaderReg; - Registration> commitHandlerReg; - List rpcReg; - - String name; - - MountProvisionService mountService; - - NetconfClientDispatcher dispatcher; - - static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance(); - - SchemaSourceProvider remoteSourceProvider; - - private volatile DataBrokerService dataBroker; - - NetconfDeviceListener listener; - - private boolean rollbackSupported; - - private NetconfReconnectingClientConfiguration clientConfig; - private volatile DataProviderService dataProviderService; +/** + * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade + */ +public final class NetconfDevice implements RemoteDevice { - public NetconfDevice(String name) { - this.name = name; - this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name); - this.path = InstanceIdentifier.builder(INVENTORY_PATH) - .nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance(); - } + private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); - public void start() { - checkState(dispatcher != null, "Dispatcher must be set."); - checkState(schemaSourceProvider != null, "Schema Source Provider must be set."); - checkState(eventExecutor != null, "Event executor must be set."); + private final RemoteDeviceId id; - Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener); - listener = (NetconfDeviceListener) clientConfig.getSessionListener(); + private final RemoteDeviceHandler salFacade; + private final ListeningExecutorService processingExecutor; + private final MessageTransformer messageTransformer; + private final SchemaContextProviderFactory schemaContextProviderFactory; + private final SchemaSourceProviderFactory sourceProviderFactory; + private final NotificationHandler notificationHandler; - logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); + public static NetconfDevice createNetconfDevice(final RemoteDeviceId id, + final AbstractCachingSchemaSourceProvider schemaSourceProvider, + final ExecutorService executor, final RemoteDeviceHandler salFacade) { - dispatcher.createReconnectingClient(clientConfig); + return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(), + new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory() { + @Override + public SchemaSourceProvider createSourceProvider(final RpcImplementation deviceRpc) { + return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id, + deviceRpc)); + } + }); } - Optional getSchemaContext() { - if (deviceContextProvider == null) { - return Optional.absent(); - } - return deviceContextProvider.currentContext; + @VisibleForTesting + protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService processingExecutor, final MessageTransformer messageTransformer, + final SchemaContextProviderFactory schemaContextProviderFactory, + final SchemaSourceProviderFactory sourceProviderFactory) { + this.id = id; + this.messageTransformer = messageTransformer; + this.salFacade = salFacade; + this.sourceProviderFactory = sourceProviderFactory; + this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor); + this.schemaContextProviderFactory = schemaContextProviderFactory; + this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); } - void bringDown() { - if (rpcReg != null) { - for (RpcRegistration reg : rpcReg) { - reg.close(); + @Override + public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities, + final RemoteDeviceCommunicator listener) { + // SchemaContext setup has to be performed in a dedicated thread since + // we are in a netty thread in this method + // Yang models are being downloaded in this method and it would cause a + // deadlock if we used the netty thread + // http://netty.io/wiki/thread-model.html + logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities); + + final ListenableFuture salInitializationFuture = processingExecutor.submit(new Runnable() { + @Override + public void run() { + final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener); + final SchemaSourceProvider delegate = sourceProviderFactory.createSourceProvider(deviceRpc); + final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities); + updateMessageTransformer(schemaContextProvider); + salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(); } - rpcReg = null; - } - closeGracefully(confReaderReg); - confReaderReg = null; - closeGracefully(operReaderReg); - operReaderReg = null; - closeGracefully(commitHandlerReg); - commitHandlerReg = null; - - updateDeviceState(false, Collections. emptySet()); - } + }); - private void closeGracefully(final AutoCloseable resource) { - if (resource != null) { - try { - resource.close(); - } catch (Exception e) { - logger.warn("Ignoring exception while closing {}", resource, e); + Futures.addCallback(salInitializationFuture, new FutureCallback() { + @Override + public void onSuccess(final Object result) { + logger.debug("{}: Initialization in sal successful", id); + logger.info("{}: Netconf connector initialized successfully", id); } - } - } - void bringUp(final SchemaSourceProvider delegate, final Set capabilities, final boolean rollbackSupported) { - // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener. - // Reason: delegate.getSchema blocks thread when waiting for response - // however, if the netty thread is blocked, no incoming message can be processed - // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html - // TODO redesign +refactor - processingExecutor.submit(new Runnable() { @Override - public void run() { - NetconfDevice.this.rollbackSupported = rollbackSupported; - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(capabilities); - if (mountInstance != null && getSchemaContext().isPresent()) { - mountInstance.setSchemaContext(getSchemaContext().get()); - } - - updateDeviceState(true, capabilities); - - if (mountInstance != null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this); - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this); - - List rpcs = new ArrayList<>(); - // TODO same condition twice - if (mountInstance != null && getSchemaContext().isPresent()) { - for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) { - rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this)); - } - } - rpcReg = rpcs; - } + public void onFailure(final Throwable t) { + // Unable to initialize device, set as disconnected + logger.error("{}: Initialization failed", id, t); + salFacade.onDeviceDisconnected(); } }); } - private void updateDeviceState(boolean up, Set capabilities) { - checkDataStoreState(); - - DataModificationTransaction transaction = dataBroker.beginTransaction(); - - CompositeNodeBuilder it = ImmutableCompositeNode.builder(); - it.setQName(INVENTORY_NODE); - it.addLeaf(INVENTORY_ID, name); - it.addLeaf(INVENTORY_CONNECTED, up); - - logger.debug("Client capabilities {}", capabilities); - for (QName capability : capabilities) { - it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString()); - } - - logger.debug("Update device state transaction " + transaction.getIdentifier() - + " putting operational data started."); - transaction.removeOperationalData(path); - transaction.putOperationalData(path, it.toInstance()); - logger.debug("Update device state transaction " + transaction.getIdentifier() - + " putting operational data ended."); - - // FIXME: this has to be asynchronous - RpcResult transactionStatus = null; - try { - transactionStatus = transaction.commit().get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - // TODO better ex handling - - if (transactionStatus.isSuccessful()) { - logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL."); - } else { - logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!"); - logger.debug("Update device state transaction status " + transaction.getStatus()); - } + /** + * Update initial message transformer to use retrieved schema + */ + private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) { + messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext()); } - @Override - public CompositeNode readConfigurationData(InstanceIdentifier path) { - RpcResult result = null; - try { - result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - - CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME); - return data == null ? null : (CompositeNode) findNode(data, path); + private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider sourceProvider, final NetconfSessionCapabilities capabilities) { + return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider); } - @Override - public CompositeNode readOperationalData(InstanceIdentifier path) { - RpcResult result = null; - try { - result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - - CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME); - return (CompositeNode) findNode(data, path); + private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator listener) { + Preconditions.checkArgument(capHolder.isMonitoringSupported(), + "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder); + return new NetconfDeviceRpc(listener, messageTransformer); } @Override - public Set getSupportedRpcs() { - return Collections.emptySet(); + public void onRemoteSessionDown() { + salFacade.onDeviceDisconnected(); } @Override - public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc); + public void onNotification(final NetconfMessage notification) { + notificationHandler.handleNotification(notification); } - @Override - public Collection getProviderFunctionality() { - return Collections.emptySet(); - } + /** + * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade. + */ + private final static class NotificationHandler { - @Override - public void onSessionInitiated(ProviderSession session) { - dataBroker = session.getService(DataBrokerService.class); - - processingExecutor.submit(new Runnable() { - @Override - public void run() { - updateInitialState(); - } - }); + private final RemoteDeviceHandler salFacade; + private final List cache = new LinkedList<>(); + private final MessageTransformer messageTransformer; + private boolean passNotifications = false; + private final RemoteDeviceId id; - mountService = session.getService(MountProvisionService.class); - if (mountService != null) { - mountInstance = mountService.createOrGetMountPoint(path); + NotificationHandler(final RemoteDeviceHandler salFacade, final MessageTransformer messageTransformer, final RemoteDeviceId id) { + this.salFacade = salFacade; + this.messageTransformer = messageTransformer; + this.id = id; } - } - - private void updateInitialState() { - checkDataStoreState(); - DataModificationTransaction transaction = dataBroker.beginTransaction(); - if (operationalNodeNotExisting(transaction)) { - transaction.putOperationalData(path, getNodeWithId()); - } - if (configurationNodeNotExisting(transaction)) { - transaction.putConfigurationData(path, getNodeWithId()); - } - - try { - transaction.commit().get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - } - - private void checkDataStoreState() { - // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore - dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder( - Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); } - - CompositeNode getNodeWithId() { - SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name); - return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.> singletonList(id)); - } - - boolean configurationNodeNotExisting(DataModificationTransaction transaction) { - return null == transaction.readConfigurationData(path); - } - - boolean operationalNodeNotExisting(DataModificationTransaction transaction) { - return null == transaction.readOperationalData(path); - } - - static Node findNode(CompositeNode node, InstanceIdentifier identifier) { - - Node current = node; - for (InstanceIdentifier.PathArgument arg : identifier.getPath()) { - if (current instanceof SimpleNode) { - return null; - } else if (current instanceof CompositeNode) { - CompositeNode currentComposite = (CompositeNode) current; - - current = currentComposite.getFirstCompositeByName(arg.getNodeType()); - if (current == null) { - current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision()); - } - if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.getNodeType()); - } - if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision()); - } - if (current == null) { - return null; - } + synchronized void handleNotification(final NetconfMessage notification) { + if(passNotifications) { + passNotification(messageTransformer.toNotification(notification)); + } else { + cacheNotification(notification); } } - return current; - } - @Override - public DataCommitTransaction requestCommit( - DataModification modification) { - NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, - modification, true, rollbackSupported); - try { - twoPhaseCommit.prepare(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - return twoPhaseCommit; - } + /** + * Forward all cached notifications and pass all notifications from this point directly to sal facade. + */ + synchronized void onRemoteSchemaUp() { + passNotifications = true; - Set getCapabilities(Collection capabilities) { - return FluentIterable.from(capabilities).filter(new Predicate() { - @Override - public boolean apply(final String capability) { - return capability.contains("?") && capability.contains("module=") && capability.contains("revision="); + for (final NetconfMessage cachedNotification : cache) { + passNotification(messageTransformer.toNotification(cachedNotification)); } - }).transform(new Function() { - @Override - public QName apply(final String capability) { - String[] parts = capability.split("\\?"); - String namespace = parts[0]; - FluentIterable queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&"))); - String revision = getStringAndTransform(queryParams, "revision=", "revision="); - - String moduleName = getStringAndTransform(queryParams, "module=", "module="); + cache.clear(); + } - if (revision == null) { - logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision="); - revision = getStringAndTransform(queryParams, "amp;revision==", "revision="); + private void cacheNotification(final NetconfMessage notification) { + Preconditions.checkState(passNotifications == false); - if (revision != null) { - logger.warn("Netconf device returned revision incorectly escaped for {}", capability); - } - } - if (revision == null) { - return QName.create(URI.create(namespace), null, moduleName); - } - return QName.create(namespace, revision, moduleName); + logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification); + if(logger.isTraceEnabled()) { + logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument())); } - private String getStringAndTransform(final Iterable queryParams, final String match, - final String substringToRemove) { - Optional found = Iterables.tryFind(queryParams, new Predicate() { - @Override - public boolean apply(final String input) { - return input.startsWith(match); - } - }); - - return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null; - } - - }).toSet(); - } - - @Override - public void close() { - bringDown(); - } - - public String getName() { - return name; - } - - public InetSocketAddress getSocketAddress() { - return socketAddress; - } - - public MountProvisionInstance getMountInstance() { - return mountInstance; - } - - public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) { - this.reconnectStrategy = reconnectStrategy; - } - - public void setProcessingExecutor(final ExecutorService processingExecutor) { - this.processingExecutor = processingExecutor; - } - - public void setSocketAddress(final InetSocketAddress socketAddress) { - this.socketAddress = socketAddress; - } - - public void setEventExecutor(final EventExecutor eventExecutor) { - this.eventExecutor = eventExecutor; - } - - public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider schemaSourceProvider) { - this.schemaSourceProvider = schemaSourceProvider; - } - - public void setDispatcher(final NetconfClientDispatcher dispatcher) { - this.dispatcher = dispatcher; - } - - public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) { - this.clientConfig = clientConfig; - } - - public void setDataProviderService(final DataProviderService dataProviderService) { - this.dataProviderService = dataProviderService; - } -} - -class NetconfDeviceSchemaContextProvider { - - NetconfDevice device; - - SchemaSourceProvider sourceProvider; - - Optional currentContext; - - NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider sourceProvider) { - this.device = device; - this.sourceProvider = sourceProvider; - this.currentContext = Optional.absent(); - } - - void createContextFromCapabilities(Iterable capabilities) { - YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider); - if (!sourceContext.getMissingSources().isEmpty()) { - device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources()); - } - device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources()); - List modelsToParse = YangSourceContext.getValidInputStreams(sourceContext); - if (!sourceContext.getValidSources().isEmpty()) { - SchemaContext schemaContext = tryToCreateContext(modelsToParse); - currentContext = Optional.fromNullable(schemaContext); - } else { - currentContext = Optional.absent(); + cache.add(notification); } - if (currentContext.isPresent()) { - device.logger.debug("Schema context successfully created."); - } - } - SchemaContext tryToCreateContext(List modelsToParse) { - YangParserImpl parser = new YangParserImpl(); - try { - - Set models = parser.parseYangModelsFromStreams(modelsToParse); - return parser.resolveSchemaContext(models); - } catch (Exception e) { - device.logger.debug("Error occured during parsing YANG schemas", e); - return null; + private void passNotification(final CompositeNode parsedNotification) { + logger.debug("{}: Forwarding notification {}", id, parsedNotification); + Preconditions.checkNotNull(parsedNotification); + salFacade.onNotification(parsedNotification); } + } }