package org.opendaylight.protocol.framework;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
* @param <S> Protocol session type, has to extend ProtocolSession<M>
*/
public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSession<?>> extends ChannelInboundHandlerAdapter implements SessionNegotiator<S> {
- private final Logger logger = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
+ private final Logger LOG = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
private final Promise<S> promise;
protected final Channel channel;
protected abstract void handleMessage(M msg) throws Exception;
protected final void negotiationSuccessful(final S session) {
- logger.debug("Negotiation on channel {} successful with session {}", channel, session);
+ LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
channel.pipeline().replace(this, "session", session);
promise.setSuccess(session);
}
- protected final void negotiationFailed(final Throwable cause) {
- logger.debug("Negotiation on channel {} failed", channel, cause);
+ protected void negotiationFailed(final Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", channel, cause);
channel.close();
promise.setFailure(cause);
}
+ /**
+ * Send a message to peer and fail negotiation if it does not reach
+ * the peer.
+ *
+ * @param msg Message which should be sent.
+ */
+ protected final void sendMessage(final M msg) {
+ this.channel.writeAndFlush(msg).addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture f) {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {}", msg, f.cause());
+ negotiationFailed(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket", msg);
+ }
+ }
+ });
+ }
+
@Override
public final void channelActive(final ChannelHandlerContext ctx) {
- logger.debug("Starting session negotiation on channel {}", channel);
+ LOG.debug("Starting session negotiation on channel {}", channel);
try {
startNegotiation();
} catch (Exception e) {
- logger.warn("Unexpected negotiation failure", e);
+ LOG.warn("Unexpected negotiation failure", e);
negotiationFailed(e);
}
}
@Override
+ @SuppressWarnings("unchecked")
public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
- logger.debug("Negotiation read invoked on channel {}", channel);
+ LOG.debug("Negotiation read invoked on channel {}", channel);
try {
handleMessage((M)msg);
} catch (Exception e) {
- logger.debug("Unexpected error while handling negotiation message {}", msg, e);
+ LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
negotiationFailed(e);
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- logger.info("Unexpected error during negotiation", cause);
+ LOG.info("Unexpected error during negotiation", cause);
negotiationFailed(cause);
}
}