public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
try {
// ssh handler has to be the first handler in pipeline
- ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
+ ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler, promise));
super.initialize(ch,promise);
} catch (final IOException e) {
throw new RuntimeException(e);
package org.opendaylight.netconf.client;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+import java.net.SocketAddress;
import org.opendaylight.netconf.nettyutil.AbstractChannelInitializer;
import org.opendaylight.protocol.framework.SessionListenerFactory;
this.sessionListener = sessionListener;
}
+ @Override
+ public void initialize(final Channel ch, Promise<NetconfClientSession> promise) {
+ final Future negotiationFuture = promise;
+
+ //We have to add this channel outbound handler to channel pipeline, in order
+ //to get notifications from netconf negotiatior. Set connection promise to
+ //success only after successful negotiation.
+ ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+ ChannelPromise connectPromise;
+ GenericFutureListener negotiationFutureListener;
+
+ @Override
+ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+ final ChannelPromise channelPromise) throws Exception {
+ connectPromise = channelPromise;
+ ChannelPromise tcpConnectFuture = new DefaultChannelPromise(ch);
+
+ negotiationFutureListener = new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future future) throws Exception {
+ if (future.isSuccess())
+ connectPromise.setSuccess();
+ }
+ };
+
+ tcpConnectFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
+ @Override
+ public void operationComplete(Future<? super Void> future) throws Exception {
+ if(future.isSuccess()) {
+ //complete connection promise with netconf negotiation future
+ negotiationFuture.addListener(negotiationFutureListener);
+ } else {
+ connectPromise.setFailure(future.cause());
+ }
+ }
+ });
+ ctx.connect(remoteAddress, localAddress, tcpConnectFuture);
+ }
+
+ @Override
+ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+ if(connectPromise.isSuccess()) {
+ ctx.fireChannelInactive();
+ }
+
+ //If connection promise is not already set, it means negotiation failed
+ //we must set connection promise to failure
+ if(!connectPromise.isDone()) {
+ connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
+ }
+
+ //Remove listener from negotiation future, we don't want notifications
+ //from negotiation anymore
+ negotiationFuture.removeListener(negotiationFutureListener);
+
+ super.disconnect(ctx, promise);
+ promise.setSuccess();
+ }
+ });
+
+ super.initialize(ch, promise);
+ }
+
@Override
protected void initializeSessionNegotiator(final Channel ch, final Promise<NetconfClientSession> promise) {
ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.netconf.util.messages.FramingMechanism;
-import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSessionListener;
import org.opendaylight.netconf.api.NetconfSessionPreferences;
+import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory;
import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator;
import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.netconf.util.messages.FramingMechanism;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
if (isPromiseFinished() == false) {
- negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+ LOG.warn("Netconf session was not established after {}", connectionTimeoutMillis);
changeState(State.FAILED);
- channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+ channel.close().addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if(future.isSuccess()) {
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashMap;
*/
public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
public static final String SUBSYSTEM = "netconf";
-
public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient();
-
public static final int SSH_DEFAULT_NIO_WORKERS = 8;
-
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
// Disable default timeouts from mina sshd
private static final long DEFAULT_TIMEOUT = -1L;
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
+ private Future negotiationFuture;
private AsyncSshHandlerReader sshReadAsyncListener;
private AsyncSshHandlerWriter sshWriteAsyncHandler;
private ClientChannel channel;
private ClientSession session;
private ChannelPromise connectPromise;
+ private GenericFutureListener negotiationFutureListener;
- public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
- return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
+ public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient, final Future negotiationFuture) throws IOException {
+ this(authenticationHandler, sshClient);
+ this.negotiationFuture = negotiationFuture;
}
/**
sshClient.start();
}
+ public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
+ return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
+ }
+
+ /**
+ *
+ * Create AsyncSshHandler for netconf subsystem. Negotiation future has to be set to success after successful netconf
+ * negotiation.
+ *
+ * @param authenticationHandler
+ * @param negotiationFuture
+ * @return
+ * @throws IOException
+ */
+ public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler, final Future negotiationFuture) throws IOException {
+ return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT, negotiationFuture);
+ }
+
private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
- connectPromise.setSuccess();
+ if(negotiationFuture == null) {
+ connectPromise.setSuccess();
+ }
// TODO we should also read from error stream and at least log from that
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
- disconnect(ctx, ctx.newPromise());
// If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
if(!connectPromise.isDone()) {
connectPromise.setFailure(e);
}
+
+ disconnect(ctx, ctx.newPromise());
}
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
this.connectPromise = promise;
+
+ if(negotiationFuture != null) {
+
+ negotiationFutureListener = new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future future) throws Exception {
+ if (future.isSuccess())
+ connectPromise.setSuccess();
+ }
+ };
+ //complete connection promise with netconf negotiation future
+ negotiationFuture.addListener(negotiationFutureListener);
+ }
startSsh(ctx, remoteAddress);
}
sshReadAsyncListener.close();
}
+ //If connection promise is not already set, it means negotiation failed
+ //we must set connection promise to failure
+ if(!connectPromise.isDone()) {
+ connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
+ }
+
+ //Remove listener from negotiation future, we don't want notifications
+ //from negotiation anymore
+ if(negotiationFuture != null) {
+ negotiationFuture.removeListener(negotiationFutureListener);
+ }
+
if(session!= null && !session.isClosed() && !session.isClosing()) {
session.close(false).addListener(new SshFutureListener<CloseFuture>() {
@Override
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
+
+ //if asyncout is already set to null by close method, do nothing
+ if(asyncOut == null) {
+ return;
+ }
+
if(asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
LOG.debug("Ssh session dropped on channel: {}", channelId, future.getException());