import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-import com.google.common.base.Optional;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Timeout;
import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSessionListener;
public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences,
S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator<S> {
+ /**
+ * Possible states for Finite State Machine.
+ */
+ protected enum State {
+ IDLE, OPEN_WAIT, FAILED, ESTABLISHED
+ }
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
-
- public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
+ private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
protected final P sessionPreferences;
protected final Channel channel;
+ private final long connectionTimeoutMillis;
private final Promise<S> promise;
private final L sessionListener;
- private Timeout timeout;
+ private final Timer timer;
- /**
- * Possible states for Finite State Machine.
- */
- protected enum State {
- IDLE, OPEN_WAIT, FAILED, ESTABLISHED
- }
+ private Timeout timeoutTask;
+ @GuardedBy("this")
private State state = State.IDLE;
- private final Timer timer;
- private final long connectionTimeoutMillis;
protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise<S> promise,
final Channel channel, final Timer timer,
}
private static Optional<SslHandler> getSslHandler(final Channel channel) {
- final SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
- return sslHandler == null ? Optional.absent() : Optional.of(sslHandler);
+ return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
}
- public P getSessionPreferences() {
+ public final P getSessionPreferences() {
return sessionPreferences;
}
replaceHelloMessageOutboundHandler();
changeState(State.OPEN_WAIT);
- timeout = this.timer.newTimeout(new TimerTask() {
- @Override
- @SuppressWarnings("checkstyle:hiddenField")
- public void run(final Timeout timeout) {
- synchronized (this) {
- if (state != State.ESTABLISHED) {
-
- LOG.debug("Connection timeout after {}, session is in state {}", timeout, state);
-
- // Do not fail negotiation if promise is done or canceled
- // It would result in setting result of the promise second time and that throws exception
- if (!isPromiseFinished()) {
- LOG.warn("Netconf session was not established after {}", connectionTimeoutMillis);
- changeState(State.FAILED);
-
- channel.close().addListener((GenericFutureListener<ChannelFuture>) future -> {
- if (future.isSuccess()) {
- LOG.debug("Channel {} closed: success", future.channel());
- } else {
- LOG.warn("Channel {} closed: fail", future.channel());
- }
- });
- }
- } else if (channel.isOpen()) {
- channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
- }
- }
- }
+ timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
- private boolean isPromiseFinished() {
- return promise.isDone() || promise.isCancelled();
+ private synchronized void timeoutExpired(final Timeout timeout) {
+ if (state != State.ESTABLISHED) {
+ LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel,
+ state);
+
+ // Do not fail negotiation if promise is done or canceled
+ // It would result in setting result of the promise second time and that throws exception
+ if (!promise.isDone() && !promise.isCancelled()) {
+ LOG.warn("Netconf session backed by channel {} was not established after {}", channel,
+ connectionTimeoutMillis);
+ changeState(State.FAILED);
+
+ channel.close().addListener(future -> {
+ if (future.isSuccess()) {
+ LOG.debug("Channel {} closed: success", channel);
+ } else {
+ LOG.warn("Channel {} closed: fail", channel);
+ }
+ });
}
-
- }, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ } else if (channel.isOpen()) {
+ channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
+ }
}
private void cancelTimeout() {
- if (timeout != null) {
- timeout.cancel();
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
}
}
return getSession(sessionListener, channel, netconfMessage);
}
+ protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message)
+ throws NetconfDocumentedException;
+
/**
* Insert chunk framing handlers into the pipeline.
*/
return channel.pipeline().replace(handlerKey, handlerKey, decoder);
}
- @SuppressWarnings("checkstyle:hiddenField")
- protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message)
- throws NetconfDocumentedException;
-
private synchronized void changeState(final State newState) {
LOG.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
checkState(isStateChangePermitted(state, newState),
- "Cannot change state from %s to %s for chanel %s", state, newState, channel);
+ "Cannot change state from %s to %s for channel %s", state, newState, channel);
this.state = newState;
}
private final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- LOG.warn("An exception occurred during negotiation with {}", channel.remoteAddress(), cause);
+ LOG.warn("An exception occurred during negotiation with {} on channel {}",
+ channel.remoteAddress(), channel, cause);
+ // FIXME: this is quite suspect as it is competing with timeoutExpired() without synchronization
cancelTimeout();
negotiationFailed(cause);
changeState(State.FAILED);
protected final void sendMessage(final NetconfMessage msg) {
this.channel.writeAndFlush(msg).addListener(f -> {
if (!f.isSuccess()) {
- LOG.info("Failed to send message {}", msg, f.cause());
+ LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
negotiationFailed(f.cause());
} else {
- LOG.trace("Message {} sent to socket", msg);
+ LOG.trace("Message {} sent to socket on channel {}", msg, channel);
}
});
}
try {
startNegotiation();
} catch (final Exception e) {
- LOG.warn("Unexpected negotiation failure", e);
+ LOG.warn("Unexpected negotiation failure on channel {}", channel, e);
negotiationFailed(e);
}
}
try {
handleMessage((NetconfHelloMessage) msg);
} catch (final Exception e) {
- LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+ LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e);
negotiationFailed(e);
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- LOG.info("Unexpected error during negotiation", cause);
+ LOG.info("Unexpected error during negotiation on channel {}", channel, cause);
negotiationFailed(cause);
}