BUG-6747: Race condition on peer connection
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.protocol;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelFutureListener;
14 import io.netty.channel.ChannelOption;
15 import io.netty.channel.EventLoop;
16 import io.netty.util.concurrent.DefaultPromise;
17 import io.netty.util.concurrent.GlobalEventExecutor;
18 import io.netty.util.concurrent.Promise;
19 import java.net.InetSocketAddress;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.concurrent.GuardedBy;
22 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
27     private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
28     private static final int CONNECT_TIMEOUT = 5000;
29
30     private InetSocketAddress address;
31     private final int retryTimer;
32     private final Bootstrap bootstrap;
33     @GuardedBy("this")
34     private ChannelFuture pending;
35
36     public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap) {
37         super(GlobalEventExecutor.INSTANCE);
38         this.address = Preconditions.checkNotNull(remoteAddress);
39         this.retryTimer = retryTimer;
40         this.bootstrap = Preconditions.checkNotNull(bootstrap);
41     }
42
43     public synchronized void connect() {
44         final BGPProtocolSessionPromise lock = this;
45
46         try {
47             LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(CONNECT_TIMEOUT));
48             if (this.address.isUnresolved()) {
49                 this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
50             }
51
52             this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
53             this.bootstrap.remoteAddress(this.address);
54             final ChannelFuture connectFuture = this.bootstrap.connect();
55             connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
56             this.pending = connectFuture;
57         } catch (Exception e) {
58             LOG.info("Failed to connect to {}", this.address, e);
59             this.setFailure(e);
60         }
61     }
62
63     public synchronized void reconnect() {
64         if (this.retryTimer == 0) {
65             LOG.debug("Retry timer value is 0. Reconnection will not be attempted");
66             this.setFailure(this.pending.cause());
67             return;
68         }
69
70         final BGPProtocolSessionPromise lock = this;
71         final EventLoop loop = this.pending.channel().eventLoop();
72         loop.schedule(new Runnable() {
73             @Override
74             public void run() {
75                 LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
76                 final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
77                 reconnectFuture.addListener(new BootstrapConnectListener(lock));
78                 BGPProtocolSessionPromise.this.pending = reconnectFuture;
79             }
80         }, this.retryTimer, TimeUnit.SECONDS);
81         LOG.debug("Next reconnection attempt in {}s", this.retryTimer);
82     }
83
84     @Override
85     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
86         if (super.cancel(mayInterruptIfRunning)) {
87             this.pending.cancel(mayInterruptIfRunning);
88             return true;
89         } else {
90             return false;
91         }
92     }
93
94     @Override
95     public synchronized Promise<S> setSuccess(final S result) {
96         LOG.debug("Promise {} completed", this);
97         return super.setSuccess(result);
98     }
99
100     private class BootstrapConnectListener implements ChannelFutureListener {
101         private final Object lock;
102
103         public BootstrapConnectListener(final Object lock) {
104             this.lock = lock;
105         }
106
107         @Override
108         public void operationComplete(final ChannelFuture channelFuture) throws Exception {
109             synchronized (this.lock) {
110                 BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
111                 Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(channelFuture));
112                 if (BGPProtocolSessionPromise.this.isCancelled()) {
113                     if (channelFuture.isSuccess()) {
114                         BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
115                         channelFuture.channel().close();
116                     }
117                 } else if (channelFuture.isSuccess()) {
118                     BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
119                 } else {
120                     BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, channelFuture.cause());
121                     BGPProtocolSessionPromise.this.reconnect();
122                 }
123             }
124         }
125     }
126 }