Introduce checked sendMessage() method
authorRobert Varga <rovarga@cisco.com>
Tue, 18 Feb 2014 10:39:14 +0000 (11:39 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 18 Feb 2014 10:39:14 +0000 (11:39 +0100)
This method send a message down the socket and makes sure that
negotiation fails if the message does not reach the peer for any reason.
Subclasses should use this method instead of talking directly to the
underlying channel.

Change-Id: I5b8201caef1c4ba655dad12fb139b054b9e75f06
Signed-off-by: Robert Varga <rovarga@cisco.com>
src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java

index 9f9f811e889f85c20352db331187f008b812bd85..d41e8106c5aec85166b43d72241b7fb600921801 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.protocol.framework;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.Promise;
@@ -26,7 +28,7 @@ import com.google.common.base.Preconditions;
  * @param <S> Protocol session type, has to extend ProtocolSession<M>
  */
 public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSession<?>> extends ChannelInboundHandlerAdapter implements SessionNegotiator<S> {
-    private final Logger logger = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
+    private final Logger LOG = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
     private final Promise<S> promise;
     protected final Channel channel;
 
@@ -39,42 +41,63 @@ public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSes
     protected abstract void handleMessage(M msg) throws Exception;
 
     protected final void negotiationSuccessful(final S session) {
-        logger.debug("Negotiation on channel {} successful with session {}", channel, session);
+        LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
         channel.pipeline().replace(this, "session", session);
         promise.setSuccess(session);
     }
 
     protected final void negotiationFailed(final Throwable cause) {
-        logger.debug("Negotiation on channel {} failed", channel, cause);
+        LOG.debug("Negotiation on channel {} failed", channel, cause);
         channel.close();
         promise.setFailure(cause);
     }
 
+    /**
+     * Send a message to peer and fail negotiation if it does not reach
+     * the peer.
+     *
+     * @param msg Message which should be sent.
+     */
+    protected final void sendMessage(final M msg) {
+        this.channel.writeAndFlush(msg).addListener(
+                new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(final ChannelFuture f) {
+                        if (!f.isSuccess()) {
+                            LOG.info("Failed to send message {}", msg, f.cause());
+                            negotiationFailed(f.cause());
+                        } else {
+                            LOG.trace("Message {} sent to socket", msg);
+                        }
+                    }
+                });
+    }
+
     @Override
     public final void channelActive(final ChannelHandlerContext ctx) {
-        logger.debug("Starting session negotiation on channel {}", channel);
+        LOG.debug("Starting session negotiation on channel {}", channel);
         try {
             startNegotiation();
         } catch (Exception e) {
-            logger.warn("Unexpected negotiation failure", e);
+            LOG.warn("Unexpected negotiation failure", e);
             negotiationFailed(e);
         }
     }
 
     @Override
     public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
-        logger.debug("Negotiation read invoked on channel {}", channel);
+        LOG.debug("Negotiation read invoked on channel {}", channel);
         try {
             handleMessage((M)msg);
         } catch (Exception e) {
-            logger.debug("Unexpected error while handling negotiation message {}", msg, e);
+            LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
             negotiationFailed(e);
         }
     }
 
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
-        logger.info("Unexpected error during negotiation", cause);
+        LOG.info("Unexpected error during negotiation", cause);
         negotiationFailed(cause);
     }
 }