*/
package org.opendaylight.controller.sal.connect.netconf;
-import static com.google.common.base.Preconditions.checkState;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
-
import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
+import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.EventExecutor;
-
-public class NetconfDevice implements Provider, //
- DataReader<InstanceIdentifier, CompositeNode>, //
- DataCommitHandler<InstanceIdentifier, CompositeNode>, //
- RpcImplementation, //
- AutoCloseable {
-
- InetSocketAddress socketAddress;
-
- MountProvisionInstance mountInstance;
-
- EventExecutor eventExecutor;
-
- ExecutorService processingExecutor;
-
- InstanceIdentifier path;
-
- ReconnectStrategy reconnectStrategy;
-
- AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
- private NetconfDeviceSchemaContextProvider deviceContextProvider;
-
- protected Logger logger;
-
- Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
- Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
- Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
- List<RpcRegistration> rpcReg;
-
- String name;
-
- MountProvisionService mountService;
-
- NetconfClientDispatcher dispatcher;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
- static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
-
- SchemaSourceProvider<InputStream> remoteSourceProvider;
-
- DataBrokerService dataBroker;
+/**
+ * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
+ */
+public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
- NetconfDeviceListener listener;
+ private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
- private boolean rollbackSupported;
+ private final RemoteDeviceId id;
+ private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+ private final ListeningExecutorService processingExecutor;
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private final SchemaContextProviderFactory schemaContextProviderFactory;
+ private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+ private final NotificationHandler notificationHandler;
- public NetconfDevice(String name) {
- this.name = name;
- this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
- this.path = InstanceIdentifier.builder(INVENTORY_PATH)
- .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
+ public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+ final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+ final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+ return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
}
- public void start() {
- checkState(dispatcher != null, "Dispatcher must be set.");
- checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
- checkState(eventExecutor != null, "Event executor must be set.");
-
- listener = new NetconfDeviceListener(this);
+ @VisibleForTesting
+ protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+ final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+ final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+ final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
- logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
-
- dispatcher.createClient(socketAddress, listener, reconnectStrategy);
+ return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
+ new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
+ @Override
+ public SchemaSourceProvider<InputStream> createSourceProvider(final RpcImplementation deviceRpc) {
+ return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
+ deviceRpc));
+ }
+ }, stateSchemasResolver);
}
- Optional<SchemaContext> getSchemaContext() {
- if (deviceContextProvider == null) {
- return Optional.absent();
- }
- return deviceContextProvider.currentContext;
+ @VisibleForTesting
+ protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+ final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
+ final SchemaContextProviderFactory schemaContextProviderFactory,
+ final SchemaSourceProviderFactory<InputStream> sourceProviderFactory,
+ final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ this.id = id;
+ this.messageTransformer = messageTransformer;
+ this.salFacade = salFacade;
+ this.sourceProviderFactory = sourceProviderFactory;
+ this.stateSchemasResolver = stateSchemasResolver;
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
+ this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
- void bringDown() {
- if (rpcReg != null) {
- for (RpcRegistration reg : rpcReg) {
- reg.close();
+ @Override
+ public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
+ final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ // SchemaContext setup has to be performed in a dedicated thread since
+ // we are in a netty thread in this method
+ // Yang models are being downloaded in this method and it would cause a
+ // deadlock if we used the netty thread
+ // http://netty.io/wiki/thread-model.html
+ logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
+
+ final ListenableFuture<?> salInitializationFuture = processingExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
+
+ final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+ logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
+ // TODO use this for shared schema context
+
+ final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
+ final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
+ updateMessageTransformer(schemaContextProvider);
+ salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
}
- rpcReg = null;
- }
- closeGracefully(confReaderReg);
- confReaderReg = null;
- closeGracefully(operReaderReg);
- operReaderReg = null;
- closeGracefully(commitHandlerReg);
- commitHandlerReg = null;
-
- updateDeviceState(false, Collections.<QName> emptySet());
- }
+ });
- private void closeGracefully(final AutoCloseable resource) {
- if (resource != null) {
- try {
- resource.close();
- } catch (Exception e) {
- logger.warn("Ignoring exception while closing {}", resource, e);
+ Futures.addCallback(salInitializationFuture, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object result) {
+ logger.debug("{}: Initialization in sal successful", id);
+ logger.info("{}: Netconf connector initialized successfully", id);
}
- }
- }
- void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
- // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
- // Reason: delegate.getSchema blocks thread when waiting for response
- // however, if the netty thread is blocked, no incoming message can be processed
- // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
- // TODO redesign +refactor
- processingExecutor.submit(new Runnable() {
@Override
- public void run() {
- NetconfDevice.this.rollbackSupported = rollbackSupported;
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(capabilities);
- if (mountInstance != null && getSchemaContext().isPresent()) {
- mountInstance.setSchemaContext(getSchemaContext().get());
- }
-
- updateDeviceState(true, capabilities);
-
- if (mountInstance != null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
-
- List<RpcRegistration> rpcs = new ArrayList<>();
- // TODO same condition twice
- if (mountInstance != null && getSchemaContext().isPresent()) {
- for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
- rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
- }
- }
- rpcReg = rpcs;
- }
+ public void onFailure(final Throwable t) {
+ // Unable to initialize device, set as disconnected
+ logger.error("{}: Initialization failed", id, t);
+ salFacade.onDeviceDisconnected();
+ // TODO ssh connection is still open if sal initialization fails
}
});
}
- private void updateDeviceState(boolean up, Set<QName> capabilities) {
- DataModificationTransaction transaction = dataBroker.beginTransaction();
-
- CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
- it.setQName(INVENTORY_NODE);
- it.addLeaf(INVENTORY_ID, name);
- it.addLeaf(INVENTORY_CONNECTED, up);
-
- logger.debug("Client capabilities {}", capabilities);
- for (QName capability : capabilities) {
- it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
- }
-
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data started.");
- transaction.removeOperationalData(path);
- transaction.putOperationalData(path, it.toInstance());
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data ended.");
-
- // FIXME: this has to be asynchronous
- RpcResult<TransactionStatus> transactionStatus = null;
- try {
- transactionStatus = transaction.commit().get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
- // TODO better ex handling
-
- if (transactionStatus.isSuccessful()) {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
- } else {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
- logger.debug("Update device state transaction status " + transaction.getStatus());
- }
- }
-
- @Override
- public CompositeNode readConfigurationData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
- wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
-
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return data == null ? null : (CompositeNode) findNode(data, path);
+ /**
+ * Update initial message transformer to use retrieved schema
+ */
+ private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) {
+ messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext());
}
- @Override
- public CompositeNode readOperationalData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
-
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return (CompositeNode) findNode(data, path);
+ private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider<InputStream> sourceProvider, final NetconfSessionCapabilities capabilities) {
+ return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider);
}
- @Override
- public Set<QName> getSupportedRpcs() {
- return Collections.emptySet();
+ private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ Preconditions.checkArgument(capHolder.isMonitoringSupported(),
+ "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder);
+ return new NetconfDeviceRpc(listener, messageTransformer);
}
@Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
+ public void onRemoteSessionDown() {
+ salFacade.onDeviceDisconnected();
}
@Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
+ public void onNotification(final NetconfMessage notification) {
+ notificationHandler.handleNotification(notification);
}
- @Override
- public void onSessionInitiated(ProviderSession session) {
- dataBroker = session.getService(DataBrokerService.class);
+ /**
+ * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ */
+ private final static class NotificationHandler {
- DataModificationTransaction transaction = dataBroker.beginTransaction();
- if (operationalNodeNotExisting(transaction)) {
- transaction.putOperationalData(path, getNodeWithId());
- }
- if (configurationNodeNotExisting(transaction)) {
- transaction.putConfigurationData(path, getNodeWithId());
- }
+ private final RemoteDeviceHandler<?> salFacade;
+ private final List<NetconfMessage> cache = new LinkedList<>();
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private boolean passNotifications = false;
+ private final RemoteDeviceId id;
- try {
- transaction.commit().get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
+ NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+ this.salFacade = salFacade;
+ this.messageTransformer = messageTransformer;
+ this.id = id;
}
- mountService = session.getService(MountProvisionService.class);
- if (mountService != null) {
- mountInstance = mountService.createOrGetMountPoint(path);
- }
- }
-
- CompositeNode getNodeWithId() {
- SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
- return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
- }
-
- boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
- return null == transaction.readConfigurationData(path);
- }
-
- boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
- return null == transaction.readOperationalData(path);
- }
-
- static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
-
- Node<?> current = node;
- for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
- if (current instanceof SimpleNode<?>) {
- return null;
- } else if (current instanceof CompositeNode) {
- CompositeNode currentComposite = (CompositeNode) current;
-
- current = currentComposite.getFirstCompositeByName(arg.getNodeType());
- if (current == null) {
- current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- return null;
- }
+ synchronized void handleNotification(final NetconfMessage notification) {
+ if(passNotifications) {
+ passNotification(messageTransformer.toNotification(notification));
+ } else {
+ cacheNotification(notification);
}
}
- return current;
- }
- @Override
- public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
- DataModification<InstanceIdentifier, CompositeNode> modification) {
- NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
- modification, true, rollbackSupported);
- try {
- twoPhaseCommit.prepare();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
- return twoPhaseCommit;
- }
+ /**
+ * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ */
+ synchronized void onRemoteSchemaUp() {
+ passNotifications = true;
- Set<QName> getCapabilities(Collection<String> capabilities) {
- return FluentIterable.from(capabilities).filter(new Predicate<String>() {
- @Override
- public boolean apply(final String capability) {
- return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
+ for (final NetconfMessage cachedNotification : cache) {
+ passNotification(messageTransformer.toNotification(cachedNotification));
}
- }).transform(new Function<String, QName>() {
- @Override
- public QName apply(final String capability) {
- String[] parts = capability.split("\\?");
- String namespace = parts[0];
- FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
-
- String revision = getStringAndTransform(queryParams, "revision=", "revision=");
- String moduleName = getStringAndTransform(queryParams, "module=", "module=");
-
- if (revision == null) {
- logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
- revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
-
- if (revision != null) {
- logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
- }
- }
- if (revision == null) {
- return QName.create(URI.create(namespace), null, moduleName);
- }
- return QName.create(namespace, revision, moduleName);
- }
+ cache.clear();
+ }
- private String getStringAndTransform(final Iterable<String> queryParams, final String match,
- final String substringToRemove) {
- Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
- @Override
- public boolean apply(final String input) {
- return input.startsWith(match);
- }
- });
+ private void cacheNotification(final NetconfMessage notification) {
+ Preconditions.checkState(passNotifications == false);
- return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+ logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
}
- }).toSet();
- }
-
- @Override
- public void close() {
- bringDown();
- }
-
- public String getName() {
- return name;
- }
-
- public InetSocketAddress getSocketAddress() {
- return socketAddress;
- }
-
- public MountProvisionInstance getMountInstance() {
- return mountInstance;
- }
-
- public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
- this.reconnectStrategy = reconnectStrategy;
- }
-
- public void setProcessingExecutor(final ExecutorService processingExecutor) {
- this.processingExecutor = processingExecutor;
- }
-
- public void setSocketAddress(final InetSocketAddress socketAddress) {
- this.socketAddress = socketAddress;
- }
-
- public void setEventExecutor(final EventExecutor eventExecutor) {
- this.eventExecutor = eventExecutor;
- }
-
- public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
- this.schemaSourceProvider = schemaSourceProvider;
- }
-
- public void setDispatcher(final NetconfClientDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-}
-
-class NetconfDeviceSchemaContextProvider {
-
- NetconfDevice device;
-
- SchemaSourceProvider<InputStream> sourceProvider;
-
- Optional<SchemaContext> currentContext;
-
- NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
- this.device = device;
- this.sourceProvider = sourceProvider;
- this.currentContext = Optional.absent();
- }
-
- void createContextFromCapabilities(Iterable<QName> capabilities) {
- YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
- if (!sourceContext.getMissingSources().isEmpty()) {
- device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
- }
- device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
- List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
- if (!sourceContext.getValidSources().isEmpty()) {
- SchemaContext schemaContext = tryToCreateContext(modelsToParse);
- currentContext = Optional.fromNullable(schemaContext);
- } else {
- currentContext = Optional.absent();
- }
- if (currentContext.isPresent()) {
- device.logger.debug("Schema context successfully created.");
+ cache.add(notification);
}
- }
-
- SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
- YangParserImpl parser = new YangParserImpl();
- try {
- Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
- return parser.resolveSchemaContext(models);
- } catch (Exception e) {
- device.logger.debug("Error occured during parsing YANG schemas", e);
- return null;
+ private void passNotification(final CompositeNode parsedNotification) {
+ logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+ Preconditions.checkNotNull(parsedNotification);
+ salFacade.onNotification(parsedNotification);
}
}
+
}