Move future declaration
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / ChannelOutputLimiter.java
index 3a610b6221eb725b8c80bd0d08d9fae15728d72c..d492daf09c745f627340cca31fdd6fa90d5cc5da 100644 (file)
@@ -7,10 +7,11 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,15 +19,17 @@ import org.slf4j.LoggerFactory;
 /**
  * A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
  * for a sessions' channel.
+ *
+ * <p>
+ * This class is thread-safe.
  */
-@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
     private final BGPSessionImpl session;
     private volatile boolean blocked;
 
     ChannelOutputLimiter(final BGPSessionImpl session) {
-        this.session = Preconditions.checkNotNull(session);
+        this.session = requireNonNull(session);
     }
 
     private void ensureWritable() {
@@ -36,8 +39,9 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
                 while (blocked) {
                     try {
                         LOG.debug("Waiting for session {} to become writable", session);
+                        flush();
                         this.wait();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                         throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
                     }
                 }
@@ -47,17 +51,17 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
         }
     }
 
-    void write(final Notification msg) {
+    public void write(final Notification<?> msg) {
         ensureWritable();
         session.write(msg);
     }
 
-    void writeAndFlush(final Notification msg) {
+    ChannelFuture writeAndFlush(final Notification<?> msg) {
         ensureWritable();
-        session.writeAndFlush(msg);
+        return session.writeAndFlush(msg);
     }
 
-    void flush() {
+    public void flush() {
         session.flush();
     }
 
@@ -70,7 +74,7 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
             LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
 
             if (w) {
-                this.notifyAll();
+                notifyAll();
             }
         }
 
@@ -81,7 +85,7 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         synchronized (this) {
             blocked = false;
-            this.notifyAll();
+            notifyAll();
         }
 
         super.channelInactive(ctx);