* for a sessions' channel.
*/
@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
private final BGPSessionImpl session;
private volatile boolean blocked;
while (this.blocked) {
try {
LOG.debug("Waiting for session {} to become writable", this.session);
+ flush();
this.wait();
} catch (final InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
}
}
- void write(final Notification msg) {
+ public void write(final Notification msg) {
ensureWritable();
this.session.write(msg);
}
return this.session.writeAndFlush(msg);
}
- void flush() {
+ public void flush() {
this.session.flush();
}
LOG.debug("Writes on session {} {}", this.session, w ? "unblocked" : "blocked");
if (w) {
- this.notifyAll();
- } else {
- flush();
+ notifyAll();
}
}
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
this.blocked = false;
- this.notifyAll();
+ notifyAll();
}
super.channelInactive(ctx);