Bump odlparent to 5.0.0
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
index 0e240dde64d74b728da856540d8f57c0f71a96bb..929232a8a29951250382148d5912f4983a987e76 100644 (file)
@@ -9,17 +9,16 @@ package org.opendaylight.netconf.sal.connect.netconf.listener;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.concurrent.Future;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.opendaylight.netconf.api.FailedNetconfMessage;
@@ -38,6 +37,7 @@ import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -61,18 +61,20 @@ public class NetconfDeviceCommunicator
     private final Queue<Request> requests = new ArrayDeque<>();
     private NetconfClientSession currentSession;
 
-    private Future<?> initFuture;
     private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
+    private Future<?> initFuture;
 
     // isSessionClosing indicates a close operation on the session is issued and
     // tearDown will surely be called later to finish the close.
     // Used to allow only one thread to enter tearDown and other threads should
     // NOT enter it simultaneously and should end its close operation without
     // calling tearDown to release the locks they hold to avoid deadlock.
-    private final AtomicBoolean isSessionClosing = new AtomicBoolean(false);
+    private static final AtomicIntegerFieldUpdater<NetconfDeviceCommunicator> CLOSING_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(NetconfDeviceCommunicator.class, "closing");
+    private volatile int closing;
 
-    public Boolean isSessionClosing() {
-        return isSessionClosing.get();
+    public boolean isSessionClosing() {
+        return closing != 0;
     }
 
     public NetconfDeviceCommunicator(
@@ -86,7 +88,7 @@ public class NetconfDeviceCommunicator
             final RemoteDeviceId id,
             final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
             final int rpcMessageLimit) {
-        this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
+        this(id, remoteDevice, Optional.absent(), rpcMessageLimit);
     }
 
     private NetconfDeviceCommunicator(
@@ -167,7 +169,7 @@ public class NetconfDeviceCommunicator
 
     public void disconnect() {
         // If session is already in closing, no need to close it again
-        if (currentSession != null && isSessionClosing.compareAndSet(false, true)) {
+        if (currentSession != null && startClosing() && currentSession.isUp()) {
             currentSession.close();
         }
     }
@@ -177,7 +179,7 @@ public class NetconfDeviceCommunicator
             LOG.warn("It's curious that no one to close the session but tearDown is called!");
         }
         LOG.debug("Tearing down {}", reason);
-        final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
+        final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = new ArrayList<>();
         sessionLock.lock();
         try {
             if (currentSession != null) {
@@ -214,7 +216,7 @@ public class NetconfDeviceCommunicator
             }
         }
 
-        isSessionClosing.set(false);
+        closing = 0;
     }
 
     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
@@ -231,7 +233,7 @@ public class NetconfDeviceCommunicator
     @Override
     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
         // If session is already in closing, no need to call tearDown again.
-        if (isSessionClosing.compareAndSet(false, true)) {
+        if (startClosing()) {
             LOG.warn("{}: Session went down", id, exception);
             tearDown(null);
         }
@@ -345,9 +347,9 @@ public class NetconfDeviceCommunicator
         sessionLock.lock();
         try {
             if (semaphore != null && !semaphore.tryAcquire()) {
-                LOG.warn("Limit of concurrent rpc messages was reached (limit :" + concurentRpcMsgs
-                    + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
-                return Futures.immediateFailedFuture(new NetconfDocumentedException(
+                LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
+                    + "Discarding request of Netconf device with id {}", concurentRpcMsgs, id.getName());
+                return FluentFutures.immediateFailedFluentFuture(new NetconfDocumentedException(
                         "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
                         + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
             }
@@ -367,7 +369,7 @@ public class NetconfDeviceCommunicator
         if (currentSession == null) {
             LOG.warn("{}: Session is disconnected, failing RPC request {}",
                     id, message);
-            return Futures.immediateFuture(createSessionDownRpcResult());
+            return FluentFutures.immediateFluentFuture(createSessionDownRpcResult());
         }
 
         final Request req = new Request(new UncancellableFuture<>(true), message);
@@ -422,4 +424,8 @@ public class NetconfDeviceCommunicator
             this.request = request;
         }
     }
+
+    private boolean startClosing() {
+        return CLOSING_UPDATER.compareAndSet(this, 0, 1);
+    }
 }