X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=3949db62d2e21b4fc9397a9701f950b16f01b5c2;hb=refs%2Fchanges%2F86%2F101886%2F4;hp=9b457109bd5d47f3ff17a3a1dd01d73fb142fdff;hpb=4b560808ecaa0f2879717ac00c80657e89b9dd38;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 9b457109bd..3949db62d2 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -10,23 +10,21 @@ package org.opendaylight.netconf.nettyutil; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import com.google.common.base.Optional; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; -import org.opendaylight.netconf.api.NetconfSessionPreferences; import org.opendaylight.netconf.api.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; @@ -40,43 +38,47 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> +public abstract class AbstractNetconfSessionNegotiator, + L extends NetconfSessionListener> extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator { + /** + * Possible states for Finite State Machine. + */ + protected enum State { + IDLE, OPEN_WAIT, FAILED, ESTABLISHED + } private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); + private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - - protected final P sessionPreferences; + private final @NonNull NetconfHelloMessage localHello; protected final Channel channel; + private final long connectionTimeoutMillis; private final Promise promise; private final L sessionListener; - private Timeout timeout; + private final Timer timer; - /** - * Possible states for Finite State Machine. - */ - protected enum State { - IDLE, OPEN_WAIT, FAILED, ESTABLISHED - } + private Timeout timeoutTask; + @GuardedBy("this") private State state = State.IDLE; - private final Timer timer; - private final long connectionTimeoutMillis; - protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise promise, + protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise promise, final Channel channel, final Timer timer, final L sessionListener, final long connectionTimeoutMillis) { - this.channel = requireNonNull(channel); + this.localHello = requireNonNull(hello); this.promise = requireNonNull(promise); - this.sessionPreferences = sessionPreferences; + this.channel = requireNonNull(channel); this.timer = timer; this.sessionListener = sessionListener; this.connectionTimeoutMillis = connectionTimeoutMillis; } + protected final @NonNull NetconfHelloMessage localHello() { + return localHello; + } + protected final void startNegotiation() { if (ifNegotiatedAlready()) { LOG.debug("Negotiation on channel {} already started", channel); @@ -100,64 +102,50 @@ public abstract class AbstractNetconfSessionNegotiator

getSslHandler(final Channel channel) { - final SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - return sslHandler == null ? Optional.absent() : Optional.of(sslHandler); - } - - public P getSessionPreferences() { - return sessionPreferences; + return Optional.ofNullable(channel.pipeline().get(SslHandler.class)); } private void start() { - final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage(); - LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel); + LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel); channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler()); - sendMessage(helloMessage); + sendMessage(localHello); replaceHelloMessageOutboundHandler(); changeState(State.OPEN_WAIT); - timeout = this.timer.newTimeout(new TimerTask() { - @Override - @SuppressWarnings("checkstyle:hiddenField") - public void run(final Timeout timeout) { - synchronized (this) { - if (state != State.ESTABLISHED) { - - LOG.debug("Connection timeout after {}, session is in state {}", timeout, state); - - // Do not fail negotiation if promise is done or canceled - // It would result in setting result of the promise second time and that throws exception - if (!isPromiseFinished()) { - LOG.warn("Netconf session was not established after {}", connectionTimeoutMillis); - changeState(State.FAILED); - - channel.close().addListener((GenericFutureListener) future -> { - if (future.isSuccess()) { - LOG.debug("Channel {} closed: success", future.channel()); - } else { - LOG.warn("Channel {} closed: fail", future.channel()); - } - }); - } - } else if (channel.isOpen()) { - channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); - } - } - } + timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } - private boolean isPromiseFinished() { - return promise.isDone() || promise.isCancelled(); + private synchronized void timeoutExpired(final Timeout timeout) { + if (state != State.ESTABLISHED) { + LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel, + state); + + // Do not fail negotiation if promise is done or canceled + // It would result in setting result of the promise second time and that throws exception + if (!promise.isDone() && !promise.isCancelled()) { + LOG.warn("Netconf session backed by channel {} was not established after {}", channel, + connectionTimeoutMillis); + changeState(State.FAILED); + + channel.close().addListener(future -> { + if (future.isSuccess()) { + LOG.debug("Channel {} closed: success", channel); + } else { + LOG.warn("Channel {} closed: fail", channel); + } + }); } - - }, connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } else if (channel.isOpen()) { + channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); + } } private void cancelTimeout() { - if (timeout != null) { - timeout.cancel(); + if (timeoutTask != null) { + timeoutTask.cancel(); } } @@ -173,6 +161,9 @@ public abstract class AbstractNetconfSessionNegotiator

{ if (!f.isSuccess()) { - LOG.info("Failed to send message {}", msg, f.cause()); + LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause()); negotiationFailed(f.cause()); } else { - LOG.trace("Message {} sent to socket", msg); + LOG.trace("Message {} sent to socket on channel {}", msg, channel); } }); } @@ -312,7 +300,7 @@ public abstract class AbstractNetconfSessionNegotiator