BUG-372: Rework sal-netconf-connector 58/5258/16
authorRobert Varga <rovarga@cisco.com>
Tue, 11 Feb 2014 07:44:46 +0000 (08:44 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 14 Apr 2014 07:47:52 +0000 (07:47 +0000)
This makes the connector truly asynchronous and able to work with
backing datastore going away. It will retry connecting infintely to the
backed device, too.

Change-Id: I843620bf63eeade231698c22592aaec0521a09ad
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java [deleted file]
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfTerminationReason.java

index 3eb0472..12e8b55 100644 (file)
@@ -13,19 +13,18 @@ import io.netty.util.concurrent.EventExecutor
 import java.io.InputStream
 import java.net.InetSocketAddress
 import java.net.URI
+import java.util.ArrayList
+import java.util.Collection
 import java.util.Collections
 import java.util.List
 import java.util.Set
 import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
 import org.opendaylight.controller.md.sal.common.api.data.DataModification
 import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.controller.netconf.client.NetconfClient
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.controller.netconf.util.xml.XmlUtil
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
 import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
@@ -45,7 +44,6 @@ import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
 import org.opendaylight.yangtools.yang.model.api.SchemaContext
 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
 import org.slf4j.Logger
@@ -55,16 +53,13 @@ import static com.google.common.base.Preconditions.*
 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
 
 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
-import com.google.common.util.concurrent.Futures
 
-class NetconfDevice implements Provider, // 
+class NetconfDevice implements Provider, //
 DataReader<InstanceIdentifier, CompositeNode>, //
 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
 RpcImplementation, //
 AutoCloseable {
 
-    var NetconfClient client;
-
     @Property
     var InetSocketAddress socketAddress;
 
@@ -94,15 +89,12 @@ AutoCloseable {
     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
+    List<RpcRegistration> rpcReg
 
+    @Property
     val String name
-    MountProvisionService mountService
-
-    int messegeRetryCount = 5;
-
-    int messageTimeoutCount = 5 * 1000;
 
-    Set<QName> cachedCapabilities
+    MountProvisionService mountService
 
     @Property
     var NetconfClientDispatcher dispatcher
@@ -111,11 +103,13 @@ AutoCloseable {
 
     @Property
     var SchemaSourceProvider<InputStream> remoteSourceProvider
-    
+
     DataBrokerService dataBroker
 
+    var NetconfDeviceListener listener;
+
     public new(String name) {
-        this.name = name;
+        this._name = name;
         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
@@ -126,10 +120,11 @@ AutoCloseable {
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        val listener = new NetconfDeviceListener(this);
-        val task = startClientTask(dispatcher, listener)
-        return processingExecutor.submit(task) as Future<Void>;
+        listener = new NetconfDeviceListener(this);
 
+        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
+
+        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
     }
 
     def Optional<SchemaContext> getSchemaContext() {
@@ -139,59 +134,65 @@ AutoCloseable {
         return deviceContextProvider.currentContext;
     }
 
-    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
-        return [ |
-            try {
-                logger.info("Starting Netconf Client on: {}", socketAddress);
-                client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
-                logger.debug("Initial capabilities {}", initialCapabilities);
-                var SchemaSourceProvider<String> delegate;
-                if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
-                    delegate = new NetconfRemoteSchemaSourceProvider(this);
-                }  else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
-                    delegate = new NetconfRemoteSchemaSourceProvider(this);
-                } else {
-                    logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
-                    delegate = SchemaSourceProviders.<String>noopProvider();
-                }
-                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-                deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
-                deviceContextProvider.createContextFromCapabilities(initialCapabilities);
-                if (mountInstance != null && schemaContext.isPresent) {
-                    mountInstance.schemaContext = schemaContext.get();
-                    val operations = schemaContext.get().operations;
-                    for (rpc : operations) {
-                        mountInstance.addRpcImplementation(rpc.QName, this);
-                    }
-                }
-                updateDeviceState()
-                if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
-                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
-                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
-                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
-                }
-            } catch (Exception e) {
-                logger.error("Netconf client NOT started. ", e)
+    def bringDown() {
+        if (rpcReg != null) {
+            for (reg : rpcReg) {
+                reg.close()
+            }
+            rpcReg = null
+        }
+        confReaderReg?.close()
+        confReaderReg = null
+        operReaderReg?.close()
+        operReaderReg = null
+        commitHandlerReg?.close()
+        commitHandlerReg = null
+
+        updateDeviceState(false, Collections.emptySet())
+    }
+
+    def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
+        remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+        deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+        deviceContextProvider.createContextFromCapabilities(capabilities);
+        if (mountInstance != null && schemaContext.isPresent) {
+            mountInstance.schemaContext = schemaContext.get();
+        }
+
+        updateDeviceState(true, capabilities)
+
+        if (mountInstance != null) {
+            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+
+            val rpcs = new ArrayList<RpcRegistration>();
+            for (rpc : mountInstance.schemaContext.operations) {
+                rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
             }
-        ]
+            rpcReg = rpcs
+        }
     }
 
-    private def updateDeviceState() {
+    private def updateDeviceState(boolean up, Set<QName> capabilities) {
         val transaction = dataBroker.beginTransaction
 
         val it = ImmutableCompositeNode.builder
         setQName(INVENTORY_NODE)
         addLeaf(INVENTORY_ID, name)
-        addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
+        addLeaf(INVENTORY_CONNECTED, up)
 
-        logger.debug("Client capabilities {}", client.capabilities)
-        for (capability : client.capabilities) {
+        logger.debug("Client capabilities {}", capabilities)
+        for (capability : capabilities) {
             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
         }
 
         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
+        transaction.removeOperationalData(path)
         transaction.putOperationalData(path, it.toInstance)
         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
+
+        // FIXME: this has to be asynchronous
         val transactionStatus = transaction.commit.get;
 
         if (transactionStatus.successful) {
@@ -219,29 +220,8 @@ AutoCloseable {
         Collections.emptySet;
     }
 
-//    def createSubscription(String streamName) {
-//        val it = ImmutableCompositeNode.builder()
-//        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-//        addLeaf("stream", streamName);
-//        invokeRpc(QName, toInstance())
-//    }
-
     override invokeRpc(QName rpc, CompositeNode input) {
-        try {
-            val message = rpc.toRpcMessage(input,schemaContext);
-            val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
-            return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
-        } catch (Exception e) {
-            logger.error("Rpc was not processed correctly.", e)
-            throw e;
-        }
-    }
-
-    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
-        logger.debug("Send message {}",XmlUtil.toString(message.document))
-        val result = client.sendMessage(message, retryCount, timeout);
-        NetconfMapping.checkValidReply(message, result)
-        return result;
+        return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
     }
 
     override getProviderFunctionality() {
@@ -284,7 +264,7 @@ AutoCloseable {
                 return null;
             } else if (current instanceof CompositeNode) {
                 val currentComposite = (current as CompositeNode);
-                
+
                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
                 if(current == null) {
                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
@@ -303,18 +283,13 @@ AutoCloseable {
     }
 
     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
-        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
         twoPhaseCommit.prepare()
         return twoPhaseCommit;
     }
 
-    def getInitialCapabilities() {
-        val capabilities = client?.capabilities;
-        if (capabilities == null) {
-            return null;
-        }
-        if (cachedCapabilities == null) {
-            cachedCapabilities = FluentIterable.from(capabilities).filter[
+    def getCapabilities(Collection<String> capabilities) {
+        return FluentIterable.from(capabilities).filter[
                 contains("?") && contains("module=") && contains("revision=")].transform [
                 val parts = split("\\?");
                 val namespace = parts.get(0);
@@ -333,14 +308,10 @@ AutoCloseable {
                 }
                 return QName.create(namespace, revision, moduleName);
             ].toSet();
-        }
-        return cachedCapabilities;
     }
 
     override close() {
-        confReaderReg?.close()
-        operReaderReg?.close()
-        client?.close()
+        bringDown()
     }
 }
 
index 13cd5db..d5e1d35 100644 (file)
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class NetconfDeviceListener implements NetconfClientSessionListener {
+    private static final class Request {
+        final UncancellableFuture<RpcResult<CompositeNode>> future;
+        final NetconfMessage request;
+
+        private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+            this.future = future;
+            this.request = request;
+        }
+    }
 
-class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener {
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
+    private final Queue<Request> requests = new ArrayDeque<>();
     private final NetconfDevice device;
+    private NetconfClientSession session;
 
     public NetconfDeviceListener(final NetconfDevice device) {
         this.device = Preconditions.checkNotNull(device);
     }
 
-    /**
-     * Method intended to customize notification processing.
-     * 
-     * @param session
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     * @param message
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     */
     @Override
-    public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
-        this.device.logger.debug("Received NETCONF notification.", message);
-        CompositeNode domNotification = null;
-        if (message != null) {
-            domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext());
-        }
-        if (domNotification != null) {
-            MountProvisionInstance _mountInstance = null;
-            if (this.device != null) {
-                _mountInstance = this.device.getMountInstance();
+    public synchronized void onSessionUp(final NetconfClientSession session) {
+        LOG.debug("Session with {} established as address {} session-id {}",
+                device.getName(), device.getSocketAddress(), session.getSessionId());
+
+        final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
+        LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
+
+        // Select the appropriate provider
+        final SchemaSourceProvider<String> delegate;
+        if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
+            delegate = new NetconfRemoteSchemaSourceProvider(device);
+        } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+            delegate = new NetconfRemoteSchemaSourceProvider(device);
+        } else {
+            LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
+            delegate = SchemaSourceProviders.<String>noopProvider();
+        }
+
+        device.bringUp(delegate, caps);
+
+        this.session = session;
+    }
+
+    private synchronized void tearDown(final Exception e) {
+        session = null;
+
+        /*
+         * Walk all requests, check if they have been executing
+         * or cancelled and remove them from the queue.
+         */
+        final Iterator<Request> it = requests.iterator();
+        while (it.hasNext()) {
+            final Request r = it.next();
+            if (r.future.isUncancellable()) {
+                // FIXME: add a RpcResult instead?
+                r.future.setException(e);
+                it.remove();
+            } else if (r.future.isCancelled()) {
+                // This just does some house-cleaning
+                it.remove();
             }
-            if (_mountInstance != null) {
-                _mountInstance.publish(domNotification);
+        }
+
+        device.bringDown();
+    }
+
+    @Override
+    public void onSessionDown(final NetconfClientSession session, final Exception e) {
+        LOG.debug("Session with {} went down", device.getName(), e);
+        tearDown(e);
+    }
+
+    @Override
+    public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+        LOG.debug("Session with {} terminated {}", session, reason);
+        tearDown(new RuntimeException(reason.getErrorMessage()));
+    }
+
+    @Override
+    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+        /*
+         * Dispatch between notifications and messages. Messages need to be processed
+         * with lock held, notifications do not.
+         */
+        if (isNotification(message)) {
+            processNotification(message);
+        } else {
+            processMessage(message);
+        }
+    }
+
+    private synchronized void processMessage(final NetconfMessage message) {
+        final Request r = requests.peek();
+        if (r.future.isUncancellable()) {
+            requests.poll();
+            LOG.debug("Matched {} to {}", r.request, message);
+
+            // FIXME: this can throw exceptions, which should result
+            // in the future failing
+            NetconfMapping.checkValidReply(r.request, message);
+            r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
+                    Collections.<RpcError>emptyList()));
+        } else {
+            LOG.warn("Ignoring unsolicited message", message);
+        }
+    }
+
+    synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+        if (session == null) {
+            LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+            return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+                @Override
+                public boolean isSuccessful() {
+                    return false;
+                }
+
+                @Override
+                public CompositeNode getResult() {
+                    return null;
+                }
+
+                @Override
+                public Collection<RpcError> getErrors() {
+                    // FIXME: indicate that the session is down
+                    return Collections.emptySet();
+                }
+            });
+        }
+
+        final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+        requests.add(req);
+
+        session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+            @Override
+            public void operationComplete(final Future<Void> future) throws Exception {
+                if (!future.isSuccess()) {
+                    // We expect that a session down will occur at this point
+                    LOG.debug("Failed to send request {}", req.request, future.cause());
+                    req.future.setException(future.cause());
+                } else {
+                    LOG.trace("Finished sending request {}", req.request);
+                }
             }
+        });
+
+        return req.future;
+    }
+
+    /**
+     * Process an incoming notification.
+     *
+     * @param notification Notification message
+     */
+    private void processNotification(final NetconfMessage notification) {
+        this.device.logger.debug("Received NETCONF notification.", notification);
+        CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
+        if (domNotification == null) {
+            return;
+        }
+
+        MountProvisionInstance mountInstance =  this.device.getMountInstance();
+        if (mountInstance != null) {
+            mountInstance.publish(domNotification);
         }
     }
+
+    private static boolean isNotification(final NetconfMessage message) {
+        final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
+    }
 }
index 9ec3aa3..5f14c26 100644 (file)
@@ -42,17 +42,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
-    private final NetconfDevice device;
     private final DataModification<InstanceIdentifier, CompositeNode> modification;
-    private final boolean candidateSupported = true;
+    private final NetconfDevice device;
+    private final boolean candidateSupported;
 
     public NetconfDeviceTwoPhaseCommitTransaction(NetconfDevice device,
-            DataModification<InstanceIdentifier, CompositeNode> modification) {
-        super();
-        this.device = device;
-        this.modification = modification;
+            DataModification<InstanceIdentifier, CompositeNode> modification,
+            boolean candidateSupported) {
+        this.device = Preconditions.checkNotNull(device);
+        this.modification = Preconditions.checkNotNull(modification);
+        this.candidateSupported = candidateSupported;
     }
 
     void prepare() throws InterruptedException, ExecutionException {
@@ -62,7 +63,6 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac
         for(Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
             sendMerge(toUpdate.getKey(),toUpdate.getValue());
         }
-
     }
 
     private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException {
@@ -80,7 +80,6 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac
 
         RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get();
         Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
-
     }
 
     private CompositeNodeBuilder<ImmutableCompositeNode> configurationRpcBuilder() {
index e5a24fc..228a01e 100644 (file)
@@ -39,8 +39,8 @@ class NetconfMapping {
     public static val NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0")
     public static val NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
     public static val NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0")
-    
-    
+
+
     public static val NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
     public static val NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
     public static val NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
@@ -51,15 +51,15 @@ class NetconfMapping {
     public static val NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
     public static val NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
     public static val NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
-    
+
     public static val NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
     public static val NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
     public static val NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
-    
+
     public static val NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
     public static val NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
-    
-    
+
+
     public static val NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
     public static val NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
     public static val NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
@@ -78,7 +78,7 @@ class NetconfMapping {
         if(identifier.path.empty) {
             return null;
         }
-        
+
         for (component : identifier.path.reverseView) {
             val Node<?> current = component.toNode(previous);
             previous = current;
@@ -106,11 +106,11 @@ class NetconfMapping {
     }
 
     static def CompositeNode toCompositeNode(NetconfMessage message,Optional<SchemaContext> ctx) {
-        //TODO: implement general normalization to normalize incoming Netconf Message 
+        //TODO: implement general normalization to normalize incoming Netconf Message
         // for Schema Context counterpart
         return null
     }
-    
+
     static def CompositeNode toNotificationNode(NetconfMessage message,Optional<SchemaContext> ctx) {
         if (ctx.present) {
             val schemaContext = ctx.get
@@ -127,56 +127,53 @@ class NetconfMapping {
         w3cPayload.documentElement.setAttribute("message-id", "m-" + messageId.andIncrement)
         return new NetconfMessage(w3cPayload);
     }
-    
+
     def static flattenInput(CompositeNode node) {
         val inputQName = QName.create(node.nodeType,"input");
         val input = node.getFirstCompositeByName(inputQName);
         if(input == null) return node;
         if(input instanceof CompositeNode) {
-            
+
             val nodes = ImmutableList.builder() //
                 .addAll(input.children) //
                 .addAll(node.children.filter[nodeType != inputQName]) //
                 .build()
             return ImmutableCompositeNode.create(node.nodeType,nodes);
-        } 
-        
+        }
+
     }
 
     static def RpcResult<CompositeNode> toRpcResult(NetconfMessage message,QName rpc,Optional<SchemaContext> context) {
         var CompositeNode rawRpc;
         if(context.present) {
             if(isDataRetrievalReply(rpc)) {
-                
+
                 val xmlData = message.document.dataSubtree
                 val dataNodes = XmlDocumentUtils.toDomNodes(xmlData, Optional.of(context.get.dataDefinitions))
-                
+
                 val it = ImmutableCompositeNode.builder()
                 setQName(NETCONF_RPC_REPLY_QNAME)
                 add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes));
-                
+
                 rawRpc = it.toInstance;
                 //sys(xmlData)
             } else {
                 val rpcSchema = context.get.operations.findFirst[QName == rpc]
                 rawRpc = message.document.toCompositeNode() as CompositeNode;
             }
-            
-            
-            
         } else {
             rawRpc = message.document.toCompositeNode() as CompositeNode;
         }
         //rawRpc.
         return Rpcs.getRpcResult(true, rawRpc, Collections.emptySet());
     }
-    
+
     def static Element getDataSubtree(Document doc) {
         doc.getElementsByTagNameNS(NETCONF_URI.toString,"data").item(0) as Element
     }
-    
+
     def static boolean isDataRetrievalReply(QName it) {
-        return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName) 
+        return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName)
     }
 
     static def wrap(QName name, Node<?> node) {
@@ -209,12 +206,12 @@ class NetconfMapping {
     public static def Node<?> toCompositeNode(Document document) {
         return XmlDocumentUtils.toDomNode(document) as Node<?>
     }
-    
+
     public static def checkValidReply(NetconfMessage input, NetconfMessage output) {
         val inputMsgId = input.document.documentElement.getAttribute("message-id")
         val outputMsgId = output.document.documentElement.getAttribute("message-id")
         Preconditions.checkState(inputMsgId == outputMsgId,"Rpc request and reply message IDs must be same.");
-        
+
     }
-    
+
 }
index 1932726..c734e80 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
 import org.opendaylight.yangtools.yang.common.QName;
@@ -19,6 +19,7 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
 
@@ -27,11 +28,10 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String>
     public static final QName GET_SCHEMA_QNAME = QName.create(IETF_NETCONF_MONITORING, "get-schema");
     public static final QName GET_DATA_QNAME = QName.create(IETF_NETCONF_MONITORING, "data");
 
-    NetconfDevice device;
+    private final NetconfDevice device;
 
     public NetconfRemoteSchemaSourceProvider(NetconfDevice device) {
-        super();
-        this.device = device;
+        this.device = Preconditions.checkNotNull(device);
     }
 
     @Override
@@ -73,7 +73,7 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String>
         return null;
     }
 
-    public static final boolean isSupportedFor(Set<QName> capabilities) {
+    public static final boolean isSupportedFor(Collection<QName> capabilities) {
         return capabilities.contains(IETF_NETCONF_MONITORING);
     }
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java
new file mode 100644 (file)
index 0000000..c353f86
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+
+final class UncancellableFuture<V> extends AbstractFuture<V> {
+    @GuardedBy("this")
+    private boolean uncancellable = false;
+
+    public UncancellableFuture(boolean uncancellable) {
+        this.uncancellable = uncancellable;
+    }
+
+    public synchronized boolean setUncancellable() {
+        if (isCancelled()) {
+            return false;
+        }
+
+        uncancellable = true;
+        return true;
+    }
+
+    public synchronized boolean isUncancellable() {
+        return uncancellable;
+    }
+
+    @Override
+    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        if (uncancellable) {
+            return false;
+        }
+
+        return super.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public synchronized boolean set(@Nullable V value) {
+        Preconditions.checkState(uncancellable == true);
+        return super.set(value);
+    }
+
+    @Override
+    protected boolean setException(Throwable throwable) {
+        Preconditions.checkState(uncancellable == true);
+        return super.setException(throwable);
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java
deleted file mode 100644 (file)
index 23892e1..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.opendaylight.yangtools.concepts.Delegator;
-import org.opendaylight.yangtools.yang.common.QName;
-
-import com.google.common.base.Charsets;
-
-/**
- *
- *
- */
-public class YangModelInputStreamAdapter extends InputStream implements Delegator<InputStream> {
-
-    final String source;
-    final QName moduleIdentifier;
-    final InputStream delegate;
-
-    private YangModelInputStreamAdapter(String source, QName moduleIdentifier, InputStream delegate) {
-        super();
-        this.source = source;
-        this.moduleIdentifier = moduleIdentifier;
-        this.delegate = delegate;
-    }
-
-    @Override
-    public int read() throws IOException {
-        return delegate.read();
-    }
-
-    @Override
-    public int hashCode() {
-        return delegate.hashCode();
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        return delegate.read(b);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return delegate.equals(obj);
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        return delegate.read(b, off, len);
-    }
-
-    @Override
-    public long skip(long n) throws IOException {
-        return delegate.skip(n);
-    }
-
-    @Override
-    public int available() throws IOException {
-        return delegate.available();
-    }
-
-    @Override
-    public void close() throws IOException {
-        delegate.close();
-    }
-
-    @Override
-    public void mark(int readlimit) {
-        delegate.mark(readlimit);
-    }
-
-    @Override
-    public void reset() throws IOException {
-        delegate.reset();
-    }
-
-    @Override
-    public boolean markSupported() {
-        return delegate.markSupported();
-    }
-
-    @Override
-    public InputStream getDelegate() {
-        return delegate;
-    }
-
-    @Override
-    public String toString() {
-        return "YangModelInputStreamAdapter [moduleIdentifier=" + moduleIdentifier + ", delegate=" + delegate + "]";
-    }
-
-    public static YangModelInputStreamAdapter create(QName name, String module) {
-        return new YangModelInputStreamAdapter(null, name, new ByteArrayInputStream(module.getBytes(Charsets.UTF_8)));
-    }
-}
index 9de3071..a15f9e0 100644 (file)
@@ -22,4 +22,9 @@ public class NetconfTerminationReason implements TerminationReason {
     public String getErrorMessage() {
         return reason;
     }
+
+    @Override
+    public String toString() {
+        return reason;
+    }
 }