BUG-2383: add a non-flushing message write method 14/16014/5
authorRobert Varga <rovarga@cisco.com>
Wed, 4 Mar 2015 12:27:38 +0000 (13:27 +0100)
committerRobert Varga <rovarga@cisco.com>
Thu, 5 Mar 2015 11:07:29 +0000 (12:07 +0100)
When performing route dissemination, we may need to enqueue multiple
messages before flushing. This patch adds and API to perform that and
add the corresponding capability to ChannelOutputLimiter.

Change-Id: I048c6f6a84ced775ce1890cc1369ee8fff97cd1f
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ChannelOutputLimiter.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeerTest.java

index f720aae414330da939ae1c8f490699c6cfbc52e4..37bfae4e0b05170be22831eab0f637f373692e30 100644 (file)
@@ -226,31 +226,44 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         }
     }
 
-    synchronized void sendMessage(final Notification msg) {
-        try {
-            this.channel.writeAndFlush(msg).addListener(
-                new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(final ChannelFuture f) {
-                        if (!f.isSuccess()) {
-                            LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
-                        } else {
-                            LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
-                        }
+    @GuardedBy("this")
+    private final void writeEpilogue(final ChannelFuture future, final Notification msg) {
+        future.addListener(
+            new ChannelFutureListener() {
+                @Override
+                public void operationComplete(final ChannelFuture f) {
+                    if (!f.isSuccess()) {
+                        LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+                    } else {
+                        LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
                     }
-                });
-            this.lastMessageSentAt = System.nanoTime();
-            this.sessionStats.updateSentMsgTotal();
-            if (msg instanceof Update) {
-                this.sessionStats.updateSentMsgUpd();
-            } else if (msg instanceof Notify) {
-                this.sessionStats.updateSentMsgErr((Notify) msg);
-            }
+                }
+            });
+        this.lastMessageSentAt = System.nanoTime();
+        this.sessionStats.updateSentMsgTotal();
+        if (msg instanceof Update) {
+            this.sessionStats.updateSentMsgUpd();
+        } else if (msg instanceof Notify) {
+            this.sessionStats.updateSentMsgErr((Notify) msg);
+        }
+    }
+
+    void flush() {
+        this.channel.flush();
+    }
+
+    synchronized void write(final Notification msg) {
+        try {
+            writeEpilogue(this.channel.write(msg), msg);
         } catch (final Exception e) {
             LOG.warn("Message {} was not sent.", msg, e);
         }
     }
 
+    synchronized void sendMessage(final Notification msg) {
+        writeEpilogue(this.channel.writeAndFlush(msg), msg);
+    }
+
     private synchronized void closeWithoutMessage() {
         LOG.debug("Closing session: {}", this);
         removePeerSession();
index 4ea3f54a28320ae15b3a5137f2841575d0dfa58e..7b342f8584b5ae78b79deccf03a928932441a9da 100644 (file)
@@ -29,7 +29,7 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
         this.session = Preconditions.checkNotNull(session);
     }
 
-    void write(final Notification msg) {
+    private void ensureWritable() {
         if (blocked) {
             LOG.trace("Blocked slow path tripped on session {}", session);
             synchronized (this) {
@@ -45,12 +45,20 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
                 LOG.debug("Resuming write on session {}", session);
             }
         }
+    }
+
+    void write(final Notification msg) {
+        ensureWritable();
+        session.write(msg);
+    }
 
+    void writeAndFlush(final Notification msg) {
+        ensureWritable();
         session.sendMessage(msg);
     }
 
     void flush() {
-        // FIXME: no-op, as we do not have hatching APIs in session yet
+        session.flush();
     }
 
     @Override
index f6fcd1d849ec7577f9734b7f4f9ce97914b5ee38..e59a9765eea3c70419727e159faac6dd334c204a 100644 (file)
@@ -10,11 +10,11 @@ package org.opendaylight.protocol.bgp.rib.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import io.netty.channel.Channel;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.EventLoop;
 import java.math.BigInteger;
 import java.net.InetSocketAddress;
@@ -109,6 +109,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.type
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.network.concepts.rev131125.IsoSystemIdentifier;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Notification;
 
 public class ApplicationPeerTest {
 
@@ -272,6 +273,7 @@ public class ApplicationPeerTest {
         Mockito.doReturn(null).when(this.eventLoop).schedule(any(Runnable.class), any(long.class), any(TimeUnit.class));
         Mockito.doReturn(Boolean.TRUE).when(this.channel).isWritable();
         Mockito.doReturn(null).when(this.channel).close();
+        Mockito.doReturn(new DefaultChannelPromise(channel)).when(this.channel).writeAndFlush(any(Notification.class));
 
         Mockito.doReturn(new InetSocketAddress("localhost", 12345)).when(this.channel).remoteAddress();
         Mockito.doReturn(new InetSocketAddress("localhost", 12345)).when(this.channel).localAddress();