Replace Preconditions.CheckNotNull per RequireNonNull
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / ChannelOutputLimiter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import javax.annotation.concurrent.ThreadSafe;
16 import org.opendaylight.yangtools.yang.binding.Notification;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 /**
21  * A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
22  * for a sessions' channel.
23  */
24 @ThreadSafe
25 public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
26     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
27     private final BGPSessionImpl session;
28     private volatile boolean blocked;
29
30     ChannelOutputLimiter(final BGPSessionImpl session) {
31         this.session = requireNonNull(session);
32     }
33
34     private void ensureWritable() {
35         if (this.blocked) {
36             LOG.trace("Blocked slow path tripped on session {}", this.session);
37             synchronized (this) {
38                 while (this.blocked) {
39                     try {
40                         LOG.debug("Waiting for session {} to become writable", this.session);
41                         flush();
42                         this.wait();
43                     } catch (final InterruptedException e) {
44                         throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
45                     }
46                 }
47
48                 LOG.debug("Resuming write on session {}", this.session);
49             }
50         }
51     }
52
53     public void write(final Notification msg) {
54         ensureWritable();
55         this.session.write(msg);
56     }
57
58     ChannelFuture writeAndFlush(final Notification msg) {
59         ensureWritable();
60         return this.session.writeAndFlush(msg);
61     }
62
63     public void flush() {
64         this.session.flush();
65     }
66
67     @Override
68     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
69         final boolean w = ctx.channel().isWritable();
70
71         synchronized (this) {
72             this.blocked = !w;
73             LOG.debug("Writes on session {} {}", this.session, w ? "unblocked" : "blocked");
74
75             if (w) {
76                 notifyAll();
77             }
78         }
79
80         super.channelWritabilityChanged(ctx);
81     }
82
83     @Override
84     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
85         synchronized (this) {
86             this.blocked = false;
87             notifyAll();
88         }
89
90         super.channelInactive(ctx);
91     }
92 }