/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.protocol.bgp.rib.impl.protocol;
10 import static java.util.Objects.requireNonNull;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelInboundHandlerAdapter;
17 import io.netty.channel.ChannelInitializer;
18 import io.netty.channel.socket.SocketChannel;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
24 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
25 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Void> {
30 private static final Logger LOG = LoggerFactory.getLogger(BGPReconnectPromise.class);
32 private final InetSocketAddress address;
33 private final int retryTimer;
34 private final Bootstrap bootstrap;
35 private final BGPPeerRegistry peerRegistry;
36 private final ChannelPipelineInitializer<S> initializer;
37 private BGPProtocolSessionPromise<S> pending;
39 public BGPReconnectPromise(final @NonNull EventExecutor executor, final @NonNull InetSocketAddress address,
40 final int retryTimer, final @NonNull Bootstrap bootstrap, final @NonNull BGPPeerRegistry peerRegistry,
41 final @NonNull ChannelPipelineInitializer<S> initializer) {
43 this.bootstrap = bootstrap;
44 this.initializer = requireNonNull(initializer);
45 this.address = requireNonNull(address);
46 this.retryTimer = retryTimer;
47 this.peerRegistry = requireNonNull(peerRegistry);
50 public synchronized void connect() {
51 if (this.pending != null) {
52 this.pending.cancel(true);
55 // Set up a client with pre-configured bootstrap, but add a closed channel handler
56 // into the pipeline to support reconnect attempts
57 this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry,
58 (channel, promise) -> {
59 this.initializer.initializeChannel(channel, promise);
60 // add closed channel handler
61 // This handler has to be added as last channel handler and the channel inactive event has to be
63 // Handlers in front of it can react to channelInactive event, but have to forward the event or
64 // the reconnect will not work. This handler is last so all handlers in front of it can handle
65 // channel inactive (to e.g. resource cleanup) before a new connection is started
66 channel.pipeline().addLast(new ClosedChannelHandler(this));
69 this.pending.addListener(future -> {
70 if (!future.isSuccess() && !this.isDone()) {
71 this.setFailure(future.cause());
76 private static <S extends BGPSession> BGPProtocolSessionPromise<S> connectSessionPromise(
77 final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
78 final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer<S> initializer) {
79 final BGPProtocolSessionPromise<S> sessionPromise = new BGPProtocolSessionPromise<>(address, retryTimer,
80 bootstrap, peerRegistry);
81 final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
83 protected void initChannel(final SocketChannel channel) {
84 initializer.initializeChannel(channel, sessionPromise);
88 bootstrap.handler(chInit);
89 sessionPromise.connect();
90 LOG.debug("Client created.");
91 return sessionPromise;
95 * Indicate whether the initial connection was established successfully.
97 * @return true if initial connection was established successfully, false if initial connection failed due
98 * to e.g. Connection refused, Negotiation failed
100 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
101 justification = "https://github.com/spotbugs/spotbugs/issues/811")
102 private synchronized boolean isInitialConnectFinished() {
103 requireNonNull(this.pending);
104 return this.pending.isDone() && this.pending.isSuccess();
107 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
108 justification = "https://github.com/spotbugs/spotbugs/issues/811")
109 private synchronized void reconnect() {
110 requireNonNull(this.pending);
111 this.pending.reconnect();
115 public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
116 if (super.cancel(mayInterruptIfRunning)) {
117 requireNonNull(this.pending);
118 this.pending.cancel(mayInterruptIfRunning);
125 * Channel handler that responds to channelInactive event and reconnects the session.
126 * Only if the promise was not canceled.
128 private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
129 private final BGPReconnectPromise<?> promise;
131 ClosedChannelHandler(final BGPReconnectPromise<?> promise) {
132 this.promise = promise;
136 public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
137 // This is the ultimate channel inactive handler, not forwarding
138 if (this.promise.isCancelled()) {
142 if (!this.promise.isInitialConnectFinished()) {
143 LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
144 this.promise.reconnect();
148 LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
149 this.promise.connect();