BUG-2283: introduce OutputLimiter 12/16012/7
authorRobert Varga <rovarga@cisco.com>
Wed, 4 Mar 2015 11:31:54 +0000 (12:31 +0100)
committerRobert Varga <nite@hq.sk>
Thu, 5 Mar 2015 11:05:54 +0000 (11:05 +0000)
This is a simple utility class to perform backpressure based on channel
writability.

Change-Id: I2b4018722eee9dfa88ff92d49d17b969134a7de3
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ChannelOutputLimiter.java [new file with mode: 0644]

index 7720538b1c0ab8bb998b08cf2eef820f90a429f7..f720aae414330da939ae1c8f490699c6cfbc52e4 100644 (file)
@@ -107,6 +107,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     private final AsNumber asNumber;
     private final Ipv4Address bgpId;
     private final BGPPeerRegistry peerRegistry;
+    private final ChannelOutputLimiter limiter;
 
     private BGPSessionStats sessionStats;
 
@@ -120,6 +121,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
             final BGPPeerRegistry peerRegitry) {
         this.listener = Preconditions.checkNotNull(listener);
         this.channel = Preconditions.checkNotNull(channel);
+        this.limiter = new ChannelOutputLimiter(this);
         this.holdTimerValue = (remoteOpen.getHoldTimer() < localHoldTimer) ? remoteOpen.getHoldTimer() : localHoldTimer;
         LOG.info("BGP HoldTimer new value: {}", this.holdTimerValue);
         this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO;
@@ -389,4 +391,8 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     public synchronized void resetSessionStats() {
         this.sessionStats.resetStats();
     }
+
+    ChannelOutputLimiter getLimiter() {
+        return limiter;
+    }
 }
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ChannelOutputLimiter.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ChannelOutputLimiter.java
new file mode 100644 (file)
index 0000000..4ea3f54
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
+ * for a sessions' channel.
+ */
+@ThreadSafe
+final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
+    private final BGPSessionImpl session;
+    private volatile boolean blocked;
+
+    ChannelOutputLimiter(final BGPSessionImpl session) {
+        this.session = Preconditions.checkNotNull(session);
+    }
+
+    void write(final Notification msg) {
+        if (blocked) {
+            LOG.trace("Blocked slow path tripped on session {}", session);
+            synchronized (this) {
+                while (blocked) {
+                    try {
+                        LOG.debug("Waiting for session {} to become writable", session);
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
+                    }
+                }
+
+                LOG.debug("Resuming write on session {}", session);
+            }
+        }
+
+        session.sendMessage(msg);
+    }
+
+    void flush() {
+        // FIXME: no-op, as we do not have hatching APIs in session yet
+    }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+        final boolean w = ctx.channel().isWritable();
+
+        synchronized (this) {
+            blocked = !w;
+            LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
+
+            if (w) {
+                this.notifyAll();
+            }
+        }
+
+        super.channelWritabilityChanged(ctx);
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        synchronized (this) {
+            blocked = false;
+            this.notifyAll();
+        }
+
+        super.channelInactive(ctx);
+    }
+}