import com.google.common.base.Optional;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.concurrent.Future;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.opendaylight.netconf.api.FailedNetconfMessage;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
private final Queue<Request> requests = new ArrayDeque<>();
private NetconfClientSession currentSession;
- private Future<?> initFuture;
private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
+ private Future<?> initFuture;
// isSessionClosing indicates a close operation on the session is issued and
// tearDown will surely be called later to finish the close.
// Used to allow only one thread to enter tearDown and other threads should
// NOT enter it simultaneously and should end its close operation without
// calling tearDown to release the locks they hold to avoid deadlock.
- private final AtomicBoolean isSessionClosing = new AtomicBoolean(false);
+ private static final AtomicIntegerFieldUpdater<NetconfDeviceCommunicator> CLOSING_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(NetconfDeviceCommunicator.class, "closing");
+ private volatile int closing;
- public Boolean isSessionClosing() {
- return isSessionClosing.get();
+ public boolean isSessionClosing() {
+ return closing != 0;
}
public NetconfDeviceCommunicator(
final RemoteDeviceId id,
final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
final int rpcMessageLimit) {
- this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
+ this(id, remoteDevice, Optional.absent(), rpcMessageLimit);
}
private NetconfDeviceCommunicator(
public void disconnect() {
// If session is already in closing, no need to close it again
- if (currentSession != null && isSessionClosing.compareAndSet(false, true)) {
+ if (currentSession != null && startClosing() && currentSession.isUp()) {
currentSession.close();
}
}
LOG.warn("It's curious that no one to close the session but tearDown is called!");
}
LOG.debug("Tearing down {}", reason);
- final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
+ final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = new ArrayList<>();
sessionLock.lock();
try {
if (currentSession != null) {
}
}
- isSessionClosing.set(false);
+ closing = 0;
}
private RpcResult<NetconfMessage> createSessionDownRpcResult() {
@Override
public void onSessionDown(final NetconfClientSession session, final Exception exception) {
// If session is already in closing, no need to call tearDown again.
- if (isSessionClosing.compareAndSet(false, true)) {
+ if (startClosing()) {
LOG.warn("{}: Session went down", id, exception);
tearDown(null);
}
sessionLock.lock();
try {
if (semaphore != null && !semaphore.tryAcquire()) {
- LOG.warn("Limit of concurrent rpc messages was reached (limit :" + concurentRpcMsgs
- + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
- return Futures.immediateFailedFuture(new NetconfDocumentedException(
+ LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
+ + "Discarding request of Netconf device with id {}", concurentRpcMsgs, id.getName());
+ return FluentFutures.immediateFailedFluentFuture(new NetconfDocumentedException(
"Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
+ ") waiting for emptying the queue of Netconf device with id" + id.getName()));
}
if (currentSession == null) {
LOG.warn("{}: Session is disconnected, failing RPC request {}",
id, message);
- return Futures.immediateFuture(createSessionDownRpcResult());
+ return FluentFutures.immediateFluentFuture(createSessionDownRpcResult());
}
final Request req = new Request(new UncancellableFuture<>(true), message);
this.request = request;
}
}
+
+ private boolean startClosing() {
+ return CLOSING_UPDATER.compareAndSet(this, 0, 1);
+ }
}