Fix raw type warnings in bgp/bmp
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / ChannelOutputLimiter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import org.opendaylight.yangtools.yang.binding.Notification;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * A best-effort output limiter. It does not provide any fairness, and acts as a blocking gate-keeper
21  * for a sessions' channel.
22  *
23  * <p>
24  * This class is thread-safe.
25  */
26 public final class ChannelOutputLimiter extends ChannelInboundHandlerAdapter {
27     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputLimiter.class);
28     private final BGPSessionImpl session;
29     private volatile boolean blocked;
30
31     ChannelOutputLimiter(final BGPSessionImpl session) {
32         this.session = requireNonNull(session);
33     }
34
35     private void ensureWritable() {
36         if (blocked) {
37             LOG.trace("Blocked slow path tripped on session {}", session);
38             synchronized (this) {
39                 while (blocked) {
40                     try {
41                         LOG.debug("Waiting for session {} to become writable", session);
42                         flush();
43                         this.wait();
44                     } catch (final InterruptedException e) {
45                         throw new IllegalStateException("Interrupted while waiting for channel to come back", e);
46                     }
47                 }
48
49                 LOG.debug("Resuming write on session {}", session);
50             }
51         }
52     }
53
54     public void write(final Notification<?> msg) {
55         ensureWritable();
56         session.write(msg);
57     }
58
59     ChannelFuture writeAndFlush(final Notification<?> msg) {
60         ensureWritable();
61         return session.writeAndFlush(msg);
62     }
63
64     public void flush() {
65         session.flush();
66     }
67
68     @Override
69     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
70         final boolean w = ctx.channel().isWritable();
71
72         synchronized (this) {
73             blocked = !w;
74             LOG.debug("Writes on session {} {}", session, w ? "unblocked" : "blocked");
75
76             if (w) {
77                 notifyAll();
78             }
79         }
80
81         super.channelWritabilityChanged(ctx);
82     }
83
84     @Override
85     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
86         synchronized (this) {
87             blocked = false;
88             notifyAll();
89         }
90
91         super.channelInactive(ctx);
92     }
93 }