import org.opendaylight.netconf.client.NetconfClientSession;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
}
@Override
- public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
- return activateChannel(clientConfiguration);
+ public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
+ return new SingleReconnectFuture(eventExecutor, activateChannel(clientConfiguration));
}
- private <V> Future<V> activateChannel(final NetconfClientConfiguration conf) {
+ private Future<NetconfClientSession> activateChannel(final NetconfClientConfiguration conf) {
final InetSocketAddress remoteAddr = conf.getAddress();
final CallHomeMountSessionContext context = getSessionManager().getByAddress(remoteAddr);
LOG.info("Activating NETCONF channel for ip {} device context {}", remoteAddr, context);
- if (context == null) {
- return new FailedFuture<>(eventExecutor, new NullPointerException());
- }
- return context.activateNetconfChannel(conf.getSessionListener());
+ return context == null ? new FailedFuture<>(eventExecutor, new NullPointerException())
+ : context.activateNetconfChannel(conf.getSessionListener());
}
void createTopology() {
.build();
}
- @SuppressWarnings("unchecked")
- <V> Promise<V> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
- return (Promise<V>) activator.activate(wrap(sessionListener));
+ Promise<NetconfClientSession> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
+ return activator.activate(wrap(sessionListener));
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.callhome.mount;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+final class SingleReconnectFuture extends DefaultPromise<Empty> implements ReconnectFuture {
+ private final Future<NetconfClientSession> sessionFuture;
+
+ SingleReconnectFuture(final EventExecutor eventExecutor, final Future<NetconfClientSession> sessionFuture) {
+ super(eventExecutor);
+ this.sessionFuture = requireNonNull(sessionFuture);
+ sessionFuture.addListener(future -> {
+ if (!isDone()) {
+ if (future.isCancelled()) {
+ cancel(false);
+ } else if (future.isSuccess()) {
+ setSuccess(Empty.getInstance());
+ } else {
+ setFailure(future.cause());
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ if (!sessionFuture.isDone()) {
+ sessionFuture.cancel(mayInterruptIfRunning);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Future<?> firstSessionFuture() {
+ return sessionFuture;
+ }
+}
import io.netty.util.concurrent.Future;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
public interface NetconfClientDispatcher {
*/
Future<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration);
- Future<Void> createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
+ ReconnectFuture createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
}
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
+ public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
switch (clientConfiguration.getProtocol()) {
case TCP:
return createReconnectingTcpClient(clientConfiguration);
currentConfiguration.getSessionListener()).initialize(ch, promise));
}
- private Future<Void> createReconnectingTcpClient(
+ private ReconnectFuture createReconnectingTcpClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting TCP client with configuration: {}", currentConfiguration);
final TcpClientChannelInitializer init =
currentConfiguration.getSshClient()).initialize(ch, sessionPromise));
}
- private Future<Void> createReconnectingSshClient(
+ private ReconnectFuture createReconnectingSshClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting SSH client with configuration: {}", currentConfiguration);
final SshClientChannelInitializer init = new SshClientChannelInitializer(currentConfiguration.getAuthHandler(),
.initialize(ch, sessionPromise));
}
- private Future<Void> createReconnectingTlsClient(
+ private ReconnectFuture createReconnectingTlsClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting TLS client with configuration: {}", currentConfiguration);
final TlsClientChannelInitializer init = new TlsClientChannelInitializer(
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.nettyutil.ReconnectStrategy;
import org.opendaylight.netconf.nettyutil.ReconnectStrategyFactory;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
Future<NetconfClientSession> sshSession = dispatcher.createClient(cfg);
Future<NetconfClientSession> tcpSession = dispatcher.createClient(cfg2);
- Future<Void> sshReconn = dispatcher.createReconnectingClient(cfg);
- final Future<Void> tcpReconn = dispatcher.createReconnectingClient(cfg2);
+ ReconnectFuture sshReconn = dispatcher.createReconnectingClient(cfg);
+ final ReconnectFuture tcpReconn = dispatcher.createReconnectingClient(cfg2);
assertNotNull(sshSession);
assertNotNull(tcpSession);
.withSslHandlerFactory(sslHandlerFactory).build();
Future<NetconfClientSession> tlsSession = dispatcher.createClient(cfg3);
- Future<Void> tlsReconn = dispatcher.createReconnectingClient(cfg3);
+ ReconnectFuture tlsReconn = dispatcher.createReconnectingClient(cfg3);
assertNotNull(tlsSession);
assertNotNull(tlsReconn);
* instead.
*/
@Deprecated
- protected Future<Void> createReconnectingClient(final InetSocketAddress address,
+ protected ReconnectFuture createReconnectingClient(final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
final PipelineInitializer<S> initializer) {
return createReconnectingClient(address, connectStrategyFactory, initializer);
* @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
* success is never reported, only failure when it runs out of reconnection attempts.
*/
- protected Future<Void> createReconnectingClient(final InetSocketAddress address,
+ protected ReconnectFuture createReconnectingClient(final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final PipelineInitializer<S> initializer) {
final Bootstrap b = new Bootstrap();
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil;
+
+import com.google.common.annotations.Beta;
+import io.netty.util.concurrent.Future;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+/**
+ * A future representing the task of reconnecting of a certain channel. This future never completes successfully, it
+ * either fails when the underlying strategy gives up, or when it is cancelled. It additionally exposes an additional
+ * future, which completes when the session is established for the first time.
+ */
+@Beta
+public interface ReconnectFuture extends Future<Empty> {
+ /**
+ * Return a Future which completes when the first session is established.
+ *
+ * @return First session establishment future
+ */
+ @NonNull Future<?> firstSessionFuture();
+}
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.api.NetconfSessionListener;
+import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Deprecated
final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
- extends DefaultPromise<Void> {
+ extends DefaultPromise<Empty> implements ReconnectFuture {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
private final AbstractNetconfDispatcher<S, L> dispatcher;
private final InetSocketAddress address;
private final ReconnectStrategyFactory strategyFactory;
private final Bootstrap bootstrap;
- private final AbstractNetconfDispatcher.PipelineInitializer<S> initializer;
+ private final PipelineInitializer<S> initializer;
+ private final Promise<Empty> firstSessionFuture;
+ /**
+ * Channel handler that responds to channelInactive event and reconnects the session unless the promise is
+ * cancelled.
+ */
+ private final ChannelInboundHandlerAdapter inboundHandler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ // This is the ultimate channel inactive handler, not forwarding
+ if (isCancelled()) {
+ return;
+ }
+
+ synchronized (ReconnectPromise.this) {
+ final Future<?> attempt = pending;
+ if (!attempt.isDone() || !attempt.isSuccess()) {
+ // Connection refused, negotiation failed, or similar
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", address);
+ }
+
+ LOG.debug("Reconnecting after connection to {} was dropped", address);
+ lockedConnect();
+ }
+ }
+ };
+
+ @GuardedBy("this")
private Future<?> pending;
ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher<S, L> dispatcher,
final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
final Bootstrap bootstrap, final AbstractNetconfDispatcher.PipelineInitializer<S> initializer) {
super(executor);
- this.bootstrap = bootstrap;
+ this.firstSessionFuture = new DefaultPromise<>(executor);
+ this.bootstrap = requireNonNull(bootstrap);
this.initializer = requireNonNull(initializer);
this.dispatcher = requireNonNull(dispatcher);
this.address = requireNonNull(address);
this.strategyFactory = requireNonNull(connectStrategyFactory);
}
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ firstSessionFuture.cancel(mayInterruptIfRunning);
+ pending.cancel(mayInterruptIfRunning);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Future<?> firstSessionFuture() {
+ return firstSessionFuture;
+ }
+
synchronized void connect() {
+ lockedConnect();
+ }
+
+ @Holding("this")
+ private void lockedConnect() {
final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support
channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
});
- pending.addListener(future -> {
- if (!future.isSuccess() && !ReconnectPromise.this.isDone()) {
- ReconnectPromise.this.setFailure(future.cause());
- }
- });
+ if (!firstSessionFuture.isDone()) {
+ pending.addListener(future -> {
+ if (!future.isSuccess() && !firstSessionFuture.isDone()) {
+ firstSessionFuture.setFailure(future.cause());
+ }
+ });
+ }
}
/**
return pending.isDone() && pending.isSuccess();
}
- @Override
- public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- requireNonNull(pending);
- this.pending.cancel(mayInterruptIfRunning);
- return true;
- }
-
- return false;
- }
-
/**
* Channel handler that responds to channelInactive event and reconnects the session.
* Only if the promise was not canceled.
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import com.typesafe.config.ConfigFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.SucceededFuture;
import java.io.File;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.impl.DefaultSchemaResourceManager;
yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);
- doReturn(new SucceededFuture(GlobalEventExecutor.INSTANCE, null)).when(mockClientDispatcher)
- .createReconnectingClient(any());
+ doReturn(mock(ReconnectFuture.class)).when(mockClientDispatcher).createReconnectingClient(any());
LOG.info("****** Setup complete");
}
private NetconfClientSession currentSession;
private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
- private Future<?> initFuture;
+ private Future<?> taskFuture;
// isSessionClosing indicates a close operation on the session is issued and
// tearDown will surely be called later to finish the close.
*/
public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+
+ final Future<?> connectFuture;
if (config instanceof NetconfReconnectingClientConfiguration) {
- initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+ // FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to
+ // forward error state. Analyze the call graph to understand what is going on here. We really want
+ // to move reconnection away from the socket layer, so that it can properly interface with sessions
+ // and generally has some event-driven state (as all good network glue does). There is a second story
+ // which is we want to avoid duplicate code, so it depends on other users as well.
+ final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+ taskFuture = future;
+ connectFuture = future.firstSessionFuture();
} else {
- initFuture = dispatcher.createClient(config);
+ taskFuture = connectFuture = dispatcher.createClient(config);
}
-
- initFuture.addListener(future -> {
+ connectFuture.addListener(future -> {
if (!future.isSuccess() && !future.isCancelled()) {
LOG.debug("{}: Connection failed", id, future.cause());
- NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
- if (firstConnectionFuture.isDone()) {
+ remoteDevice.onRemoteSessionFailed(future.cause());
+ if (!firstConnectionFuture.isDone()) {
firstConnectionFuture.setException(future.cause());
}
}
@Override
public void close() {
// Cancel reconnect if in progress
- if (initFuture != null) {
- initFuture.cancel(false);
+ if (taskFuture != null) {
+ taskFuture.cancel(false);
}
// Disconnect from device
// tear down not necessary, called indirectly by the close in disconnect()