X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=bbe563b5b19e8ea5b5fed2cf92ac81c74df61e0c;hb=460f7e77b73183799fc1d2898fec1bc50c5829db;hp=d9c15ae03578052a079bc157d44d2e7e9dcb6e1e;hpb=ce8ad4e92f8128750f7ce7216f7e73ad238efaae;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index d9c15ae035..bbe563b5b1 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -5,23 +5,22 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; @@ -34,7 +33,6 @@ import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; import org.opendaylight.netconf.util.messages.FramingMechanism; -import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -42,17 +40,7 @@ import org.w3c.dom.NodeList; public abstract class AbstractNetconfSessionNegotiator
, L extends NetconfSessionListener promise;
+ private final L sessionListener;
private final Timer timer;
- private final long connectionTimeoutMillis;
+
+ private Timeout timeoutTask;
+
+ @GuardedBy("this")
+ private State state = State.IDLE;
protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise sslHandler = getSslHandler(channel);
if (sslHandler.isPresent()) {
sslHandler.get().handshakeFuture().addListener(future -> {
- Preconditions.checkState(future.isSuccess(), "Ssl handshake was not successful");
+ checkState(future.isSuccess(), "Ssl handshake was not successful");
LOG.debug("Ssl handshake complete");
start();
});
@@ -100,11 +98,10 @@ public abstract class AbstractNetconfSessionNegotiator getSslHandler(final Channel channel) {
- final SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
- return sslHandler == null ? Optional.absent() : Optional.of(sslHandler);
+ return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
}
- public P getSessionPreferences() {
+ public final P getSessionPreferences() {
return sessionPreferences;
}
@@ -119,52 +116,42 @@ public abstract class AbstractNetconfSessionNegotiator ) future -> {
- if (future.isSuccess()) {
- LOG.debug("Channel {} closed: success", future.channel());
- } else {
- LOG.warn("Channel {} closed: fail", future.channel());
- }
- });
- }
- } else if (channel.isOpen()) {
- channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
- }
- }
- }
+ timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
- private boolean isPromiseFinished() {
- return promise.isDone() || promise.isCancelled();
+ private synchronized void timeoutExpired(final Timeout timeout) {
+ if (state != State.ESTABLISHED) {
+ LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel,
+ state);
+
+ // Do not fail negotiation if promise is done or canceled
+ // It would result in setting result of the promise second time and that throws exception
+ if (!promise.isDone() && !promise.isCancelled()) {
+ LOG.warn("Netconf session backed by channel {} was not established after {}", channel,
+ connectionTimeoutMillis);
+ changeState(State.FAILED);
+
+ channel.close().addListener(future -> {
+ if (future.isSuccess()) {
+ LOG.debug("Channel {} closed: success", channel);
+ } else {
+ LOG.warn("Channel {} closed: fail", channel);
+ }
+ });
}
-
- }, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ } else if (channel.isOpen()) {
+ channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
+ }
}
private void cancelTimeout() {
- if (timeout != null) {
- timeout.cancel();
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
}
}
protected final S getSessionForHelloMessage(final NetconfHelloMessage netconfMessage)
throws NetconfDocumentedException {
- Preconditions.checkNotNull(netconfMessage, "netconfMessage");
-
final Document doc = netconfMessage.getDocument();
if (shouldUseChunkFraming(doc)) {
@@ -175,6 +162,9 @@ public abstract class AbstractNetconfSessionNegotiator netconfMessagesFromNegotiation =
((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages();
@@ -228,14 +218,10 @@ public abstract class AbstractNetconfSessionNegotiator {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
+ negotiationFailed(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket on channel {}", msg, channel);
+ }
+ });
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", channel);
+ try {
+ startNegotiation();
+ } catch (final Exception e) {
+ LOG.warn("Unexpected negotiation failure on channel {}", channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", channel);
+ try {
+ handleMessage((NetconfHelloMessage) msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation on channel {}", channel, cause);
+ negotiationFailed(cause);
+ }
+
+ protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception;
}
>
- extends AbstractSessionNegotiator {
/**
* Possible states for Finite State Machine.
*/
@@ -60,23 +48,33 @@ public abstract class AbstractNetconfSessionNegotiator promise,
final Channel channel, final Timer timer,
final L sessionListener, final long connectionTimeoutMillis) {
- super(promise, channel);
+ this.channel = requireNonNull(channel);
+ this.promise = requireNonNull(promise);
this.sessionPreferences = sessionPreferences;
- this.promise = promise;
this.timer = timer;
this.sessionListener = sessionListener;
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
- @Override
protected final void startNegotiation() {
if (ifNegotiatedAlready()) {
LOG.debug("Negotiation on channel {} already started", channel);
@@ -84,7 +82,7 @@ public abstract class AbstractNetconfSessionNegotiator