Merge "Do not duplicate OSGi dependencyManagement"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
index aa5c6f40a9a4f827c88aa97d9a72551d6e8fd94f..9a5b239024c5bb0cbca3798de58ccc102a994224 100644 (file)
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
-
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
+import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.EventExecutor;
-
-public class NetconfDevice implements Provider, //
-        DataReader<InstanceIdentifier, CompositeNode>, //
-        DataCommitHandler<InstanceIdentifier, CompositeNode>, //
-        RpcImplementation, //
-        AutoCloseable {
-
-    InetSocketAddress socketAddress;
-
-    MountProvisionInstance mountInstance;
-
-    EventExecutor eventExecutor;
-
-    ExecutorService processingExecutor;
-
-    InstanceIdentifier path;
-
-    ReconnectStrategy reconnectStrategy;
-
-    AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
-    private NetconfDeviceSchemaContextProvider deviceContextProvider;
-
-    protected Logger logger;
-
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
-    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
-    List<RpcRegistration> rpcReg;
-
-    String name;
-
-    MountProvisionService mountService;
-
-    NetconfClientDispatcher dispatcher;
-
-    static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
-
-    SchemaSourceProvider<InputStream> remoteSourceProvider;
-
-    DataBrokerService dataBroker;
-
-    NetconfDeviceListener listener;
-
-    private boolean rollbackSupported;
+/**
+ *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
+ */
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
 
+    private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
 
-    public NetconfDevice(String name) {
-        this.name = name;
-        this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
-        this.path = InstanceIdentifier.builder(INVENTORY_PATH)
-                .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
+    public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
+        @Override
+        public SourceIdentifier apply(final QName input) {
+            return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision()));
+        }
+    };
+
+    private final RemoteDeviceId id;
+    private final boolean reconnectOnSchemasChange;
+
+    private final SchemaContextFactory schemaContextFactory;
+    private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
+    private final ListeningExecutorService processingExecutor;
+    private final SchemaSourceRegistry schemaRegistry;
+    private final MessageTransformer<NetconfMessage> messageTransformer;
+    private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+    private final NotificationHandler notificationHandler;
+    private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+
+    public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+                         final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
+        this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false);
+    }
+
+    // FIXME reduce parameters
+    public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+                         final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer, final boolean reconnectOnSchemasChange) {
+        this.id = id;
+        this.reconnectOnSchemasChange = reconnectOnSchemasChange;
+        this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
+        this.messageTransformer = messageTransformer;
+        this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
+        this.salFacade = salFacade;
+        this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
+        this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
+        this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
     }
 
-    public void start() {
-        checkState(dispatcher != null, "Dispatcher must be set.");
-        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
-        checkState(eventExecutor != null, "Event executor must be set.");
+    @Override
+    public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+                                  final NetconfDeviceCommunicator listener) {
+        // SchemaContext setup has to be performed in a dedicated thread since
+        // we are in a netty thread in this method
+        // Yang models are being downloaded in this method and it would cause a
+        // deadlock if we used the netty thread
+        // http://netty.io/wiki/thread-model.html
+        logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
+
+        final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener);
+
+        final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
+        final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
+
+        if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
+           registerToBaseNetconfStream(deviceRpc, listener);
+        }
 
-        listener = new NetconfDeviceListener(this);
+        final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
+            @Override
+            public void onSuccess(final DeviceSources result) {
+                addProvidedSourcesToSchemaRegistry(deviceRpc, result);
+                setUpSchema(result);
+            }
 
-        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
+            private void setUpSchema(final DeviceSources result) {
+                processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener));
+            }
 
-        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
-    }
+            @Override
+            public void onFailure(final Throwable t) {
+                logger.warn("{}: Unexpected error resolving device sources: {}", id, t);
+                handleSalInitializationFailure(t, listener);
+            }
+        };
 
-    Optional<SchemaContext> getSchemaContext() {
-        if (deviceContextProvider == null) {
-            return Optional.absent();
-        }
-        return deviceContextProvider.currentContext;
-    }
+        Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
 
-    void bringDown() {
-        if (rpcReg != null) {
-            for (RpcRegistration reg : rpcReg) {
-                reg.close();
-            }
-            rpcReg = null;
-        }
-        closeGracefully(confReaderReg);
-        confReaderReg = null;
-        closeGracefully(operReaderReg);
-        operReaderReg = null;
-        closeGracefully(commitHandlerReg);
-        commitHandlerReg = null;
-
-        updateDeviceState(false, Collections.<QName> emptySet());
     }
 
-    private void closeGracefully(final AutoCloseable resource) {
-        if (resource != null) {
-            try {
-                resource.close();
-            } catch (Exception e) {
-                logger.warn("Ignoring exception while closing {}", resource, e);
-            }
-        }
-    }
+    private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+       final ListenableFuture<RpcResult<CompositeNode>> rpcResultListenableFuture =
+                deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
 
-    void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
-        // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
-        // Reason: delegate.getSchema blocks thread when waiting for response
-        // however, if the netty thread is blocked, no incoming message can be processed
-        // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
-        // TODO redesign +refactor
-        processingExecutor.submit(new Runnable() {
+        final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
             @Override
-            public void run() {
-                NetconfDevice.this.rollbackSupported = rollbackSupported;
-                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-                deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
-                deviceContextProvider.createContextFromCapabilities(capabilities);
-                if (mountInstance != null && getSchemaContext().isPresent()) {
-                    mountInstance.setSchemaContext(getSchemaContext().get());
+            public Optional<CompositeNode> filterNotification(final CompositeNode notification) {
+                if (isCapabilityChanged(notification)) {
+                    logger.info("{}: Schemas change detected, reconnecting", id);
+                    // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+                    listener.disconnect();
+                    return Optional.absent();
                 }
+                return Optional.of(notification);
+            }
 
-                updateDeviceState(true, capabilities);
+            private boolean isCapabilityChanged(final CompositeNode notification) {
+                return notification.getNodeType().equals(NetconfCapabilityChange.QNAME);
+            }
+        };
 
-                if (mountInstance != null) {
-                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
-                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
-                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
+        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+            @Override
+            public void onSuccess(final RpcResult<CompositeNode> result) {
+                notificationHandler.addNotificationFilter(filter);
+            }
 
-                    List<RpcRegistration> rpcs = new ArrayList<>();
-                    // TODO same condition twice
-                    if (mountInstance != null && getSchemaContext().isPresent()) {
-                        for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
-                            rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
-                        }
-                    }
-                    rpcReg = rpcs;
-                }
+            @Override
+            public void onFailure(final Throwable t) {
+                logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
             }
         });
     }
 
-    private void updateDeviceState(boolean up, Set<QName> capabilities) {
-        DataModificationTransaction transaction = dataBroker.beginTransaction();
-
-        CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-        it.setQName(INVENTORY_NODE);
-        it.addLeaf(INVENTORY_ID, name);
-        it.addLeaf(INVENTORY_CONNECTED, up);
+    private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
+        return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
+    }
 
-        logger.debug("Client capabilities {}", capabilities);
-        for (QName capability : capabilities) {
-            it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
-        }
+    private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
+        updateMessageTransformer(result);
+        salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+        notificationHandler.onRemoteSchemaUp();
 
-        logger.debug("Update device state transaction " + transaction.getIdentifier()
-                + " putting operational data started.");
-        transaction.removeOperationalData(path);
-        transaction.putOperationalData(path, it.toInstance());
-        logger.debug("Update device state transaction " + transaction.getIdentifier()
-                + " putting operational data ended.");
-
-        // FIXME: this has to be asynchronous
-        RpcResult<TransactionStatus> transactionStatus = null;
-        try {
-            transactionStatus = transaction.commit().get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-        // TODO better ex handling
+        logger.info("{}: Netconf connector initialized successfully", id);
+    }
 
-        if (transactionStatus.isSuccessful()) {
-            logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
-        } else {
-            logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
-            logger.debug("Update device state transaction status " + transaction.getStatus());
-        }
+    private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        logger.error("{}: Initialization in sal failed, disconnecting from device", id, t);
+        listener.close();
+        onRemoteSessionDown();
+        resetMessageTransformer();
     }
 
-    @Override
-    public CompositeNode readConfigurationData(InstanceIdentifier path) {
-        RpcResult<CompositeNode> result = null;
-        try {
-            result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
-                    wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
+    /**
+     * Set the schema context inside transformer to null as is in initial state
+     */
+    private void resetMessageTransformer() {
+        updateMessageTransformer(null);
+    }
 
-        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return data == null ? null : (CompositeNode) findNode(data, path);
+    /**
+     * Update initial message transformer to use retrieved schema
+     */
+    private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
+        messageTransformer.onGlobalContextUpdated(currentSchemaContext);
     }
 
-    @Override
-    public CompositeNode readOperationalData(InstanceIdentifier path) {
-        RpcResult<CompositeNode> result = null;
-        try {
-            result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
+    private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
+        final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc);
+        for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
+            sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
+                    PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
         }
-
-        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return (CompositeNode) findNode(data, path);
     }
 
-    @Override
-    public Set<QName> getSupportedRpcs() {
-        return Collections.emptySet();
+    private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator<NetconfMessage> listener) {
+       return new NetconfDeviceRpc(listener, messageTransformer);
     }
 
     @Override
-    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
+    public void onRemoteSessionDown() {
+        salFacade.onDeviceDisconnected();
+        for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
+            sourceRegistration.close();
+        }
+        resetMessageTransformer();
     }
 
     @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
+    public void onRemoteSessionFailed(Throwable throwable) {
+        salFacade.onDeviceFailed(throwable);
     }
 
     @Override
-    public void onSessionInitiated(ProviderSession session) {
-        dataBroker = session.getService(DataBrokerService.class);
-
-        DataModificationTransaction transaction = dataBroker.beginTransaction();
-        if (operationalNodeNotExisting(transaction)) {
-            transaction.putOperationalData(path, getNodeWithId());
-        }
-        if (configurationNodeNotExisting(transaction)) {
-            transaction.putConfigurationData(path, getNodeWithId());
+    public void onNotification(final NetconfMessage notification) {
+        notificationHandler.handleNotification(notification);
+    }
+
+    /**
+     * Just a transfer object containing schema related dependencies. Injected in constructor.
+     */
+    public static class SchemaResourcesDTO {
+        private final SchemaSourceRegistry schemaRegistry;
+        private final SchemaContextFactory schemaContextFactory;
+        private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+
+        public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+            this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry);
+            this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory);
+            this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver);
         }
 
-        try {
-            transaction.commit().get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
+        public SchemaSourceRegistry getSchemaRegistry() {
+            return schemaRegistry;
         }
 
-        mountService = session.getService(MountProvisionService.class);
-        if (mountService != null) {
-            mountInstance = mountService.createOrGetMountPoint(path);
+        public SchemaContextFactory getSchemaContextFactory() {
+            return schemaContextFactory;
         }
-    }
-
-    CompositeNode getNodeWithId() {
-        SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
-        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
-    }
-
-    boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
-        return null == transaction.readConfigurationData(path);
-    }
-
-    boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
-        return null == transaction.readOperationalData(path);
-    }
-
-    static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
 
-        Node<?> current = node;
-        for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
-            if (current instanceof SimpleNode<?>) {
-                return null;
-            } else if (current instanceof CompositeNode) {
-                CompositeNode currentComposite = (CompositeNode) current;
-
-                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
-                if (current == null) {
-                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    return null;
-                }
-            }
+        public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() {
+            return stateSchemasResolver;
         }
-        return current;
     }
 
-    @Override
-    public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
-            DataModification<InstanceIdentifier, CompositeNode> modification) {
-        NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
-                modification, true, rollbackSupported);
-        try {
-            twoPhaseCommit.prepare();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
+    /**
+     * Schema building callable.
+     */
+    private static class DeviceSourcesResolver implements Callable<DeviceSources> {
+        private final NetconfDeviceRpc deviceRpc;
+        private final NetconfSessionPreferences remoteSessionCapabilities;
+        private final RemoteDeviceId id;
+        private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
+
+        public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+            this.deviceRpc = deviceRpc;
+            this.remoteSessionCapabilities = remoteSessionCapabilities;
+            this.id = id;
+            this.stateSchemasResolver = stateSchemasResolver;
         }
-         return twoPhaseCommit;
-    }
 
-    Set<QName> getCapabilities(Collection<String> capabilities) {
-        return FluentIterable.from(capabilities).filter(new Predicate<String>() {
-            @Override
-            public boolean apply(final String capability) {
-                return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
-            }
-        }).transform(new Function<String, QName>() {
-            @Override
-            public QName apply(final String capability) {
-                String[] parts = capability.split("\\?");
-                String namespace = parts[0];
-                FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
+        @Override
+        public DeviceSources call() throws Exception {
 
-                String revision = getStringAndTransform(queryParams, "revision=", "revision=");
+            final Set<SourceIdentifier> requiredSources = Sets.newHashSet(Collections2.transform(
+                    remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION));
 
-                String moduleName = getStringAndTransform(queryParams, "module=", "module=");
+            // If monitoring is not supported, we will still attempt to create schema, sources might be already provided
+            final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+            logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
 
-                if (revision == null) {
-                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
-                    revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
+            final Set<SourceIdentifier> providedSources = Sets.newHashSet(Collections2.transform(
+                    availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION));
 
-                    if (revision != null) {
-                        logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
-                    }
-                }
-                if (revision == null) {
-                    return QName.create(URI.create(namespace), null, moduleName);
-                }
-                return QName.create(namespace, revision, moduleName);
-            }
+            final Set<SourceIdentifier> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
 
-            private String getStringAndTransform(final Iterable<String> queryParams, final String match,
-                    final String substringToRemove) {
-                Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
-                    @Override
-                    public boolean apply(final String input) {
-                        return input.startsWith(match);
-                    }
-                });
-
-                return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+            if (!requiredSourcesNotProvided.isEmpty()) {
+                logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
+                        id, requiredSourcesNotProvided);
+                logger.warn("{}: Attempting to build schema context from required sources", id);
             }
 
-        }).toSet();
-    }
 
-    @Override
-    public void close() {
-        bringDown();
-    }
+            // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello
+            // That is overriding capabilities in configuration using attribute yang-module-capabilities
+            // This is more user friendly even though it clashes with attribute yang-module-capabilities
+            // Some devices do not report all required models in hello message, but provide them
+            final Set<SourceIdentifier> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
+            if (!providedSourcesNotRequired.isEmpty()) {
+                logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
+                        id, providedSourcesNotRequired);
+                logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
+                requiredSources.addAll(providedSourcesNotRequired);
+            }
 
-    public String getName() {
-        return name;
+            return new DeviceSources(requiredSources, providedSources);
+        }
     }
 
-    public InetSocketAddress getSocketAddress() {
-        return socketAddress;
-    }
+    /**
+     * Contains RequiredSources - sources from capabilities.
+     *
+     */
+    private static final class DeviceSources {
+        private final Collection<SourceIdentifier> requiredSources;
+        private final Collection<SourceIdentifier> providedSources;
 
-    public MountProvisionInstance getMountInstance() {
-        return mountInstance;
-    }
+        public DeviceSources(final Collection<SourceIdentifier> requiredSources, final Collection<SourceIdentifier> providedSources) {
+            this.requiredSources = requiredSources;
+            this.providedSources = providedSources;
+        }
 
-    public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
-        this.reconnectStrategy = reconnectStrategy;
-    }
+        public Collection<SourceIdentifier> getRequiredSources() {
+            return requiredSources;
+        }
 
-    public void setProcessingExecutor(final ExecutorService processingExecutor) {
-        this.processingExecutor = processingExecutor;
-    }
+        public Collection<SourceIdentifier> getProvidedSources() {
+            return providedSources;
+        }
 
-    public void setSocketAddress(final InetSocketAddress socketAddress) {
-        this.socketAddress = socketAddress;
     }
 
-    public void setEventExecutor(final EventExecutor eventExecutor) {
-        this.eventExecutor = eventExecutor;
-    }
+    /**
+     * Schema builder that tries to build schema context from provided sources or biggest subset of it.
+     */
+    private final class RecursiveSchemaSetup implements Runnable {
+        private final DeviceSources deviceSources;
+        private final NetconfSessionPreferences remoteSessionCapabilities;
+        private final NetconfDeviceRpc deviceRpc;
+        private final RemoteDeviceCommunicator<NetconfMessage> listener;
+        private NetconfDeviceCapabilities capabilities;
 
-    public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
-        this.schemaSourceProvider = schemaSourceProvider;
-    }
+        public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+            this.deviceSources = deviceSources;
+            this.remoteSessionCapabilities = remoteSessionCapabilities;
+            this.deviceRpc = deviceRpc;
+            this.listener = listener;
+            this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
+        }
 
-    public void setDispatcher(final NetconfClientDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-}
+        @Override
+        public void run() {
+            setUpSchema(deviceSources.getRequiredSources());
+        }
 
-class NetconfDeviceSchemaContextProvider {
+        /**
+         * Recursively build schema context, in case of success or final failure notify device
+         */
+        // FIXME reimplement without recursion
+        private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
+            logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
+
+            // If no more sources, fail
+            if(requiredSources.isEmpty()) {
+                handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
+                return;
+            }
 
-    NetconfDevice device;
+            final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
 
-    SchemaSourceProvider<InputStream> sourceProvider;
+            final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
 
-    Optional<SchemaContext> currentContext;
+                @Override
+                public void onSuccess(final SchemaContext result) {
+                    logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
+                    Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+                    capabilities.addCapabilities(filteredQNames);
+                    capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
+                    handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+                }
 
-    NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
-        this.device = device;
-        this.sourceProvider = sourceProvider;
-        this.currentContext = Optional.absent();
-    }
+                @Override
+                public void onFailure(final Throwable t) {
+                    // In case source missing, try without it
+                    if (t instanceof MissingSchemaSourceException) {
+                        final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
+                        logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
+                        setUpSchema(stripMissingSource(requiredSources, missingSource));
+
+                    // In case resolution error, try only with resolved sources
+                    } else if (t instanceof SchemaResolutionException) {
+                        // TODO check for infinite loop
+                        final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+                        final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
+                        logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
+                        setUpSchema(resolutionException.getResolvedSources());
+                    // unknown error, fail
+                    } else {
+                        handleSalInitializationFailure(t, listener);
+                    }
+                }
+            };
 
-    void createContextFromCapabilities(Iterable<QName> capabilities) {
-        YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
-        if (!sourceContext.getMissingSources().isEmpty()) {
-            device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
+            Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
         }
-        device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
-        List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
-        if (!sourceContext.getValidSources().isEmpty()) {
-            SchemaContext schemaContext = tryToCreateContext(modelsToParse);
-            currentContext = Optional.fromNullable(schemaContext);
-        } else {
-            currentContext = Optional.absent();
-        }
-        if (currentContext.isPresent()) {
-            device.logger.debug("Schema context successfully created.");
+
+        private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
+            final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
+            final boolean removed = sourceIdentifiers.remove(sIdToRemove);
+            Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
+            return sourceIdentifiers;
         }
-    }
 
-    SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
-        YangParserImpl parser = new YangParserImpl();
-        try {
+        private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
+            Collection<QName> qNames = new HashSet<>();
+            for (SourceIdentifier source : identifiers) {
+                Optional<QName> qname = getQNameFromSourceIdentifier(source);
+                if (qname.isPresent()) {
+                    qNames.add(qname.get());
+                }
+            }
+            if (qNames.isEmpty()) {
+                logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
+            }
+            return qNames;
+        }
 
-            Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
-            return parser.resolveSchemaContext(models);
-        } catch (Exception e) {
-            device.logger.debug("Error occured during parsing YANG schemas", e);
-            return null;
+        private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
+            for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
+                if (qname.getLocalName().equals(identifier.getName())
+                        && qname.getFormattedRevision().equals(identifier.getRevision())) {
+                    return Optional.of(qname);
+                }
+            }
+            throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
         }
     }
 }