Remove redundant code constructs
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
index 83ee068eb65e77c9e76ba6f434946e89408823d5..7968fcda978255007baaf65ce23cecea3826f541 100644 (file)
@@ -22,12 +22,13 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.opendaylight.controller.config.util.xml.XmlElement;
-import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.netconf.api.FailedNetconfMessage;
 import org.opendaylight.netconf.api.NetconfDocumentedException;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.xml.XmlElement;
 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.netconf.api.xml.XmlUtil;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSession;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
@@ -58,7 +59,7 @@ public class NetconfDeviceCommunicator
     private final int concurentRpcMsgs;
 
     private final Queue<Request> requests = new ArrayDeque<>();
-    private NetconfClientSession session;
+    private NetconfClientSession currentSession;
 
     private Future<?> initFuture;
     private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
@@ -85,7 +86,7 @@ public class NetconfDeviceCommunicator
             final RemoteDeviceId id,
             final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
             final int rpcMessageLimit) {
-        this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
+        this(id, remoteDevice, Optional.absent(), rpcMessageLimit);
     }
 
     private NetconfDeviceCommunicator(
@@ -105,7 +106,7 @@ public class NetconfDeviceCommunicator
         sessionLock.lock();
         try {
             LOG.debug("{}: Session established", id);
-            this.session = session;
+            currentSession = session;
 
             NetconfSessionPreferences netconfSessionPreferences =
                                              NetconfSessionPreferences.fromNetconfSession(session);
@@ -166,8 +167,8 @@ public class NetconfDeviceCommunicator
 
     public void disconnect() {
         // If session is already in closing, no need to close it again
-        if (session != null && isSessionClosing.compareAndSet(false, true)) {
-            session.close();
+        if (currentSession != null && isSessionClosing.compareAndSet(false, true)) {
+            currentSession.close();
         }
     }
 
@@ -179,8 +180,8 @@ public class NetconfDeviceCommunicator
         final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
         sessionLock.lock();
         try {
-            if (session != null) {
-                session = null;
+            if (currentSession != null) {
+                currentSession = null;
                 /*
                  * Walk all requests, check if they have been executing
                  * or cancelled and remove them from the queue.
@@ -274,7 +275,7 @@ public class NetconfDeviceCommunicator
         try {
             request = requests.peek();
             if (request != null && request.future.isUncancellable()) {
-                requests.poll();
+                request = requests.poll();
                 // we have just removed one request from the queue
                 // we can also release one permit
                 if (semaphore != null) {
@@ -291,6 +292,11 @@ public class NetconfDeviceCommunicator
 
         if (request != null) {
 
+            if (FailedNetconfMessage.class.isInstance(message)) {
+                request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message));
+                return;
+            }
+
             LOG.debug("{}: Message received {}", id, message);
 
             if (LOG.isTraceEnabled()) {
@@ -337,18 +343,15 @@ public class NetconfDeviceCommunicator
     @Override
     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
         sessionLock.lock();
-
-        if (semaphore != null && !semaphore.tryAcquire()) {
-            LOG.warn("Limit of concurrent rpc messages was reached (limit :"
-                    + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id"
-                    + id.getName());
-            sessionLock.unlock();
-            return Futures.immediateFailedFuture(new NetconfDocumentedException(
-                    "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
-                            + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
-        }
-
         try {
+            if (semaphore != null && !semaphore.tryAcquire()) {
+                LOG.warn("Limit of concurrent rpc messages was reached (limit :" + concurentRpcMsgs
+                    + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
+                return Futures.immediateFailedFuture(new NetconfDocumentedException(
+                        "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
+                        + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
+            }
+
             return sendRequestWithLock(message, rpc);
         } finally {
             sessionLock.unlock();
@@ -361,7 +364,7 @@ public class NetconfDeviceCommunicator
             LOG.trace("{}: Sending message {}", id, msgToS(message));
         }
 
-        if (session == null) {
+        if (currentSession == null) {
             LOG.warn("{}: Session is disconnected, failing RPC request {}",
                     id, message);
             return Futures.immediateFuture(createSessionDownRpcResult());
@@ -370,7 +373,7 @@ public class NetconfDeviceCommunicator
         final Request req = new Request(new UncancellableFuture<>(true), message);
         requests.add(req);
 
-        session.sendMessage(req.request).addListener(future -> {
+        currentSession.sendMessage(req.request).addListener(future -> {
             if (!future.isSuccess()) {
                 // We expect that a session down will occur at this point
                 LOG.debug("{}: Failed to send request {}", id,
@@ -401,6 +404,10 @@ public class NetconfDeviceCommunicator
     }
 
     private static boolean isNotification(final NetconfMessage message) {
+        if (message.getDocument() == null) {
+            // We have no message, which mean we have a FailedNetconfMessage
+            return false;
+        }
         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
     }