package org.opendaylight.protocol.bgp.rib.impl;
import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import javax.annotation.concurrent.ThreadSafe;
* for a sessions' channel.
*/
@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
private final BGPSessionImpl session;
private volatile boolean blocked;
this.session = Preconditions.checkNotNull(session);
}
- void write(final Notification msg) {
- if (blocked) {
- LOG.trace("Blocked slow path tripped on session {}", session);
+ private void ensureWritable() {
+ if (this.blocked) {
+ LOG.trace("Blocked slow path tripped on session {}", this.session);
synchronized (this) {
- while (blocked) {
+ while (this.blocked) {
try {
- LOG.debug("Waiting for session {} to become writable", session);
+ LOG.debug("Waiting for session {} to become writable", this.session);
+ flush();
this.wait();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
}
}
- LOG.debug("Resuming write on session {}", session);
+ LOG.debug("Resuming write on session {}", this.session);
}
}
+ }
+
+ public void write(final Notification msg) {
+ ensureWritable();
+ this.session.write(msg);
+ }
- session.sendMessage(msg);
+ ChannelFuture writeAndFlush(final Notification msg) {
+ ensureWritable();
+ return this.session.writeAndFlush(msg);
}
- void flush() {
- // FIXME: no-op, as we do not have hatching APIs in session yet
+ public void flush() {
+ this.session.flush();
}
@Override
final boolean w = ctx.channel().isWritable();
synchronized (this) {
- blocked = !w;
- LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
+ this.blocked = !w;
+ LOG.debug("Writes on session {} {}", this.session, w ? "unblocked" : "blocked");
if (w) {
- this.notifyAll();
+ notifyAll();
}
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
- blocked = false;
- this.notifyAll();
+ this.blocked = false;
+ notifyAll();
}
super.channelInactive(ctx);