BUG-48: message send should return future 77/2177/1
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Oct 2013 20:17:24 +0000 (22:17 +0200)
committerRobert Varga <rovarga@cisco.com>
Fri, 25 Oct 2013 20:18:03 +0000 (22:18 +0200)
The next step will see us integrate sendMessage() into MD-SAL RPC
service. The RPC's fate has to track the message as it leaves the
system -- they return a Future, which resolves one the RPC is complete.

The definition in this case requires that we allow sendMessage() users
to obtain the Future which will fail if the message does not hit the
socket.

Change-Id: If0e086d5a53b17712dfb05730d73cca68f3242e7
Signed-off-by: Robert Varga <rovarga@cisco.com>
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPSession.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/ServerSessionMock.java

index caaa044d5778db40993f10f4521aa61d128821ff..9ff465432e341319643da828174c21223ed034df 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.protocol.pcep;
 
 import java.net.InetAddress;
+import java.util.concurrent.Future;
 
 import org.opendaylight.protocol.framework.ProtocolSession;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
@@ -27,8 +28,9 @@ public interface PCEPSession extends ProtocolSession<Message> {
         * resources.
         *
         * @param message message to be sent
+        * @return Future promise which will be succeed when the message is enqueued in the socket.
         */
-       public void sendMessage(Message message);
+       Future<Void> sendMessage(Message message);
 
        public void close(TerminationReason reason);
 
index bade1702dc186887a759a288775adda48ee8d178..11de5e9f3ea5920227cf8d1719a629b99c40ef60 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.protocol.pcep.impl;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
@@ -18,6 +20,7 @@ import java.net.InetSocketAddress;
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.protocol.framework.AbstractProtocolSession;
@@ -188,17 +191,26 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
         * @param msg to be sent
         */
        @Override
-       public void sendMessage(final Message msg) {
-               try {
-                       this.channel.writeAndFlush(msg);
-                       this.lastMessageSentAt = System.nanoTime();
-                       if (!(msg instanceof KeepaliveMessage)) {
-                               logger.debug("Sent message: " + msg);
-                       }
-                       this.sentMsgCount++;
-               } catch (final Exception e) {
-                       logger.warn("Message {} was not sent.", msg, e);
+       public Future<Void> sendMessage(final Message msg) {
+               final ChannelFuture f = this.channel.writeAndFlush(msg);
+               this.lastMessageSentAt = System.nanoTime();
+               if (!(msg instanceof KeepaliveMessage)) {
+                       logger.debug("Message enqueued: {}", msg);
                }
+               this.sentMsgCount++;
+
+               f.addListener(new ChannelFutureListener() {
+                       @Override
+                       public void operationComplete(final ChannelFuture arg) {
+                               if (arg.isSuccess()) {
+                                       logger.debug("Message sent to socket: {}", msg);
+                               } else {
+                                       logger.debug("Message not sent: {}", msg, arg.cause());
+                               }
+                       }
+               });
+
+               return f;
        }
 
        /**
index 15ed6582d4dfa586f50bde86ee1dd6572d486326..e796e077bfe7218b5c99cd729097ae277e728c02 100644 (file)
@@ -11,12 +11,16 @@ import static org.mockito.Mockito.mock;
 import io.netty.channel.Channel;
 import io.netty.util.HashedWheelTimer;
 
+import java.util.concurrent.Future;
+
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.TerminationReason;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.message.open.message.OpenBuilder;
 
+import com.google.common.util.concurrent.Futures;
+
 public class ServerSessionMock extends PCEPSessionImpl {
 
        private final MockPCE client;
@@ -28,9 +32,10 @@ public class ServerSessionMock extends PCEPSessionImpl {
        }
 
        @Override
-       public void sendMessage(final Message msg) {
+       public Future<Void> sendMessage(final Message msg) {
                this.lastMessageSentAt = System.nanoTime();
                this.client.onMessage(this, msg);
+               return Futures.immediateFuture(null);
        }
 
        @Override