Move future declaration
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / ChannelOutputLimiter.java
index 4ea3f54a28320ae15b3a5137f2841575d0dfa58e..d492daf09c745f627340cca31fdd6fa90d5cc5da 100644 (file)
@@ -7,10 +7,11 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,26 +19,29 @@ import org.slf4j.LoggerFactory;
 /**
  * A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
  * for a sessions' channel.
+ *
+ * <p>
+ * This class is thread-safe.
  */
-@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
     private final BGPSessionImpl session;
     private volatile boolean blocked;
 
     ChannelOutputLimiter(final BGPSessionImpl session) {
-        this.session = Preconditions.checkNotNull(session);
+        this.session = requireNonNull(session);
     }
 
-    void write(final Notification msg) {
+    private void ensureWritable() {
         if (blocked) {
             LOG.trace("Blocked slow path tripped on session {}", session);
             synchronized (this) {
                 while (blocked) {
                     try {
                         LOG.debug("Waiting for session {} to become writable", session);
+                        flush();
                         this.wait();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                         throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
                     }
                 }
@@ -45,12 +49,20 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
                 LOG.debug("Resuming write on session {}", session);
             }
         }
+    }
+
+    public void write(final Notification<?> msg) {
+        ensureWritable();
+        session.write(msg);
+    }
 
-        session.sendMessage(msg);
+    ChannelFuture writeAndFlush(final Notification<?> msg) {
+        ensureWritable();
+        return session.writeAndFlush(msg);
     }
 
-    void flush() {
-        // FIXME: no-op, as we do not have hatching APIs in session yet
+    public void flush() {
+        session.flush();
     }
 
     @Override
@@ -62,7 +74,7 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
             LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
 
             if (w) {
-                this.notifyAll();
+                notifyAll();
             }
         }
 
@@ -73,7 +85,7 @@ final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         synchronized (this) {
             blocked = false;
-            this.notifyAll();
+            notifyAll();
         }
 
         super.channelInactive(ctx);