f80960ef99df353fc9f25448a453929e33867d7b
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.protocol;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoop;
18 import io.netty.util.concurrent.DefaultPromise;
19 import io.netty.util.concurrent.GlobalEventExecutor;
20 import io.netty.util.concurrent.Promise;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nonnull;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry;
26 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
27 import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
28 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public final class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
34     private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
35     private static final int CONNECT_TIMEOUT = 5000;
36
37     private InetSocketAddress address;
38     private final int retryTimer;
39     private final Bootstrap bootstrap;
40     private final BGPPeerRegistry peerRegistry;
41     @GuardedBy("this")
42     private final AutoCloseable listenerRegistration;
43     @GuardedBy("this")
44     private ChannelFuture pending;
45     @GuardedBy("this")
46     private boolean peerSessionPresent;
47     @GuardedBy("this")
48     private boolean connectSkipped;
49
50
51     public BGPProtocolSessionPromise(@Nonnull final InetSocketAddress remoteAddress, final int retryTimer,
52         @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry) {
53         super(GlobalEventExecutor.INSTANCE);
54         this.address = requireNonNull(remoteAddress);
55         this.retryTimer = retryTimer;
56         this.bootstrap = requireNonNull(bootstrap);
57         this.peerRegistry = requireNonNull(peerRegistry);
58         this.listenerRegistration = this.peerRegistry.registerPeerSessionListener(
59             new PeerRegistrySessionListenerImpl(this, StrictBGPPeerRegistry.getIpAddress(this.address)));
60     }
61
62     public synchronized void connect() {
63         if (this.peerSessionPresent) {
64             LOG.debug("Connection to {} already exists", this.address);
65             this.connectSkipped = true;
66             return;
67         }
68
69         this.connectSkipped = false;
70
71         final BGPProtocolSessionPromise<?> lock = this;
72         try {
73             LOG.debug("Promise {} attempting connect for {}ms", lock, CONNECT_TIMEOUT);
74             if (this.address.isUnresolved()) {
75                 this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
76             }
77
78             this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
79             this.bootstrap.remoteAddress(this.address);
80             final ChannelFuture connectFuture = this.bootstrap.connect();
81             connectFuture.addListener(new BootstrapConnectListener(lock));
82             this.pending = connectFuture;
83         } catch (final Exception e) {
84             LOG.warn("Failed to connect to {}", this.address, e);
85             this.setFailure(e);
86         }
87     }
88
89     public synchronized void reconnect() {
90         if (this.retryTimer == 0) {
91             LOG.debug("Retry timer value is 0. Reconnection will not be attempted");
92             this.setFailure(this.pending.cause());
93             return;
94         }
95
96         final BGPProtocolSessionPromise<?> lock = this;
97         final EventLoop loop = this.pending.channel().eventLoop();
98         loop.schedule(() -> {
99             synchronized (BGPProtocolSessionPromise.this) {
100                 if (BGPProtocolSessionPromise.this.peerSessionPresent) {
101                     LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
102                     BGPProtocolSessionPromise.this.connectSkipped = true;
103                     return;
104                 }
105
106                 BGPProtocolSessionPromise.this.connectSkipped = false;
107                 LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
108                 final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
109                 reconnectFuture.addListener(new BootstrapConnectListener(lock));
110                 BGPProtocolSessionPromise.this.pending = reconnectFuture;
111             }
112         }, this.retryTimer, TimeUnit.SECONDS);
113         LOG.debug("Next reconnection attempt in {}s", this.retryTimer);
114     }
115
116     @Override
117     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
118         closePeerSessionListener();
119         if (super.cancel(mayInterruptIfRunning)) {
120             requireNonNull(this.pending);
121             this.pending.cancel(mayInterruptIfRunning);
122             return true;
123         }
124
125         return false;
126     }
127
128     private synchronized void closePeerSessionListener() {
129         try {
130             this.listenerRegistration.close();
131         } catch (final Exception e) {
132             LOG.debug("Exception encountered while closing peer registry session listener registration", e);
133         }
134     }
135
136     @Override
137     public synchronized Promise<S> setSuccess(final S result) {
138         LOG.debug("Promise {} completed", this);
139         return super.setSuccess(result);
140     }
141
142     private class BootstrapConnectListener implements ChannelFutureListener {
143         @GuardedBy("this")
144         private final Object lock;
145
146         BootstrapConnectListener(final Object lock) {
147             this.lock = lock;
148         }
149
150         @Override
151         public void operationComplete(final ChannelFuture channelFuture) throws Exception {
152             synchronized (this.lock) {
153                 BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
154                 Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(channelFuture));
155                 if (BGPProtocolSessionPromise.this.isCancelled()) {
156                     if (channelFuture.isSuccess()) {
157                         BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
158                         channelFuture.channel().close();
159                     }
160                 } else if (channelFuture.isSuccess()) {
161                     BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
162                 } else {
163                     BGPProtocolSessionPromise.LOG.warn("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, channelFuture.cause());
164                     BGPProtocolSessionPromise.this.reconnect();
165                 }
166             }
167         }
168     }
169
170     private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
171         @GuardedBy("this")
172         private final Object lock;
173         private final IpAddress peerAddress;
174
175         PeerRegistrySessionListenerImpl(final Object lock, final IpAddress peerAddress) {
176             this.lock = lock;
177             this.peerAddress = peerAddress;
178         }
179
180         @Override
181         public void onSessionCreated(@Nonnull final IpAddress ip) {
182             if (!ip.equals(this.peerAddress)) {
183                 return;
184             }
185             BGPProtocolSessionPromise.LOG.debug("Callback for session creation with peer {} received", ip);
186             synchronized (this.lock) {
187                 BGPProtocolSessionPromise.this.peerSessionPresent = true;
188             }
189         }
190
191         @Override
192         public void onSessionRemoved(@Nonnull final IpAddress ip) {
193             if (!ip.equals(this.peerAddress)) {
194                 return;
195             }
196             BGPProtocolSessionPromise.LOG.debug("Callback for session removal with peer {} received", ip);
197             synchronized (this.lock) {
198                 BGPProtocolSessionPromise.this.peerSessionPresent = false;
199                 if (BGPProtocolSessionPromise.this.connectSkipped) {
200                     BGPProtocolSessionPromise.this.connect();
201                 }
202             }
203         }
204     }
205
206 }