Bump versions to 0.21.8-SNAPSHOT
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / AbstractBGPSessionNegotiator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.channel.Channel;
14 import io.netty.channel.ChannelFutureListener;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelInboundHandlerAdapter;
17 import io.netty.util.concurrent.Promise;
18 import io.netty.util.concurrent.ScheduledFuture;
19 import java.util.concurrent.TimeUnit;
20 import org.checkerframework.checker.lock.qual.GuardedBy;
21 import org.checkerframework.checker.lock.qual.Holding;
22 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
23 import org.opendaylight.protocol.bgp.parser.BGPError;
24 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
25 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
26 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
27 import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
28 import org.opendaylight.protocol.util.Ipv6Util;
29 import org.opendaylight.protocol.util.Values;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressNoZone;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4AddressNoZone;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Keepalive;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.KeepaliveBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Notify;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.NotifyBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Open;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.OpenBuilder;
38 import org.opendaylight.yangtools.yang.binding.Notification;
39 import org.opendaylight.yangtools.yang.common.Uint16;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Bgp Session negotiator. Common for local-to-remote and remote-to-local connections.
45  * One difference is session validation performed by injected BGPSessionValidator when OPEN message is received.
46  */
47 abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator {
48     // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
49     private static final int INITIAL_HOLDTIMER = 4;
50
51     // <a href="http://tools.ietf.org/html/rfc6793">BGP Support for 4-Octet AS Number Space</a>
52     @VisibleForTesting
53     static final Uint16 AS_TRANS = Uint16.valueOf(23456).intern();
54     private static final Logger LOG = LoggerFactory.getLogger(AbstractBGPSessionNegotiator.class);
55     private final BGPPeerRegistry registry;
56     private final Promise<BGPSessionImpl> promise;
57     private final Channel channel;
58     @GuardedBy("this")
59     private State state = State.IDLE;
60     @GuardedBy("this")
61     private BGPSessionImpl session;
62     @GuardedBy("this")
63     private ScheduledFuture<?> pending;
64
65     @VisibleForTesting
66     public enum State {
67         /**
68          * Negotiation has not started yet.
69          */
70         IDLE,
71         /**
72          * We have sent our Open message, and are waiting for the peer's Open message.
73          */
74         OPEN_SENT,
75         /**
76          * We have received the peer's Open message, which is acceptable, and we're waiting the acknowledgement of our
77          * Open message.
78          */
79         OPEN_CONFIRM,
80         /**
81          * The negotiation finished.
82          */
83         FINISHED,
84     }
85
86     AbstractBGPSessionNegotiator(final Promise<BGPSessionImpl> promise, final Channel channel,
87             final BGPPeerRegistry registry) {
88         this.promise = requireNonNull(promise);
89         this.channel = requireNonNull(channel);
90         this.registry = registry;
91     }
92
93     @SuppressWarnings("checkstyle:illegalCatch")
94     private synchronized void startNegotiation() {
95         LOG.debug("Starting negotiating with {}, current state: {}", channel.remoteAddress(), state);
96         if (state != State.IDLE && state != State.OPEN_CONFIRM) {
97             return;
98         }
99         // Open can be sent first either from ODL (IDLE) or from peer (OPEN_CONFIRM)
100         final IpAddressNoZone remoteIp = getRemoteIp();
101         try {
102             // Check if peer is configured in registry before retrieving preferences
103             if (!registry.isPeerConfigured(remoteIp)) {
104                 final BGPDocumentedException cause = new BGPDocumentedException(
105                         String.format("BGP peer with ip: %s not configured, check configured peers in : %s",
106                                 remoteIp, registry), BGPError.CONNECTION_REJECTED);
107                 negotiationFailed(cause);
108                 return;
109             }
110
111             final BGPSessionPreferences preferences = registry.getPeerPreferences(remoteIp);
112             final Uint16 as = openASNumber(preferences.getMyAs().getValue().longValue());
113             sendMessage(new OpenBuilder().setMyAsNumber(as).setHoldTimer(Uint16.valueOf(preferences.getHoldTime()))
114                 .setBgpIdentifier(preferences.getBgpId()).setBgpParameters(preferences.getParams()).build());
115             if (state != State.FINISHED) {
116                 state = State.OPEN_SENT;
117                 pending = channel.eventLoop().schedule(() -> {
118                     synchronized (AbstractBGPSessionNegotiator.this) {
119                         AbstractBGPSessionNegotiator.this.pending = null;
120                         if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) {
121                             AbstractBGPSessionNegotiator.this
122                                     .sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
123                             negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
124                             AbstractBGPSessionNegotiator.this.state = State.FINISHED;
125                         }
126                     }
127                 }, INITIAL_HOLDTIMER, TimeUnit.MINUTES);
128             }
129         } catch (final Exception e) {
130             LOG.warn("Unexpected negotiation failure", e);
131             negotiationFailedCloseChannel(e);
132         }
133     }
134
135     private IpAddressNoZone getRemoteIp() {
136         final IpAddressNoZone remoteIp = StrictBGPPeerRegistry.getIpAddress(channel.remoteAddress());
137         if (remoteIp.getIpv6AddressNoZone() != null) {
138             return new IpAddressNoZone(Ipv6Util.getFullForm(remoteIp.getIpv6AddressNoZone()));
139         }
140         return remoteIp;
141     }
142
143     synchronized void handleMessage(final Notification<?> msg) {
144         LOG.debug("Channel {} handling message in state {}, msg: {}", channel, state, msg);
145         switch (state) {
146             case FINISHED:
147                 sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
148                 return;
149             case IDLE:
150                 // to avoid race condition when Open message was sent by the peer before startNegotiation could be
151                 // executed
152                 if (msg instanceof Open) {
153                     startNegotiation();
154                     handleOpen((Open) msg);
155                     return;
156                 }
157                 sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
158                 break;
159             case OPEN_CONFIRM:
160                 if (msg instanceof Keepalive) {
161                     negotiationSuccessful();
162                     LOG.info("BGP Session with peer {} established successfully.", channel);
163                 } else if (msg instanceof Notify) {
164                     final Notify ntf = (Notify) msg;
165                     negotiationFailed(new BGPDocumentedException("Peer refusal",
166                             BGPError.forValue(ntf.getErrorCode(), ntf.getErrorSubcode())));
167                 }
168                 state = State.FINISHED;
169                 return;
170             case OPEN_SENT:
171                 if (msg instanceof Open) {
172                     handleOpen((Open) msg);
173                     return;
174                 }
175                 break;
176             default:
177                 break;
178         }
179
180         // Catch-all for unexpected message
181         LOG.warn("Channel {} state {} unexpected message {}", channel, state, msg);
182         sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
183         negotiationFailed(new BGPDocumentedException("Unexpected message channel: "
184                 + channel + ", state: " + state + ", message: " + msg, BGPError.FSM_ERROR));
185         state = State.FINISHED;
186     }
187
188     private static Notify buildErrorNotify(final BGPError err) {
189         return buildErrorNotify(err, null);
190     }
191
192     private static Notify buildErrorNotify(final BGPError err, final byte[] data) {
193         final NotifyBuilder builder = new NotifyBuilder().setErrorCode(err.getCode()).setErrorSubcode(err.getSubcode());
194         if (data != null && data.length != 0) {
195             builder.setData(data);
196         }
197         return builder.build();
198     }
199
200     @SuppressWarnings("checkstyle:illegalCatch")
201     private synchronized void handleOpen(final Open openObj) {
202         final IpAddressNoZone remoteIp = getRemoteIp();
203         final BGPSessionPreferences preferences = registry.getPeerPreferences(remoteIp);
204         try {
205             final BGPSessionListener peer = registry.getPeer(remoteIp, getSourceId(openObj, preferences),
206                     getDestinationId(openObj, preferences), openObj);
207             sendMessage(new KeepaliveBuilder().build());
208             state = State.OPEN_CONFIRM;
209             session = new BGPSessionImpl(peer, channel, openObj, preferences, registry);
210             session.setChannelExtMsgCoder(openObj);
211             LOG.debug("Channel {} moved to OPEN_CONFIRM state with remote proposal {}", channel, openObj);
212         } catch (final BGPDocumentedException | RuntimeException e) {
213             LOG.warn("Channel {} negotiation failed", channel, e);
214             negotiationFailed(e);
215         }
216     }
217
218     private synchronized void negotiationFailed(final Throwable cause) {
219         LOG.warn("Channel {} negotiation failed: {}", channel, cause.getMessage());
220         if (cause instanceof BGPDocumentedException) {
221             // although sendMessage() can also result in calling this method, it won't create a cycle.
222             // In case sendMessage() fails to deliver the message, this method gets called with different
223             // exception (definitely not with BGPDocumentedException).
224             sendMessage(buildErrorNotify(((BGPDocumentedException) cause).getError(),
225                     ((BGPDocumentedException) cause).getData()));
226         }
227         if (state == State.OPEN_CONFIRM) {
228             registry.removePeerSession(getRemoteIp());
229         }
230         negotiationFailedCloseChannel(cause);
231         state = State.FINISHED;
232     }
233
234     /**
235      * Get destination identifier.
236      *
237      * @param openMsg Open message received from remote BGP speaker
238      * @param preferences Local BGP speaker preferences
239      * @return BGP Id of device that accepted the connection
240      */
241     protected abstract Ipv4AddressNoZone getDestinationId(Open openMsg, BGPSessionPreferences preferences);
242
243     /**
244      * Get source identifier.
245      *
246      * @param openMsg Open message received from remote BGP speaker
247      * @param preferences Local BGP speaker preferences
248      * @return BGP Id of device that accepted the connection
249      */
250     protected abstract Ipv4AddressNoZone getSourceId(Open openMsg, BGPSessionPreferences preferences);
251
252     public synchronized State getState() {
253         return state;
254     }
255
256     @Holding("this")
257     private void negotiationSuccessful() {
258         LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
259         channel.pipeline().replace(this, "session", session);
260         promise.setSuccess(session);
261     }
262
263     private void negotiationFailedCloseChannel(final Throwable cause) {
264         LOG.debug("Negotiation on channel {} failed", channel, cause);
265         channel.close();
266         synchronized (AbstractBGPSessionNegotiator.this) {
267             if (pending != null && pending.isCancellable()) {
268                 pending.cancel(true);
269                 pending = null;
270             }
271         }
272     }
273
274     private void sendMessage(final Notification<?> msg) {
275         channel.writeAndFlush(msg).addListener((ChannelFutureListener) f -> {
276             if (!f.isSuccess()) {
277                 LOG.warn("Failed to send message {} to channel {}", msg, AbstractBGPSessionNegotiator.this.channel,
278                         f.cause());
279                 negotiationFailedCloseChannel(f.cause());
280             } else {
281                 LOG.trace("Message {} sent to channel {}", msg, AbstractBGPSessionNegotiator.this.channel);
282             }
283         });
284     }
285
286     @Override
287     public final void channelActive(final ChannelHandlerContext ctx) {
288         LOG.debug("Starting session negotiation on channel {}", channel);
289         startNegotiation();
290     }
291
292     @Override
293     @SuppressWarnings("checkstyle:illegalCatch")
294     public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
295         LOG.debug("Negotiation read invoked on channel {}", channel);
296         try {
297             handleMessage((Notification<?>) msg);
298         } catch (final Exception e) {
299             LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
300             negotiationFailedCloseChannel(e);
301         }
302
303     }
304
305     @Override
306     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
307         LOG.info("Unexpected error during negotiation", cause);
308         negotiationFailedCloseChannel(cause);
309     }
310
311     @VisibleForTesting
312     static Uint16 openASNumber(final long configuredASNumber) {
313         // Return AS_TRANS if the value is bigger than 2B.
314         return configuredASNumber > Values.UNSIGNED_SHORT_MAX_VALUE ? AS_TRANS : Uint16.valueOf(configuredASNumber);
315     }
316 }