*/
package org.opendaylight.protocol.bgp.rib.impl;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
* for a sessions' channel.
+ *
+ * <p>
+ * This class is thread-safe.
*/
-@ThreadSafe
-final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
+public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
private final BGPSessionImpl session;
private volatile boolean blocked;
ChannelOutputLimiter(final BGPSessionImpl session) {
- this.session = Preconditions.checkNotNull(session);
+ this.session = requireNonNull(session);
}
private void ensureWritable() {
while (blocked) {
try {
LOG.debug("Waiting for session {} to become writable", session);
+ flush();
this.wait();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
}
}
}
}
- void write(final Notification msg) {
+ public void write(final Notification<?> msg) {
ensureWritable();
session.write(msg);
}
- void writeAndFlush(final Notification msg) {
+ ChannelFuture writeAndFlush(final Notification<?> msg) {
ensureWritable();
- session.writeAndFlush(msg);
+ return session.writeAndFlush(msg);
}
- void flush() {
+ public void flush() {
session.flush();
}
LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
if (w) {
- this.notifyAll();
+ notifyAll();
}
}
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
blocked = false;
- this.notifyAll();
+ notifyAll();
}
super.channelInactive(ctx);