9e8b31760eae9e1daeaf725d1ca3eb5fe4197e34
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.collect.Sets;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import java.io.IOException;
27 import java.nio.channels.NonWritableChannelException;
28 import java.util.Collections;
29 import java.util.Date;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.TimeUnit;
33 import javax.annotation.concurrent.GuardedBy;
34 import org.opendaylight.controller.config.yang.bgp.rib.impl.BgpSessionState;
35 import org.opendaylight.protocol.bgp.parser.AsNumberUtil;
36 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
37 import org.opendaylight.protocol.bgp.parser.BGPError;
38 import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil;
39 import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
40 import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport;
41 import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl;
42 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPMessagesListener;
43 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
45 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateImpl;
46 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateProvider;
47 import org.opendaylight.protocol.bgp.rib.impl.stats.peer.BGPSessionStats;
48 import org.opendaylight.protocol.bgp.rib.impl.stats.peer.BGPSessionStatsImpl;
49 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
50 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
51 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
52 import org.opendaylight.protocol.bgp.rib.spi.State;
53 import org.opendaylight.protocol.bgp.rib.spi.state.BGPSessionState;
54 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTimersState;
55 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTransportState;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.NotifyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Open;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Update;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.OptionalCapabilities;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.optional.capabilities.CParameters;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.RouteRefresh;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.AddPathCapability;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.add.path.capability.AddressFamilies;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
74 import org.opendaylight.yangtools.yang.binding.Notification;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78 @VisibleForTesting
79 public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> implements BGPSession, BGPSessionStats,
80     BGPSessionStateProvider, AutoCloseable {
81
82     private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
83
84     private static final Notification KEEP_ALIVE = new KeepaliveBuilder().build();
85
86     private static final int KA_TO_DEADTIMER_RATIO = 3;
87
88     private static final String EXTENDED_MSG_DECODER = "EXTENDED_MSG_DECODER";
89
90     static final String END_OF_INPUT = "End of input detected. Close the session.";
91
92     /**
93      * System.nanoTime value about when was sent the last message.
94      */
95     @VisibleForTesting
96     private long lastMessageSentAt;
97
98     /**
99      * System.nanoTime value about when was received the last message
100      */
101     private long lastMessageReceivedAt;
102
103     private final BGPSessionListener listener;
104
105     private final BGPSynchronization sync;
106
107     private int kaCounter = 0;
108
109     private final Channel channel;
110
111     @GuardedBy("this")
112     private State state = State.OPEN_CONFIRM;
113
114     private final Set<BgpTableType> tableTypes;
115     private final List<AddressFamilies> addPathTypes;
116     private final int holdTimerValue;
117     private final int keepAlive;
118     private final AsNumber asNumber;
119     private final Ipv4Address bgpId;
120     private final BGPPeerRegistry peerRegistry;
121     private final ChannelOutputLimiter limiter;
122     private final BGPSessionStateImpl sessionState;
123
124     private BGPSessionStatsImpl sessionStats;
125     private boolean terminationReasonNotified;
126
127     public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
128         final BGPSessionPreferences localPreferences, final BGPPeerRegistry peerRegistry) {
129         this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry);
130         this.sessionStats = new BGPSessionStatsImpl(this, remoteOpen, this.holdTimerValue, this.keepAlive, channel,
131             Optional.of(localPreferences), this.tableTypes, this.addPathTypes);
132     }
133
134     public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
135         final int localHoldTimer, final BGPPeerRegistry peerRegistry) {
136         this.listener = requireNonNull(listener);
137         this.channel = requireNonNull(channel);
138         this.limiter = new ChannelOutputLimiter(this);
139         this.channel.pipeline().addLast(this.limiter);
140         this.holdTimerValue = (remoteOpen.getHoldTimer() < localHoldTimer) ? remoteOpen.getHoldTimer() : localHoldTimer;
141         LOG.info("BGP HoldTimer new value: {}", this.holdTimerValue);
142         this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO;
143         this.asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen);
144         this.peerRegistry = peerRegistry;
145         this.sessionState = new BGPSessionStateImpl();
146
147         final Set<TablesKey> tts = Sets.newHashSet();
148         final Set<BgpTableType> tats = Sets.newHashSet();
149         final List<AddressFamilies> addPathCapabilitiesList = Lists.newArrayList();
150         if (remoteOpen.getBgpParameters() != null) {
151             for (final BgpParameters param : remoteOpen.getBgpParameters()) {
152                 for (final OptionalCapabilities optCapa : param.getOptionalCapabilities()) {
153                     final CParameters cParam = optCapa.getCParameters();
154                     if ( cParam.getAugmentation(CParameters1.class) == null) {
155                         continue;
156                     }
157                     if(cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability() != null) {
158                         final MultiprotocolCapability multi = cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability();
159                         final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi());
160                         LOG.trace("Added table type to sync {}", tt);
161                         tts.add(tt);
162                         tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi()));
163                     } else if (cParam.getAugmentation(CParameters1.class).getAddPathCapability() != null) {
164                         final AddPathCapability addPathCap = cParam.getAugmentation(CParameters1.class).getAddPathCapability();
165                         addPathCapabilitiesList.addAll(addPathCap.getAddressFamilies());
166                     }
167                 }
168             }
169         }
170
171         this.sync = new BGPSynchronization(this.listener, tts);
172         this.tableTypes = tats;
173         this.addPathTypes = addPathCapabilitiesList;
174
175         if (! this.addPathTypes.isEmpty()) {
176             final ChannelPipeline pipeline = this.channel.pipeline();
177             final BGPByteToMessageDecoder decoder = pipeline.get(BGPByteToMessageDecoder.class);
178             decoder.addDecoderConstraint(MultiPathSupport.class,
179                     MultiPathSupportImpl.createParserMultiPathSupport(this.addPathTypes));
180         }
181
182         if (this.holdTimerValue != 0) {
183             channel.eventLoop().schedule(this::handleHoldTimer, this.holdTimerValue, TimeUnit.SECONDS);
184             channel.eventLoop().schedule(this::handleKeepaliveTimer, this.keepAlive, TimeUnit.SECONDS);
185         }
186         this.bgpId = remoteOpen.getBgpIdentifier();
187         this.sessionStats = new BGPSessionStatsImpl(this, remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.absent(),
188                 this.tableTypes, this.addPathTypes);
189
190         this.sessionState.advertizeCapabilities(this.holdTimerValue, channel.remoteAddress(), channel.localAddress(),
191             this.tableTypes, remoteOpen.getBgpParameters());
192     }
193
194     /**
195      * Set the extend message coder for current channel
196      * The reason for separating this part from constructor is, in #channel.pipeline().replace(..), the
197      * invokeChannelRead() will be invoked after the original message coder handler got removed. And there
198      * is chance that before the session instance is fully initiated (constructor returns), a KeepAlive
199      * message arrived already in the channel buffer. Thus #AbstractBGPSessionNegotiator.handleMessage(..)
200      * gets invoked again and a deadlock is caused.  A BGP final state machine error will happen as BGP
201      * negotiator is still in OPEN_SENT state as the session constructor hasn't returned yet.
202      *
203      * @param remoteOpen
204      */
205     public synchronized void setChannelExtMsgCoder(final Open remoteOpen) {
206         final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen);
207         if (enableExMess) {
208             this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder());
209         }
210     }
211
212     @Override
213     public synchronized void close() {
214         if (this.state != State.IDLE && !this.terminationReasonNotified) {
215             this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build());
216             this.closeWithoutMessage();
217         }
218     }
219
220     /**
221      * Handles incoming message based on their type.
222      *
223      * @param msg incoming message
224      */
225     synchronized void handleMessage(final Notification msg) {
226         if (this.state == State.IDLE) {
227             return;
228         }
229         try {
230             // Update last reception time
231             this.lastMessageReceivedAt = System.nanoTime();
232
233             if (msg instanceof Open) {
234                 // Open messages should not be present here
235                 this.terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR));
236             } else if (msg instanceof Notify) {
237                 final Notify notify = (Notify) msg;
238                 // Notifications are handled internally
239                 LOG.info("Session closed because Notification message received: {} / {}, data={}", notify.getErrorCode(),
240                     notify.getErrorSubcode(), notify.getData() != null ? ByteBufUtil.hexDump(notify.getData()) : null);
241                 notifyTerminationReasonAndCloseWithoutMessage(notify.getErrorCode(), notify.getErrorSubcode());
242             } else if (msg instanceof Keepalive) {
243                 // Keepalives are handled internally
244                 LOG.trace("Received KeepAlive message.");
245                 this.kaCounter++;
246                 if (this.kaCounter >= 2) {
247                     this.sync.kaReceived();
248                 }
249             } else if (msg instanceof RouteRefresh) {
250                 this.listener.onMessage(this, msg);
251             } else if (msg instanceof Update) {
252                 this.listener.onMessage(this, msg);
253                 this.sync.updReceived((Update) msg);
254             } else {
255                 LOG.warn("Ignoring unhandled message: {}.", msg.getClass());
256             }
257
258             this.sessionStats.updateReceivedMsg(msg);
259             this.sessionState.messageReceived(msg);
260         } catch (final BGPDocumentedException e) {
261             this.terminate(e);
262         }
263     }
264
265     private synchronized void notifyTerminationReasonAndCloseWithoutMessage(final Short errorCode, final Short errorSubcode) {
266         this.terminationReasonNotified = true;
267         this.listener.onSessionTerminated(this, new BGPTerminationReason(
268             BGPError.forValue(errorCode, errorSubcode)));
269         this.closeWithoutMessage();
270     }
271
272     synchronized void endOfInput() {
273         if (this.state == State.UP) {
274             LOG.info(END_OF_INPUT);
275             this.listener.onSessionDown(this, new IOException(END_OF_INPUT));
276         }
277     }
278
279     @GuardedBy("this")
280     private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) {
281         future.addListener(
282             (ChannelFutureListener) f -> {
283                 if (!f.isSuccess()) {
284                     LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause());
285                 } else {
286                     LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
287                 }
288             });
289         this.lastMessageSentAt = System.nanoTime();
290         this.sessionStats.updateSentMsg(msg);
291         this.sessionState.messageSent(msg);
292         return future;
293     }
294
295     void flush() {
296         this.channel.flush();
297     }
298
299     synchronized void write(final Notification msg) {
300         try {
301             writeEpilogue(this.channel.write(msg), msg);
302         } catch (final Exception e) {
303             LOG.warn("Message {} was not sent.", msg, e);
304         }
305     }
306
307     synchronized ChannelFuture writeAndFlush(final Notification msg) {
308         if (isWritable()) {
309             return writeEpilogue(this.channel.writeAndFlush(msg), msg);
310         }
311         return this.channel.newFailedFuture(new NonWritableChannelException());
312     }
313
314     private synchronized void closeWithoutMessage() {
315         if (this.state == State.IDLE) {
316             return;
317         }
318         LOG.info("Closing session: {}", this);
319         this.channel.close().addListener((ChannelFutureListener) future -> Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()));
320         this.state = State.IDLE;
321         removePeerSession();
322         this.sessionState.setSessionState(this.state);
323     }
324
325     /**
326      * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
327      * modified, because he initiated the closing. (To prevent concurrent modification exception).
328      *
329      * @param e BGPDocumentedException
330      */
331     private synchronized void terminate(final BGPDocumentedException e) {
332         final BGPError error = e.getError();
333         final byte[] data = e.getData();
334         final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode());
335         if (data != null && data.length != 0) {
336             builder.setData(data);
337         }
338         this.writeAndFlush(builder.build());
339         notifyTerminationReasonAndCloseWithoutMessage(error.getCode(), error.getSubcode());
340     }
341
342     private void removePeerSession() {
343         if (this.peerRegistry != null) {
344             this.peerRegistry.removePeerSession(StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress()));
345         }
346     }
347
348     /**
349      * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer
350      * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at
351      * which the message was received. If the session was closed by the time this method starts to execute (the session
352      * state will become IDLE), then rescheduling won't occur.
353      */
354     private synchronized void handleHoldTimer() {
355         if (this.state == State.IDLE) {
356             return;
357         }
358
359         final long ct = System.nanoTime();
360         final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(this.holdTimerValue);
361
362         if (ct >= nextHold) {
363             LOG.debug("HoldTimer expired. {}", new Date());
364             this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED));
365         } else {
366             this.channel.eventLoop().schedule(this::handleHoldTimer, nextHold - ct, TimeUnit.NANOSECONDS);
367         }
368     }
369
370     /**
371      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
372      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
373      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
374      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
375      */
376     private synchronized void handleKeepaliveTimer() {
377         if (this.state == State.IDLE) {
378             return;
379         }
380
381         final long ct = System.nanoTime();
382         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
383
384         if (ct >= nextKeepalive) {
385             this.writeAndFlush(KEEP_ALIVE);
386             nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
387             this.sessionStats.updateSentMsgKA();
388         }
389         this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
390     }
391
392     @Override
393     public final String toString() {
394         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
395     }
396
397     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
398         toStringHelper.add("channel", this.channel);
399         toStringHelper.add("state", this.getState());
400         return toStringHelper;
401     }
402
403     @Override
404     public Set<BgpTableType> getAdvertisedTableTypes() {
405         return this.tableTypes;
406     }
407
408     @Override
409     public List<AddressFamilies> getAdvertisedAddPathTableTypes() {
410         return this.addPathTypes;
411     }
412
413     @Override
414     public List<BgpTableType> getAdvertisedGracefulRestartTableTypes() {
415         return Collections.emptyList();
416     }
417
418     protected synchronized void sessionUp() {
419         this.sessionStats.startSessionStopwatch();
420         this.state = State.UP;
421         this.sessionState.setSessionState(this.state);
422         this.listener.onSessionUp(this);
423     }
424
425     public synchronized State getState() {
426         return this.state;
427     }
428
429     @Override
430     public final Ipv4Address getBgpId() {
431         return this.bgpId;
432     }
433
434     @Override
435     public final AsNumber getAsNumber() {
436         return this.asNumber;
437     }
438
439     synchronized boolean isWritable() {
440         return this.channel != null && this.channel.isWritable();
441     }
442
443     @Override
444     public synchronized BgpSessionState getBgpSessionState() {
445         return this.sessionStats.getBgpSessionState();
446     }
447
448     @Override
449     public synchronized void resetBgpSessionStats() {
450         this.sessionStats.resetBgpSessionStats();
451     }
452
453     public ChannelOutputLimiter getLimiter() {
454         return this.limiter;
455     }
456
457     @Override
458     public final void channelInactive(final ChannelHandlerContext ctx) {
459         LOG.debug("Channel {} inactive.", ctx.channel());
460         this.endOfInput();
461
462         try {
463             super.channelInactive(ctx);
464         } catch (final Exception e) {
465             throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
466         }
467     }
468
469     @Override
470     protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) {
471         LOG.debug("Message was received: {}", msg);
472         this.handleMessage(msg);
473     }
474
475     @Override
476     public final void handlerAdded(final ChannelHandlerContext ctx) {
477         this.sessionUp();
478     }
479
480     @Override
481     public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
482         LOG.warn("BGP session encountered error", cause);
483         if (cause.getCause() instanceof BGPDocumentedException) {
484             this.terminate((BGPDocumentedException) cause.getCause());
485         } else {
486             this.close();
487         }
488     }
489
490     @Override
491     public BGPSessionState getBGPSessionState() {
492         return this.sessionState;
493     }
494
495     @Override
496     public BGPTimersState getBGPTimersState() {
497         return this.sessionState;
498     }
499
500     @Override
501     public BGPTransportState getBGPTransportState() {
502         return this.sessionState;
503     }
504
505     @Override
506     public void registerMessagesCounter(final BGPMessagesListener bgpMessagesListener) {
507         this.sessionState.registerMessagesCounter(bgpMessagesListener);
508     }
509 }