import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.api.FailedNetconfMessage;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfDeviceCommunicator
- implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
-
+public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
- protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
- private final Optional<UserPreferences> overrideNetconfCapabilities;
+ protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
+ private final @Nullable UserPreferences overrideNetconfCapabilities;
protected final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
private final Queue<Request> requests = new ArrayDeque<>();
private NetconfClientSession currentSession;
- private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
+ private final SettableFuture<Empty> firstConnectionFuture = SettableFuture.create();
private Future<?> taskFuture;
// isSessionClosing indicates a close operation on the session is issued and
return closing != 0;
}
- public NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) {
- this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit);
- }
-
- public NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final int rpcMessageLimit) {
- this(id, remoteDevice, Optional.empty(), rpcMessageLimit);
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
+ this(id, remoteDevice, rpcMessageLimit, null);
}
- private NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
+ final @Nullable UserPreferences overrideNetconfCapabilities) {
concurentRpcMsgs = rpcMessageLimit;
this.id = id;
this.remoteDevice = remoteDevice;
this.overrideNetconfCapabilities = overrideNetconfCapabilities;
- firstConnectionFuture = SettableFuture.create();
semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
}
LOG.debug("{}: Session established", id);
currentSession = session;
- NetconfSessionPreferences netconfSessionPreferences =
- NetconfSessionPreferences.fromNetconfSession(session);
- LOG.trace("{}: Session advertised capabilities: {}", id,
- netconfSessionPreferences);
+ var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
+ LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
- if (overrideNetconfCapabilities.isPresent()) {
- final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities
- .get().getSessionPreferences();
- netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided()
+ final var localOverride = overrideNetconfCapabilities;
+ if (localOverride != null) {
+ final var sessionPreferences = localOverride.sessionPreferences();
+ netconfSessionPreferences = localOverride.overrideModuleCapabilities()
? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
: netconfSessionPreferences.addModuleCaps(sessionPreferences);
- netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided()
+ netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
: netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
}
remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
- if (!firstConnectionFuture.isDone()) {
- firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
- }
} finally {
sessionLock.unlock();
}
+
+ // FIXME: right, except ... this does not include the device schema setup, so is it really useful?
+ if (!firstConnectionFuture.set(Empty.value())) {
+ LOG.trace("{}: First connection already completed", id);
+ }
}
/**
* @return a ListenableFuture that returns success on first successful connection and failure when the underlying
* reconnecting strategy runs out of reconnection attempts
*/
- public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
- final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+ public ListenableFuture<Empty> initializeRemoteConnection(final NetconfClientDispatcher dispatcher,
+ final NetconfClientConfiguration config) {
final Future<?> connectFuture;
if (config instanceof NetconfReconnectingClientConfiguration) {
private RpcResult<NetconfMessage> createSessionDownRpcResult() {
return createErrorRpcResult(ErrorType.TRANSPORT,
- String.format("The netconf session to %1$s is disconnected", id.getName()));
+ String.format("The netconf session to %1$s is disconnected", id.name()));
}
private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
try {
if (semaphore != null && !semaphore.tryAcquire()) {
LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
- + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.getName());
+ + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
return FluentFutures.immediateFailedFluentFuture(new NetconfDocumentedException(
"Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
- + ") waiting for emptying the queue of Netconf device with id: " + id.getName()));
+ + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
}
return sendRequestWithLock(message, rpc);