Bump versions by x.y.(z+1)
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / ChannelOutputLimiter.java
index 21150b9431bc9b6fddeb25ac645425edae7deec7..ca818341b734b38a1c289e15403d61544fefb640 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.protocol.bgp.rib.impl;
 
 import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import javax.annotation.concurrent.ThreadSafe;
@@ -20,7 +21,7 @@ import org.slf4j.LoggerFactory;
  * for a sessions' channel.
  */
 @ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
     private final BGPSessionImpl session;
     private volatile boolean blocked;
@@ -30,35 +31,36 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     }
 
     private void ensureWritable() {
-        if (blocked) {
-            LOG.trace("Blocked slow path tripped on session {}", session);
+        if (this.blocked) {
+            LOG.trace("Blocked slow path tripped on session {}", this.session);
             synchronized (this) {
-                while (blocked) {
+                while (this.blocked) {
                     try {
-                        LOG.debug("Waiting for session {} to become writable", session);
+                        LOG.debug("Waiting for session {} to become writable", this.session);
+                        flush();
                         this.wait();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                         throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
                     }
                 }
 
-                LOG.debug("Resuming write on session {}", session);
+                LOG.debug("Resuming write on session {}", this.session);
             }
         }
     }
 
-    void write(final Notification msg) {
+    public void write(final Notification msg) {
         ensureWritable();
-        session.write(msg);
+        this.session.write(msg);
     }
 
-    void writeAndFlush(final Notification msg) {
+    ChannelFuture writeAndFlush(final Notification msg) {
         ensureWritable();
-        session.writeAndFlush(msg);
+        return this.session.writeAndFlush(msg);
     }
 
-    void flush() {
-        session.flush();
+    public void flush() {
+        this.session.flush();
     }
 
     @Override
@@ -66,13 +68,11 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
         final boolean w = ctx.channel().isWritable();
 
         synchronized (this) {
-            blocked = !w;
-            LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
+            this.blocked = !w;
+            LOG.debug("Writes on session {} {}", this.session, w ? "unblocked" : "blocked");
 
             if (w) {
-                this.notifyAll();
-            } else {
-                flush();
+                notifyAll();
             }
         }
 
@@ -82,8 +82,8 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         synchronized (this) {
-            blocked = false;
-            this.notifyAll();
+            this.blocked = false;
+            notifyAll();
         }
 
         super.channelInactive(ctx);