Bump odlparent to 6.0.0
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPReconnectPromise.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.protocol;
9
10 import static java.util.Objects.requireNonNull;
11
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelInboundHandlerAdapter;
17 import io.netty.channel.ChannelInitializer;
18 import io.netty.channel.socket.SocketChannel;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
24 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
25 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Void> {
30     private static final Logger LOG = LoggerFactory.getLogger(BGPReconnectPromise.class);
31
32     private final InetSocketAddress address;
33     private final int retryTimer;
34     private final Bootstrap bootstrap;
35     private final BGPPeerRegistry peerRegistry;
36     private final ChannelPipelineInitializer<S> initializer;
37     private BGPProtocolSessionPromise<S> pending;
38
39     public BGPReconnectPromise(final @NonNull EventExecutor executor, final @NonNull InetSocketAddress address,
40             final int retryTimer, final @NonNull Bootstrap bootstrap, final @NonNull BGPPeerRegistry peerRegistry,
41             final @NonNull ChannelPipelineInitializer<S> initializer) {
42         super(executor);
43         this.bootstrap = bootstrap;
44         this.initializer = requireNonNull(initializer);
45         this.address = requireNonNull(address);
46         this.retryTimer = retryTimer;
47         this.peerRegistry = requireNonNull(peerRegistry);
48     }
49
50     public synchronized void connect() {
51         if (this.pending != null) {
52             this.pending.cancel(true);
53         }
54
55         // Set up a client with pre-configured bootstrap, but add a closed channel handler
56         // into the pipeline to support reconnect attempts
57         this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry,
58             (channel, promise) -> {
59                 this.initializer.initializeChannel(channel, promise);
60                 // add closed channel handler
61                 // This handler has to be added as last channel handler and the channel inactive event has to be
62                 // caught by it
63                 // Handlers in front of it can react to channelInactive event, but have to forward the event or
64                 // the reconnect will not work. This handler is last so all handlers in front of it can handle
65                 // channel inactive (to e.g. resource cleanup) before a new connection is started
66                 channel.pipeline().addLast(new ClosedChannelHandler(this));
67             });
68
69         this.pending.addListener(future -> {
70             if (!future.isSuccess() && !this.isDone()) {
71                 this.setFailure(future.cause());
72             }
73         });
74     }
75
76     private static <S extends BGPSession> BGPProtocolSessionPromise<S> connectSessionPromise(
77             final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
78             final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer<S> initializer) {
79         final BGPProtocolSessionPromise<S> sessionPromise = new BGPProtocolSessionPromise<>(address, retryTimer,
80                 bootstrap, peerRegistry);
81         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
82             @Override
83             protected void initChannel(final SocketChannel channel) {
84                 initializer.initializeChannel(channel, sessionPromise);
85             }
86         };
87
88         bootstrap.handler(chInit);
89         sessionPromise.connect();
90         LOG.debug("Client created.");
91         return sessionPromise;
92     }
93
94     /**
95      * Indicate whether the initial connection was established successfully.
96      *
97      * @return true if initial connection was established successfully, false if initial connection failed due
98      *         to e.g. Connection refused, Negotiation failed
99      */
100     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
101             justification = "https://github.com/spotbugs/spotbugs/issues/811")
102     private synchronized boolean isInitialConnectFinished() {
103         requireNonNull(this.pending);
104         return this.pending.isDone() && this.pending.isSuccess();
105     }
106
107     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
108             justification = "https://github.com/spotbugs/spotbugs/issues/811")
109     private synchronized void reconnect() {
110         requireNonNull(this.pending);
111         this.pending.reconnect();
112     }
113
114     @Override
115     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
116         if (super.cancel(mayInterruptIfRunning)) {
117             requireNonNull(this.pending);
118             this.pending.cancel(mayInterruptIfRunning);
119             return true;
120         }
121         return false;
122     }
123
124     /**
125      * Channel handler that responds to channelInactive event and reconnects the session.
126      * Only if the promise was not canceled.
127      */
128     private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
129         private final BGPReconnectPromise<?> promise;
130
131         ClosedChannelHandler(final BGPReconnectPromise<?> promise) {
132             this.promise = promise;
133         }
134
135         @Override
136         public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
137             // This is the ultimate channel inactive handler, not forwarding
138             if (this.promise.isCancelled()) {
139                 return;
140             }
141
142             if (!this.promise.isInitialConnectFinished()) {
143                 LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
144                 this.promise.reconnect();
145                 return;
146             }
147
148             LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
149             this.promise.connect();
150         }
151     }
152 }