BUG-6470 Fix deadlock in BGP session creation
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / AbstractBGPSessionNegotiator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bgp.rib.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
16 import io.netty.channel.ChannelHandlerContext;
17 import io.netty.channel.ChannelInboundHandlerAdapter;
18 import io.netty.util.concurrent.Promise;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.concurrent.GuardedBy;
21 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
22 import org.opendaylight.protocol.bgp.parser.BGPError;
23 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
24 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
25 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
26 import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
27 import org.opendaylight.protocol.util.Values;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.NotifyBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Open;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.OpenBuilder;
36 import org.opendaylight.yangtools.yang.binding.Notification;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Bgp Session negotiator. Common for local-to-remote and remote-to-local connections.
42  * One difference is session validation performed by injected BGPSessionValidator when OPEN message is received.
43  */
44 abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator {
45     // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
46     private static final int INITIAL_HOLDTIMER = 4;
47
48     /**
49      * @see <a href="http://tools.ietf.org/html/rfc6793">BGP Support for 4-Octet AS Number Space</a>
50      */
51     private static final int AS_TRANS = 23456;
52
53     @VisibleForTesting
54     public enum State {
55         /**
56          * Negotiation has not started yet.
57          */
58         IDLE,
59         /**
60          * We have sent our Open message, and are waiting for the peer's Open message.
61          */
62         OPEN_SENT,
63         /**
64          * We have received the peer's Open message, which is acceptable, and we're waiting the acknowledgement of our
65          * Open message.
66          */
67         OPEN_CONFIRM,
68         /**
69          * The negotiation finished.
70          */
71         FINISHED,
72     }
73
74     private static final Logger LOG = LoggerFactory.getLogger(AbstractBGPSessionNegotiator.class);
75     private final BGPPeerRegistry registry;
76     private final Promise<BGPSessionImpl> promise;
77     private final Channel channel;
78     @GuardedBy("this")
79     private State state = State.IDLE;
80     @GuardedBy("this")
81     private BGPSessionImpl session;
82
83     AbstractBGPSessionNegotiator(final Promise<BGPSessionImpl> promise, final Channel channel,
84             final BGPPeerRegistry registry) {
85         this.promise = Preconditions.checkNotNull(promise);
86         this.channel = Preconditions.checkNotNull(channel);
87         this.registry = registry;
88     }
89
90     private synchronized void startNegotiation() {
91         if (!(this.state == State.IDLE || this.state == State.OPEN_CONFIRM)) {
92             return;
93         }
94         // Open can be sent first either from ODL (IDLE) or from peer (OPEN_CONFIRM)
95         final IpAddress remoteIp = getRemoteIp();
96         try {
97             // Check if peer is configured in registry before retrieving preferences
98             if (!this.registry.isPeerConfigured(remoteIp)) {
99                 final BGPDocumentedException cause = new BGPDocumentedException(
100                     String.format("BGP peer with ip: %s not configured, check configured peers in : %s", remoteIp, this.registry), BGPError.CONNECTION_REJECTED);
101                 negotiationFailed(cause);
102                 return;
103             }
104
105             final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp);
106
107             int as = preferences.getMyAs().getValue().intValue();
108             // Set as AS_TRANS if the value is bigger than 2B
109             if (as > Values.UNSIGNED_SHORT_MAX_VALUE) {
110                 as = AS_TRANS;
111             }
112             sendMessage(new OpenBuilder().setMyAsNumber(as).setHoldTimer(preferences.getHoldTime()).setBgpIdentifier(
113                 preferences.getBgpId()).setBgpParameters(preferences.getParams()).build());
114             if (this.state != State.FINISHED) {
115                 this.state = State.OPEN_SENT;
116                 this.channel.eventLoop().schedule(new Runnable() {
117                     @Override
118                     public void run() {
119                         synchronized (AbstractBGPSessionNegotiator.this) {
120                             if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) {
121                                 AbstractBGPSessionNegotiator.this
122                                     .sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
123                                 negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
124                                 AbstractBGPSessionNegotiator.this.state = State.FINISHED;
125                             }
126                         }
127                     }
128                 }, INITIAL_HOLDTIMER, TimeUnit.MINUTES);
129             }
130         } catch (final Exception e) {
131             LOG.warn("Unexpected negotiation failure", e);
132             negotiationFailedCloseChannel(e);
133         }
134     }
135
136     private IpAddress getRemoteIp() {
137         return StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress());
138     }
139
140     protected synchronized void handleMessage(final Notification msg) {
141         LOG.debug("Channel {} handling message in state {}, msg: {}", this.channel, this.state, msg);
142         switch (this.state) {
143         case FINISHED:
144             sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
145             return;
146         case IDLE:
147             // to avoid race condition when Open message was sent by the peer before startNegotiation could be executed
148             if (msg instanceof Open) {
149                 startNegotiation();
150                 handleOpen((Open) msg);
151                 return;
152             }
153             sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
154             break;
155         case OPEN_CONFIRM:
156             if (msg instanceof Keepalive) {
157                 negotiationSuccessful(this.session);
158                 LOG.info("BGP Session with peer {} established successfully.", this.channel);
159             } else if (msg instanceof Notify) {
160                 final Notify ntf = (Notify) msg;
161                 negotiationFailed(new BGPDocumentedException("Peer refusal", BGPError.forValue(ntf.getErrorCode(), ntf.getErrorSubcode())));
162             }
163             this.state = State.FINISHED;
164             return;
165         case OPEN_SENT:
166             if (msg instanceof Open) {
167                 handleOpen((Open) msg);
168                 return;
169             }
170             break;
171         default:
172             break;
173         }
174
175         // Catch-all for unexpected message
176         LOG.warn("Channel {} state {} unexpected message {}", this.channel, this.state, msg);
177         sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
178         negotiationFailed(new BGPDocumentedException("Unexpected message channel: " + this.channel + ", state: " + this.state + ", message: " + msg, BGPError.FSM_ERROR));
179         this.state = State.FINISHED;
180     }
181
182     private static Notify buildErrorNotify(final BGPError err) {
183         return buildErrorNotify(err, null);
184     }
185
186     private static Notify buildErrorNotify(final BGPError err, final byte[] data) {
187         final NotifyBuilder builder = new NotifyBuilder().setErrorCode(err.getCode()).setErrorSubcode(err.getSubcode());
188         if (data != null && data.length != 0) {
189             builder.setData(data);
190         }
191         return builder.build();
192     }
193
194     private synchronized void handleOpen(final Open openObj) {
195         final IpAddress remoteIp = getRemoteIp();
196         final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp);
197         try {
198             final BGPSessionListener peer = this.registry.getPeer(remoteIp, getSourceId(openObj, preferences), getDestinationId(openObj, preferences), openObj);
199             sendMessage(new KeepaliveBuilder().build());
200             this.state = State.OPEN_CONFIRM;
201             this.session = new BGPSessionImpl(peer, this.channel, openObj, preferences, this.registry);
202             this.session.setChannelExtMsgCoder(openObj);
203             LOG.debug("Channel {} moved to OPEN_CONFIRM state with remote proposal {}", this.channel, openObj);
204         } catch (final BGPDocumentedException e) {
205             LOG.warn("Channel {} negotiation failed", this.channel, e);
206             negotiationFailed(e);
207         }
208     }
209
210     private synchronized void negotiationFailed(final Throwable e) {
211         LOG.warn("Channel {} negotiation failed: {}", this.channel, e.getMessage());
212         if (e instanceof BGPDocumentedException) {
213             // although sendMessage() can also result in calling this method, it won't create a cycle. In case sendMessage() fails to
214             // deliver the message, this method gets called with different exception (definitely not with BGPDocumentedException).
215             sendMessage(buildErrorNotify(((BGPDocumentedException)e).getError(), ((BGPDocumentedException) e).getData()));
216         }
217         if (this.state == State.OPEN_CONFIRM) {
218             this.registry.removePeerSession(getRemoteIp());
219         }
220         negotiationFailedCloseChannel(e);
221         this.state = State.FINISHED;
222     }
223
224     /**
225      * @param openMsg Open message received from remote BGP speaker
226      * @param preferences Local BGP speaker preferences
227      * @return BGP Id of device that accepted the connection
228      */
229     protected abstract Ipv4Address getDestinationId(final Open openMsg, final BGPSessionPreferences preferences);
230
231     /**
232      * @param openMsg Open message received from remote BGP speaker
233      * @param preferences Local BGP speaker preferences
234      * @return BGP Id of device that accepted the connection
235      */
236     protected abstract Ipv4Address getSourceId(final Open openMsg, final BGPSessionPreferences preferences);
237
238     public synchronized State getState() {
239         return this.state;
240     }
241
242     private void negotiationSuccessful(final BGPSessionImpl session) {
243         LOG.debug("Negotiation on channel {} successful with session {}", this.channel, session);
244         this.channel.pipeline().replace(this, "session", session);
245         this.promise.setSuccess(session);
246     }
247
248     private void negotiationFailedCloseChannel(final Throwable cause) {
249         LOG.debug("Negotiation on channel {} failed", this.channel, cause);
250         this.channel.close();
251         this.promise.setFailure(cause);
252     }
253
254     private void sendMessage(final Notification msg) {
255         this.channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
256             @Override
257             public void operationComplete(final ChannelFuture f) {
258                 if (!f.isSuccess()) {
259                     LOG.warn("Failed to send message {} to channel {}", msg,  AbstractBGPSessionNegotiator.this.channel, f.cause());
260                     negotiationFailedCloseChannel(f.cause());
261                 } else {
262                     LOG.trace("Message {} sent to channel {}", msg, AbstractBGPSessionNegotiator.this.channel);
263                 }
264             }
265         });
266     }
267
268     @Override
269     public final void channelActive(final ChannelHandlerContext ctx) {
270         LOG.debug("Starting session negotiation on channel {}", this.channel);
271         startNegotiation();
272     }
273
274     @Override
275     public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
276         LOG.debug("Negotiation read invoked on channel {}", this.channel);
277         try {
278             handleMessage((Notification) msg);
279         } catch (final Exception e) {
280             LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
281             negotiationFailedCloseChannel(e);
282         }
283
284     }
285
286     @Override
287     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
288         LOG.info("Unexpected error during negotiation", cause);
289         negotiationFailedCloseChannel(cause);
290     }
291 }