*/
package org.opendaylight.controller.sal.connect.netconf;
-import static com.google.common.base.Preconditions.checkState;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
+import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.EventExecutor;
-
-public class NetconfDevice implements Provider, //
- DataReader<InstanceIdentifier, CompositeNode>, //
- DataCommitHandler<InstanceIdentifier, CompositeNode>, //
- RpcImplementation, //
- AutoCloseable {
-
- InetSocketAddress socketAddress;
-
- MountProvisionInstance mountInstance;
-
- EventExecutor eventExecutor;
-
- ExecutorService processingExecutor;
-
- InstanceIdentifier path;
-
- ReconnectStrategy reconnectStrategy;
-
- AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
- private NetconfDeviceSchemaContextProvider deviceContextProvider;
-
- protected Logger logger;
-
- Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
- Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
- Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
- List<RpcRegistration> rpcReg;
-
- String name;
-
- MountProvisionService mountService;
+/**
+ * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
+ */
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
- NetconfClientDispatcher dispatcher;
+ private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
- static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
+ /**
+ * Initial schema context contains schemas for netconf monitoring and netconf notifications
+ */
+ public static final SchemaContext INIT_SCHEMA_CTX;
- SchemaSourceProvider<InputStream> remoteSourceProvider;
+ static {
+ try {
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(
+ Lists.newArrayList(
+ $YangModuleInfoImpl.getInstance(),
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.$YangModuleInfoImpl.getInstance(),
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.$YangModuleInfoImpl.getInstance()));
+ INIT_SCHEMA_CTX = moduleInfoBackedContext.tryToCreateSchemaContext().get();
+ } catch (final RuntimeException e) {
+ logger.error("Unable to prepare schema context for netconf initialization", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
- private volatile DataBrokerService dataBroker;
+ public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
+ @Override
+ public SourceIdentifier apply(final QName input) {
+ return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision()));
+ }
+ };
- NetconfDeviceListener listener;
+ private final RemoteDeviceId id;
+ private final boolean reconnectOnSchemasChange;
- private boolean rollbackSupported;
+ private final SchemaContextFactory schemaContextFactory;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
+ private final ListeningExecutorService processingExecutor;
+ private final SchemaSourceRegistry schemaRegistry;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+ private final NotificationHandler notificationHandler;
+ private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
- private NetconfClientConfiguration clientConfig;
- private volatile DataProviderService dataProviderService;
+ // Message transformer is constructed once the schemas are available
+ private MessageTransformer<NetconfMessage> messageTransformer;
- public NetconfDevice(String name) {
- this.name = name;
- this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
- this.path = InstanceIdentifier.builder(INVENTORY_PATH)
- .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor) {
+ this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false);
}
- public void start() {
- checkState(dispatcher != null, "Dispatcher must be set.");
- checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
- checkState(eventExecutor != null, "Event executor must be set.");
-
- Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
- listener = (NetconfDeviceListener) clientConfig.getSessionListener();
+ /**
+ * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded
+ */
+ static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) {
+ return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX, false));
+ }
- logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
- dispatcher.createClient(clientConfig);
+ // FIXME reduce parameters
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
+ this.id = id;
+ this.reconnectOnSchemasChange = reconnectOnSchemasChange;
+ this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
+ this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
+ this.salFacade = salFacade;
+ this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
+ this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
+ this.notificationHandler = new NotificationHandler(salFacade, id);
}
- Optional<SchemaContext> getSchemaContext() {
- if (deviceContextProvider == null) {
- return Optional.absent();
+ @Override
+ public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+ final NetconfDeviceCommunicator listener) {
+ // SchemaContext setup has to be performed in a dedicated thread since
+ // we are in a netty thread in this method
+ // Yang models are being downloaded in this method and it would cause a
+ // deadlock if we used the netty thread
+ // http://netty.io/wiki/thread-model.html
+ logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
+
+ final NetconfDeviceRpc initRpc = getRpcForInitialization(listener);
+ final DeviceSourcesResolver task = new DeviceSourcesResolver(remoteSessionCapabilities, id, stateSchemasResolver, initRpc);
+ final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
+
+ if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
+ registerToBaseNetconfStream(initRpc, listener);
}
- return deviceContextProvider.currentContext;
- }
- void bringDown() {
- if (rpcReg != null) {
- for (RpcRegistration reg : rpcReg) {
- reg.close();
+ final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
+ @Override
+ public void onSuccess(final DeviceSources result) {
+ addProvidedSourcesToSchemaRegistry(initRpc, result);
+ setUpSchema(result);
}
- rpcReg = null;
- }
- closeGracefully(confReaderReg);
- confReaderReg = null;
- closeGracefully(operReaderReg);
- operReaderReg = null;
- closeGracefully(commitHandlerReg);
- commitHandlerReg = null;
-
- updateDeviceState(false, Collections.<QName> emptySet());
- }
- private void closeGracefully(final AutoCloseable resource) {
- if (resource != null) {
- try {
- resource.close();
- } catch (Exception e) {
- logger.warn("Ignoring exception while closing {}", resource, e);
+ private void setUpSchema(final DeviceSources result) {
+ processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener));
}
- }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.warn("{}: Unexpected error resolving device sources: {}", id, t);
+ handleSalInitializationFailure(t, listener);
+ }
+ };
+
+ Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
}
- void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
- // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
- // Reason: delegate.getSchema blocks thread when waiting for response
- // however, if the netty thread is blocked, no incoming message can be processed
- // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
- // TODO redesign +refactor
- processingExecutor.submit(new Runnable() {
+ private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+ // TODO check whether the model describing create subscription is present in schema
+ // Perhaps add a default schema context to support create-subscription if the model was not provided (same as what we do for base netconf operations in transformer)
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture =
+ deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+
+ final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
@Override
- public void run() {
- NetconfDevice.this.rollbackSupported = rollbackSupported;
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(capabilities);
- if (mountInstance != null && getSchemaContext().isPresent()) {
- mountInstance.setSchemaContext(getSchemaContext().get());
+ public Optional<DOMNotification> filterNotification(final DOMNotification notification) {
+ if (isCapabilityChanged(notification)) {
+ logger.info("{}: Schemas change detected, reconnecting", id);
+ // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+ listener.disconnect();
+ return Optional.absent();
}
+ return Optional.of(notification);
+ }
- updateDeviceState(true, capabilities);
+ private boolean isCapabilityChanged(final DOMNotification notification) {
+ return notification.getBody().getNodeType().equals(NetconfCapabilityChange.QNAME);
+ }
+ };
- if (mountInstance != null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult domRpcResult) {
+ notificationHandler.addNotificationFilter(filter);
+ }
- List<RpcRegistration> rpcs = new ArrayList<>();
- // TODO same condition twice
- if (mountInstance != null && getSchemaContext().isPresent()) {
- for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
- rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
- }
- }
- rpcReg = rpcs;
- }
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
}
});
}
- private void updateDeviceState(boolean up, Set<QName> capabilities) {
- checkDataStoreState();
-
- DataModificationTransaction transaction = dataBroker.beginTransaction();
+ private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
+ return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
+ }
- CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
- it.setQName(INVENTORY_NODE);
- it.addLeaf(INVENTORY_ID, name);
- it.addLeaf(INVENTORY_CONNECTED, up);
+ @VisibleForTesting
+ void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
+ messageTransformer = new NetconfMessageTransformer(result, true);
- logger.debug("Client capabilities {}", capabilities);
- for (QName capability : capabilities) {
- it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
- }
+ updateTransformer(messageTransformer);
+ // salFacade.onDeviceConnected has to be called before the notification handler is initialized
+ salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data started.");
- transaction.removeOperationalData(path);
- transaction.putOperationalData(path, it.toInstance());
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data ended.");
-
- // FIXME: this has to be asynchronous
- RpcResult<TransactionStatus> transactionStatus = null;
- try {
- transactionStatus = transaction.commit().get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
- // TODO better ex handling
+ logger.info("{}: Netconf connector initialized successfully", id);
+ }
- if (transactionStatus.isSuccessful()) {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
- } else {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
- logger.debug("Update device state transaction status " + transaction.getStatus());
- }
+ private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ logger.error("{}: Initialization in sal failed, disconnecting from device", id, t);
+ listener.close();
+ onRemoteSessionDown();
+ resetMessageTransformer();
}
- @Override
- public CompositeNode readConfigurationData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
- wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
+ /**
+ * Set the transformer to null as is in initial state
+ */
+ private void resetMessageTransformer() {
+ updateTransformer(null);
+ }
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return data == null ? null : (CompositeNode) findNode(data, path);
+ private void updateTransformer(final MessageTransformer<NetconfMessage> transformer) {
+ messageTransformer = transformer;
}
- @Override
- public CompositeNode readOperationalData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
+ private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
+ final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc);
+ for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
+ sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
+ PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
}
-
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return (CompositeNode) findNode(data, path);
}
@Override
- public Set<QName> getSupportedRpcs() {
- return Collections.emptySet();
- }
+ public void onRemoteSessionDown() {
+ notificationHandler.onRemoteSchemaDown();
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
+ salFacade.onDeviceDisconnected();
+ for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
+ sourceRegistration.close();
+ }
+ resetMessageTransformer();
}
@Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
+ public void onRemoteSessionFailed(final Throwable throwable) {
+ salFacade.onDeviceFailed(throwable);
}
@Override
- public void onSessionInitiated(ProviderSession session) {
- dataBroker = session.getService(DataBrokerService.class);
-
- processingExecutor.submit(new Runnable() {
- @Override
- public void run() {
- updateInitialState();
- }
- });
-
- mountService = session.getService(MountProvisionService.class);
- if (mountService != null) {
- mountInstance = mountService.createOrGetMountPoint(path);
+ public void onNotification(final NetconfMessage notification) {
+ notificationHandler.handleNotification(notification);
+ }
+
+ /**
+ * Just a transfer object containing schema related dependencies. Injected in constructor.
+ */
+ public static class SchemaResourcesDTO {
+ private final SchemaSourceRegistry schemaRegistry;
+ private final SchemaContextFactory schemaContextFactory;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+
+ public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry);
+ this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory);
+ this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver);
}
- }
- private void updateInitialState() {
- checkDataStoreState();
-
- DataModificationTransaction transaction = dataBroker.beginTransaction();
- if (operationalNodeNotExisting(transaction)) {
- transaction.putOperationalData(path, getNodeWithId());
- }
- if (configurationNodeNotExisting(transaction)) {
- transaction.putConfigurationData(path, getNodeWithId());
+ public SchemaSourceRegistry getSchemaRegistry() {
+ return schemaRegistry;
}
- try {
- transaction.commit().get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
+ public SchemaContextFactory getSchemaContextFactory() {
+ return schemaContextFactory;
}
- }
-
- private void checkDataStoreState() {
- // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
- dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
- Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); }
-
- CompositeNode getNodeWithId() {
- SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
- return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
- }
-
- boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
- return null == transaction.readConfigurationData(path);
- }
- boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
- return null == transaction.readOperationalData(path);
+ public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() {
+ return stateSchemasResolver;
+ }
}
- static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
+ /**
+ * Schema building callable.
+ */
+ private static class DeviceSourcesResolver implements Callable<DeviceSources> {
- Node<?> current = node;
- for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
- if (current instanceof SimpleNode<?>) {
- return null;
- } else if (current instanceof CompositeNode) {
- CompositeNode currentComposite = (CompositeNode) current;
+ private final NetconfDeviceRpc deviceRpc;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
+ private final RemoteDeviceId id;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
- current = currentComposite.getFirstCompositeByName(arg.getNodeType());
- if (current == null) {
- current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- return null;
- }
- }
+ DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities,
+ final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ this.deviceRpc = deviceRpc;
+ this.remoteSessionCapabilities = remoteSessionCapabilities;
+ this.id = id;
+ this.stateSchemasResolver = stateSchemasResolver;
}
- return current;
- }
- @Override
- public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
- DataModification<InstanceIdentifier, CompositeNode> modification) {
- NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
- modification, true, rollbackSupported);
- try {
- twoPhaseCommit.prepare();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
+ public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) {
+ this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver);
}
- return twoPhaseCommit;
- }
-
- Set<QName> getCapabilities(Collection<String> capabilities) {
- return FluentIterable.from(capabilities).filter(new Predicate<String>() {
- @Override
- public boolean apply(final String capability) {
- return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
- }
- }).transform(new Function<String, QName>() {
- @Override
- public QName apply(final String capability) {
- String[] parts = capability.split("\\?");
- String namespace = parts[0];
- FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
- String revision = getStringAndTransform(queryParams, "revision=", "revision=");
+ @Override
+ public DeviceSources call() throws Exception {
+ final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+ logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
- String moduleName = getStringAndTransform(queryParams, "module=", "module=");
+ final Set<QName> requiredSources = Sets.newHashSet(remoteSessionCapabilities.getModuleBasedCaps());
+ final Set<QName> providedSources = availableSchemas.getAvailableYangSchemasQNames();
- if (revision == null) {
- logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
- revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
-
- if (revision != null) {
- logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
- }
- }
- if (revision == null) {
- return QName.create(URI.create(namespace), null, moduleName);
- }
- return QName.create(namespace, revision, moduleName);
+ final Set<QName> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
+ if (!requiredSourcesNotProvided.isEmpty()) {
+ logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
+ id, requiredSourcesNotProvided);
+ logger.warn("{}: Attempting to build schema context from required sources", id);
}
- private String getStringAndTransform(final Iterable<String> queryParams, final String match,
- final String substringToRemove) {
- Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
- @Override
- public boolean apply(final String input) {
- return input.startsWith(match);
- }
- });
-
- return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+ // Here all the sources reported in netconf monitoring are merged with those reported in hello.
+ // It is necessary to perform this since submodules are not mentioned in hello but still required.
+ // This clashes with the option of a user to specify supported yang models manually in configuration for netconf-connector
+ // and as a result one is not able to fully override yang models of a device. It is only possible to add additional models.
+ final Set<QName> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
+ if (!providedSourcesNotRequired.isEmpty()) {
+ logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
+ id, providedSourcesNotRequired);
+ logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
+ logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources);
+ requiredSources.addAll(providedSourcesNotRequired);
}
- }).toSet();
- }
-
- @Override
- public void close() {
- bringDown();
+ return new DeviceSources(requiredSources, providedSources);
+ }
}
- public String getName() {
- return name;
- }
+ /**
+ * Contains RequiredSources - sources from capabilities.
+ */
+ private static final class DeviceSources {
+ private final Set<QName> requiredSources;
+ private final Set<QName> providedSources;
- public InetSocketAddress getSocketAddress() {
- return socketAddress;
- }
+ public DeviceSources(final Set<QName> requiredSources, final Set<QName> providedSources) {
+ this.requiredSources = requiredSources;
+ this.providedSources = providedSources;
+ }
- public MountProvisionInstance getMountInstance() {
- return mountInstance;
- }
+ public Set<QName> getRequiredSourcesQName() {
+ return requiredSources;
+ }
- public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
- this.reconnectStrategy = reconnectStrategy;
- }
+ public Set<QName> getProvidedSourcesQName() {
+ return providedSources;
+ }
- public void setProcessingExecutor(final ExecutorService processingExecutor) {
- this.processingExecutor = processingExecutor;
- }
+ public Collection<SourceIdentifier> getRequiredSources() {
+ return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
- public void setSocketAddress(final InetSocketAddress socketAddress) {
- this.socketAddress = socketAddress;
- }
+ public Collection<SourceIdentifier> getProvidedSources() {
+ return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
- public void setEventExecutor(final EventExecutor eventExecutor) {
- this.eventExecutor = eventExecutor;
}
- public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
- this.schemaSourceProvider = schemaSourceProvider;
- }
+ /**
+ * Schema builder that tries to build schema context from provided sources or biggest subset of it.
+ */
+ private final class RecursiveSchemaSetup implements Runnable {
+ private final DeviceSources deviceSources;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
+ private final RemoteDeviceCommunicator<NetconfMessage> listener;
+ private final NetconfDeviceCapabilities capabilities;
- public void setDispatcher(final NetconfClientDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
+ public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ this.deviceSources = deviceSources;
+ this.remoteSessionCapabilities = remoteSessionCapabilities;
+ this.listener = listener;
+ this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
+ }
- public void setClientConfig(final NetconfClientConfiguration clientConfig) {
- this.clientConfig = clientConfig;
- }
+ @Override
+ public void run() {
+ setUpSchema(deviceSources.getRequiredSources());
+ }
- public void setDataProviderService(final DataProviderService dataProviderService) {
- this.dataProviderService = dataProviderService;
- }
-}
+ /**
+ * Recursively build schema context, in case of success or final failure notify device
+ */
+ // FIXME reimplement without recursion
+ private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
+ logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
+
+ // If no more sources, fail
+ if(requiredSources.isEmpty()) {
+ final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
+ handleSalInitializationFailure(cause, listener);
+ salFacade.onDeviceFailed(cause);
+ return;
+ }
-class NetconfDeviceSchemaContextProvider {
+ final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
- NetconfDevice device;
+ final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
- SchemaSourceProvider<InputStream> sourceProvider;
+ @Override
+ public void onSuccess(final SchemaContext result) {
+ logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
+ final Collection<QName> filteredQNames = Sets.difference(deviceSources.getProvidedSourcesQName(), capabilities.getUnresolvedCapabilites().keySet());
+ capabilities.addCapabilities(filteredQNames);
+ capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
+ handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
+ }
- Optional<SchemaContext> currentContext;
+ @Override
+ public void onFailure(final Throwable t) {
+ // In case source missing, try without it
+ if (t instanceof MissingSchemaSourceException) {
+ final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
+ logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
+ setUpSchema(stripMissingSource(requiredSources, missingSource));
+
+ // In case resolution error, try only with resolved sources
+ } else if (t instanceof SchemaResolutionException) {
+ // TODO check for infinite loop
+ final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+ final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
+ logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
+ setUpSchema(resolutionException.getResolvedSources());
+ // unknown error, fail
+ } else {
+ handleSalInitializationFailure(t, listener);
+ }
+ }
+ };
- NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
- this.device = device;
- this.sourceProvider = sourceProvider;
- this.currentContext = Optional.absent();
- }
+ Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
+ }
- void createContextFromCapabilities(Iterable<QName> capabilities) {
- YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
- if (!sourceContext.getMissingSources().isEmpty()) {
- device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
+ private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
+ return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
- device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
- List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
- if (!sourceContext.getValidSources().isEmpty()) {
- SchemaContext schemaContext = tryToCreateContext(modelsToParse);
- currentContext = Optional.fromNullable(schemaContext);
- } else {
- currentContext = Optional.absent();
+
+ private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
+ final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
+ final boolean removed = sourceIdentifiers.remove(sIdToRemove);
+ Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
+ return sourceIdentifiers;
}
- if (currentContext.isPresent()) {
- device.logger.debug("Schema context successfully created.");
+
+ private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
+ final Collection<QName> qNames = Collections2.transform(identifiers, new Function<SourceIdentifier, QName>() {
+ @Override
+ public QName apply(final SourceIdentifier sourceIdentifier) {
+ return getQNameFromSourceIdentifier(sourceIdentifier);
+ }
+ });
+
+ if (qNames.isEmpty()) {
+ logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
+ }
+ return qNames;
}
- }
- SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
- YangParserImpl parser = new YangParserImpl();
- try {
+ private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
+ // Required sources are all required and provided merged in DeviceSourcesResolver
+ for (final QName qname : deviceSources.getRequiredSourcesQName()) {
+ if(qname.getLocalName().equals(identifier.getName()) == false) {
+ continue;
+ }
- Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
- return parser.resolveSchemaContext(models);
- } catch (Exception e) {
- device.logger.debug("Error occured during parsing YANG schemas", e);
- return null;
+ if(identifier.getRevision().equals(SourceIdentifier.NOT_PRESENT_FORMATTED_REVISION) &&
+ qname.getRevision() == null) {
+ return qname;
+ }
+
+ if (qname.getFormattedRevision().equals(identifier.getRevision())) {
+ return qname;
+ }
+ }
+ throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier + " Available: " + deviceSources.getRequiredSourcesQName());
}
}
}