X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=9b457109bd5d47f3ff17a3a1dd01d73fb142fdff;hb=791a6a25f6670a23d390bfdeb786f70588d622a5;hp=2f30023f541f94914fac34bac68d8852f383cb68;hpb=6d7e12bf3ef64e5004703a1d540e7e26f30a9595;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 2f30023f54..9b457109bd 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -5,11 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; @@ -19,7 +20,6 @@ import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import java.util.concurrent.TimeUnit; @@ -35,7 +35,6 @@ import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; import org.opendaylight.netconf.util.messages.FramingMechanism; -import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -43,14 +42,16 @@ import org.w3c.dom.NodeList; public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> - extends AbstractSessionNegotiator { + extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator { private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; protected final P sessionPreferences; + protected final Channel channel; + private final Promise promise; private final L sessionListener; private Timeout timeout; @@ -62,42 +63,45 @@ public abstract class AbstractNetconfSessionNegotiator

promise; private final Timer timer; private final long connectionTimeoutMillis; protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise promise, final Channel channel, final Timer timer, final L sessionListener, final long connectionTimeoutMillis) { - super(promise, channel); + this.channel = requireNonNull(channel); + this.promise = requireNonNull(promise); this.sessionPreferences = sessionPreferences; - this.promise = promise; this.timer = timer; this.sessionListener = sessionListener; this.connectionTimeoutMillis = connectionTimeoutMillis; } - @Override protected final void startNegotiation() { - final Optional sslHandler = getSslHandler(channel); - if (sslHandler.isPresent()) { - Future future = sslHandler.get().handshakeFuture(); - future.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(final Future future) { - Preconditions.checkState(future.isSuccess(), "Ssl handshake was not successful"); + if (ifNegotiatedAlready()) { + LOG.debug("Negotiation on channel {} already started", channel); + } else { + final Optional sslHandler = getSslHandler(channel); + if (sslHandler.isPresent()) { + sslHandler.get().handshakeFuture().addListener(future -> { + checkState(future.isSuccess(), "Ssl handshake was not successful"); LOG.debug("Ssl handshake complete"); start(); - } - }); - } else { - start(); + }); + } else { + start(); + } } } + protected final synchronized boolean ifNegotiatedAlready() { + // Indicates whether negotiation already started + return this.state != State.IDLE; + } + private static Optional getSslHandler(final Channel channel) { final SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - return sslHandler == null ? Optional.absent() : Optional.of(sslHandler); + return sslHandler == null ? Optional.absent() : Optional.of(sslHandler); } public P getSessionPreferences() { @@ -117,6 +121,7 @@ public abstract class AbstractNetconfSessionNegotiator

() { - @Override - public void operationComplete(final ChannelFuture future) throws Exception { - if (future.isSuccess()) { - LOG.debug("Channel {} closed: success", future.channel()); - } else { - LOG.warn("Channel {} closed: fail", future.channel()); - } + channel.close().addListener((GenericFutureListener) future -> { + if (future.isSuccess()) { + LOG.debug("Channel {} closed: success", future.channel()); + } else { + LOG.warn("Channel {} closed: fail", future.channel()); } }); } @@ -161,8 +163,6 @@ public abstract class AbstractNetconfSessionNegotiator

netconfMessagesFromNegotiation = ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages(); @@ -226,18 +226,21 @@ public abstract class AbstractNetconfSessionNegotiator

{ + if (!f.isSuccess()) { + LOG.info("Failed to send message {}", msg, f.cause()); + negotiationFailed(f.cause()); + } else { + LOG.trace("Message {} sent to socket", msg); + } + }); + } + + @Override + @SuppressWarnings("checkstyle:illegalCatch") + public final void channelActive(final ChannelHandlerContext ctx) { + LOG.debug("Starting session negotiation on channel {}", channel); + try { + startNegotiation(); + } catch (final Exception e) { + LOG.warn("Unexpected negotiation failure", e); + negotiationFailed(e); + } + } + + @Override + @SuppressWarnings("checkstyle:illegalCatch") + public final void channelRead(final ChannelHandlerContext ctx, final Object msg) { + LOG.debug("Negotiation read invoked on channel {}", channel); + try { + handleMessage((NetconfHelloMessage) msg); + } catch (final Exception e) { + LOG.debug("Unexpected error while handling negotiation message {}", msg, e); + negotiationFailed(e); + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.info("Unexpected error during negotiation", cause); + negotiationFailed(cause); + } + + protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception; }