*/
package org.opendaylight.netconf.sal.connect.netconf.listener;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.api.FailedNetconfMessage;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfDeviceCommunicator
- implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
-
+public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
- protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
- private final Optional<UserPreferences> overrideNetconfCapabilities;
+ protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
+ private final @Nullable UserPreferences overrideNetconfCapabilities;
protected final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
private final Queue<Request> requests = new ArrayDeque<>();
private NetconfClientSession currentSession;
- private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
+ private final SettableFuture<Empty> firstConnectionFuture = SettableFuture.create();
private Future<?> taskFuture;
// isSessionClosing indicates a close operation on the session is issued and
return closing != 0;
}
- public NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfDeviceCommunicator> remoteDevice,
final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) {
- this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit);
+ this(id, remoteDevice, rpcMessageLimit, requireNonNull(netconfSessionPreferences));
}
- public NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final int rpcMessageLimit) {
- this(id, remoteDevice, Optional.empty(), rpcMessageLimit);
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
+ this(id, remoteDevice, rpcMessageLimit, null);
}
- private NetconfDeviceCommunicator(
- final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
+ final @Nullable UserPreferences overrideNetconfCapabilities) {
concurentRpcMsgs = rpcMessageLimit;
this.id = id;
this.remoteDevice = remoteDevice;
this.overrideNetconfCapabilities = overrideNetconfCapabilities;
- firstConnectionFuture = SettableFuture.create();
semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
}
LOG.trace("{}: Session advertised capabilities: {}", id,
netconfSessionPreferences);
- if (overrideNetconfCapabilities.isPresent()) {
- final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities
- .get().getSessionPreferences();
- netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided()
+ final var localOverride = overrideNetconfCapabilities;
+ if (localOverride != null) {
+ final var sessionPreferences = localOverride.getSessionPreferences();
+ netconfSessionPreferences = localOverride.moduleBasedCapsOverrided()
? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
: netconfSessionPreferences.addModuleCaps(sessionPreferences);
- netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided()
+ netconfSessionPreferences = localOverride.nonModuleBasedCapsOverrided()
? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
: netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
}
remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
- if (!firstConnectionFuture.isDone()) {
- firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
- }
} finally {
sessionLock.unlock();
}
+
+ // FIXME: right, except ... this does not include the device schema setup, so is it really useful?
+ if (!firstConnectionFuture.set(Empty.value())) {
+ LOG.trace("{}: First connection already completed", id);
+ }
}
/**
* @return a ListenableFuture that returns success on first successful connection and failure when the underlying
* reconnecting strategy runs out of reconnection attempts
*/
- public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
- final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+ public ListenableFuture<Empty> initializeRemoteConnection(final NetconfClientDispatcher dispatcher,
+ final NetconfClientConfiguration config) {
final Future<?> connectFuture;
if (config instanceof NetconfReconnectingClientConfiguration) {
if (Strings.isNullOrEmpty(reason)) {
future.set(createSessionDownRpcResult());
} else {
- future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, reason));
+ future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
}
}
}
private RpcResult<NetconfMessage> createSessionDownRpcResult() {
- return createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
+ return createErrorRpcResult(ErrorType.TRANSPORT,
String.format("The netconf session to %1$s is disconnected", id.getName()));
}
- private static RpcResult<NetconfMessage> createErrorRpcResult(final RpcError.ErrorType errorType,
- final String message) {
+ private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
return RpcResultBuilder.<NetconfMessage>failed()
- .withError(errorType, ErrorTag.OPERATION_FAILED.elementBody(), message).build();
+ .withError(errorType, ErrorTag.OPERATION_FAILED, message).build();
}
@Override
future.cause());
if (future.cause() != null) {
- req.future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
- future.cause().getLocalizedMessage()));
+ req.future.set(createErrorRpcResult(ErrorType.TRANSPORT, future.cause().getLocalizedMessage()));
} else {
req.future.set(createSessionDownRpcResult()); // assume session is down
}