package org.opendaylight.protocol.bgp.rib.impl;
import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import javax.annotation.concurrent.ThreadSafe;
* for a sessions' channel.
*/
@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
private final BGPSessionImpl session;
private volatile boolean blocked;
}
private void ensureWritable() {
- if (blocked) {
- LOG.trace("Blocked slow path tripped on session {}", session);
+ if (this.blocked) {
+ LOG.trace("Blocked slow path tripped on session {}", this.session);
synchronized (this) {
- while (blocked) {
+ while (this.blocked) {
try {
- LOG.debug("Waiting for session {} to become writable", session);
+ LOG.debug("Waiting for session {} to become writable", this.session);
+ flush();
this.wait();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
}
}
- LOG.debug("Resuming write on session {}", session);
+ LOG.debug("Resuming write on session {}", this.session);
}
}
}
- void write(final Notification msg) {
+ public void write(final Notification msg) {
ensureWritable();
- session.write(msg);
+ this.session.write(msg);
}
- void writeAndFlush(final Notification msg) {
+ ChannelFuture writeAndFlush(final Notification msg) {
ensureWritable();
- session.writeAndFlush(msg);
+ return this.session.writeAndFlush(msg);
}
- void flush() {
- session.flush();
+ public void flush() {
+ this.session.flush();
}
@Override
final boolean w = ctx.channel().isWritable();
synchronized (this) {
- blocked = !w;
- LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
+ this.blocked = !w;
+ LOG.debug("Writes on session {} {}", this.session, w ? "unblocked" : "blocked");
if (w) {
- this.notifyAll();
- } else {
- flush();
+ notifyAll();
}
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
- blocked = false;
- this.notifyAll();
+ this.blocked = false;
+ notifyAll();
}
super.channelInactive(ctx);