08dc78e484ce208878cce5938f307b0e6a28e761
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.buffer.ByteBufUtil;
17 import io.netty.channel.Channel;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.SimpleChannelInboundHandler;
22 import io.netty.handler.codec.DecoderException;
23 import io.netty.util.concurrent.ScheduledFuture;
24 import java.io.IOException;
25 import java.nio.channels.NonWritableChannelException;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Function;
33 import org.checkerframework.checker.lock.qual.GuardedBy;
34 import org.checkerframework.checker.lock.qual.Holding;
35 import org.opendaylight.protocol.bgp.parser.AsNumberUtil;
36 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
37 import org.opendaylight.protocol.bgp.parser.BGPError;
38 import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil;
39 import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
40 import org.opendaylight.protocol.bgp.parser.GracefulRestartUtil;
41 import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport;
42 import org.opendaylight.protocol.bgp.parser.spi.PeerConstraint;
43 import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPMessagesListener;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
47 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateImpl;
48 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateProvider;
49 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
50 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
51 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
52 import org.opendaylight.protocol.bgp.rib.spi.State;
53 import org.opendaylight.protocol.bgp.rib.spi.state.BGPSessionState;
54 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTimersState;
55 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTransportState;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Keepalive;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.KeepaliveBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Notify;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.NotifyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Open;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Update;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.BgpParameters;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.bgp.parameters.OptionalCapabilities;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.bgp.parameters.optional.capabilities.CParameters;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.BgpTableType;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.CParameters1;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.MpCapabilities;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.RouteRefresh;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.AddPathCapability;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.GracefulRestartCapability;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.LlGracefulRestartCapability;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.MultiprotocolCapability;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.add.path.capability.AddressFamilies;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
77 import org.opendaylight.yangtools.yang.binding.ChildOf;
78 import org.opendaylight.yangtools.yang.binding.Notification;
79 import org.opendaylight.yangtools.yang.common.Uint8;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 @VisibleForTesting
84 public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification<?>> implements BGPSession,
85         BGPSessionStateProvider, AutoCloseable {
86
87     private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
88
89     private static final Keepalive KEEP_ALIVE = new KeepaliveBuilder().build();
90
91     private static final int KA_TO_DEADTIMER_RATIO = 3;
92
93     static final String END_OF_INPUT = "End of input detected. Close the session.";
94
95     /**
96      * System.nanoTime value about when was sent the last message.
97      */
98     @VisibleForTesting
99     private long lastMessageSentAt;
100
101     /**
102      * System.nanoTime value about when was received the last message.
103      */
104     private long lastMessageReceivedAt;
105
106     private final BGPSessionListener listener;
107
108     private final BGPSynchronization sync;
109
110     private int kaCounter = 0;
111
112     private final Channel channel;
113
114     @GuardedBy("this")
115     private State state = State.OPEN_CONFIRM;
116
117     private final Set<BgpTableType> tableTypes;
118     private final List<AddressFamilies> addPathTypes;
119     private final long holdTimerNanos;
120     private final long keepAliveNanos;
121     private final AsNumber asNumber;
122     private final Ipv4Address bgpId;
123     private final BGPPeerRegistry peerRegistry;
124     private final ChannelOutputLimiter limiter;
125     private final BGPSessionStateImpl sessionState;
126     private final GracefulRestartCapability gracefulCapability;
127     private final LlGracefulRestartCapability llGracefulCapability;
128     private boolean terminationReasonNotified;
129
130     public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
131             final BGPSessionPreferences localPreferences, final BGPPeerRegistry peerRegistry) {
132         this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry);
133     }
134
135     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
136         justification = "Class not final for mocking and SpotBugs is confused by lambdas around line 200")
137     public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
138             final int localHoldTimer, final BGPPeerRegistry peerRegistry) {
139         this.listener = requireNonNull(listener);
140         this.channel = requireNonNull(channel);
141         limiter = new ChannelOutputLimiter(this);
142         this.channel.pipeline().addLast(limiter);
143
144         final int remoteHoldTimer = remoteOpen.getHoldTimer().toJava();
145         final int holdTimerValue = Math.min(remoteHoldTimer, localHoldTimer);
146         LOG.info("BGP HoldTimer new value: {}", holdTimerValue);
147         holdTimerNanos = TimeUnit.SECONDS.toNanos(holdTimerValue);
148         keepAliveNanos = TimeUnit.SECONDS.toNanos(holdTimerValue / KA_TO_DEADTIMER_RATIO);
149
150         asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen);
151         this.peerRegistry = peerRegistry;
152         sessionState = new BGPSessionStateImpl();
153
154         final Set<TablesKey> tts = new HashSet<>();
155         final Set<BgpTableType> tats = new HashSet<>();
156         final List<AddressFamilies> addPathCapabilitiesList = new ArrayList<>();
157         final List<BgpParameters> bgpParameters = remoteOpen.getBgpParameters();
158         if (bgpParameters != null) {
159             for (final BgpParameters param : bgpParameters) {
160                 for (final OptionalCapabilities optCapa : param.nonnullOptionalCapabilities()) {
161                     final CParameters cParam = optCapa.getCParameters();
162                     final CParameters1 cParam1 = cParam.augmentation(CParameters1.class);
163                     if (cParam1 != null) {
164                         final MultiprotocolCapability multi = cParam1.getMultiprotocolCapability();
165                         if (multi != null) {
166                             final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi());
167                             LOG.trace("Added table type to sync {}", tt);
168                             tts.add(tt);
169                             tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi()));
170                         } else {
171                             final AddPathCapability addPathCap = cParam1.getAddPathCapability();
172                             if (addPathCap != null) {
173                                 addPathCapabilitiesList.addAll(addPathCap.getAddressFamilies());
174                             }
175                         }
176                     }
177                 }
178             }
179             gracefulCapability = findSingleCapability(bgpParameters, "Graceful Restart",
180                 CParameters1::getGracefulRestartCapability, GracefulRestartUtil.EMPTY_GR_CAPABILITY);
181             llGracefulCapability = findSingleCapability(bgpParameters, "Long-lived Graceful Restart",
182                 CParameters1::getLlGracefulRestartCapability, GracefulRestartUtil.EMPTY_LLGR_CAPABILITY);
183         } else {
184             gracefulCapability = GracefulRestartUtil.EMPTY_GR_CAPABILITY;
185             llGracefulCapability = GracefulRestartUtil.EMPTY_LLGR_CAPABILITY;
186         }
187
188         sync = new BGPSynchronization(this.listener, tts);
189         tableTypes = tats;
190         addPathTypes = addPathCapabilitiesList;
191
192         if (!addPathTypes.isEmpty()) {
193             addDecoderConstraint(MultiPathSupport.class,
194                 MultiPathSupportImpl.createParserMultiPathSupport(addPathTypes));
195         }
196
197         if (holdTimerValue != 0) {
198             channel.eventLoop().schedule(this::handleHoldTimer, holdTimerNanos, TimeUnit.NANOSECONDS);
199             channel.eventLoop().schedule(this::handleKeepaliveTimer, keepAliveNanos, TimeUnit.NANOSECONDS);
200         }
201         bgpId = remoteOpen.getBgpIdentifier();
202         sessionState.advertizeCapabilities(holdTimerValue, channel.remoteAddress(), channel.localAddress(),
203                 tableTypes, bgpParameters);
204     }
205
206     private static <T extends ChildOf<MpCapabilities>> T findSingleCapability(
207             final List<BgpParameters> bgpParameters, final String name, final Function<CParameters1, T> extractor,
208             final T empty) {
209         final var found = new ArrayList<T>(1);
210         for (var bgpParams : bgpParameters) {
211             for (var optCapability : bgpParams.nonnullOptionalCapabilities()) {
212                 final var cparam = optCapability.getCParameters();
213                 if (cparam != null) {
214                     final var augment = cparam.augmentation(CParameters1.class);
215                     if (augment != null) {
216                         final T capa = extractor.apply(augment);
217                         if (capa != null) {
218                             found.add(capa);
219                         }
220                     }
221                 }
222             }
223         }
224
225         return switch (found.size()) {
226             // irrecoverable
227             case 0 -> {
228                 LOG.debug("{} capability not advertised.", name);
229                 yield empty;
230             }
231
232             // fast path
233             case 1 -> found.get(0);
234
235             // slow path
236             default -> {
237                 final var set = Set.copyOf(found);
238                 if (set.size() != 1) {
239                     LOG.warn("Multiple instances of {} capability advertised: {}, ignoring.", name, set);
240                     yield empty;
241                 } else {
242                     yield found.get(0);
243                 }
244             }
245         };
246     }
247
248     /**
249      * Set the extend message coder for current channel.
250      * The reason for separating this part from constructor is, in #channel.pipeline().replace(..), the
251      * invokeChannelRead() will be invoked after the original message coder handler got removed. And there
252      * is chance that before the session instance is fully initiated (constructor returns), a KeepAlive
253      * message arrived already in the channel buffer. Thus #AbstractBGPSessionNegotiator.handleMessage(..)
254      * gets invoked again and a deadlock is caused.  A BGP final state machine error will happen as BGP
255      * negotiator is still in OPEN_SENT state as the session constructor hasn't returned yet.
256      */
257     public synchronized void setChannelExtMsgCoder(final Open remoteOpen) {
258         final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen);
259         if (enableExMess) {
260             BGPMessageHeaderDecoder.enableExtendedMessages(channel);
261         }
262     }
263
264     @Override
265     public synchronized void close() {
266         if (state != State.IDLE) {
267             if (!terminationReasonNotified) {
268                 writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode())
269                         .setErrorSubcode(BGPError.CEASE.getSubcode()).build());
270             }
271             closeWithoutMessage();
272         }
273     }
274
275     /**
276      * Handles incoming message based on their type.
277      *
278      * @param msg incoming message
279      */
280     void handleMessage(final Notification<?> msg) {
281         // synchronize on listener and then on this object to ensure correct order of locking
282         synchronized (listener) {
283             synchronized (this) {
284                 if (state == State.IDLE) {
285                     return;
286                 }
287                 try {
288                     // Update last reception time
289                     lastMessageReceivedAt = System.nanoTime();
290
291                     if (msg instanceof Open) {
292                         // Open messages should not be present here
293                         terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR));
294                     } else if (msg instanceof Notify notify) {
295                         // Notifications are handled internally
296                         LOG.info("Session closed because Notification message received: {} / {}, data={}",
297                                 notify.getErrorCode(),
298                                 notify.getErrorSubcode(),
299                                 notify.getData() != null ? ByteBufUtil.hexDump(notify.getData()) : null);
300                         notifyTerminationReasonAndCloseWithoutMessage(notify.getErrorCode(), notify.getErrorSubcode());
301                     } else if (msg instanceof Keepalive) {
302                         // Keepalives are handled internally
303                         LOG.trace("Received KeepAlive message.");
304                         kaCounter++;
305                         if (kaCounter >= 2) {
306                             sync.kaReceived();
307                         }
308                     } else if (msg instanceof RouteRefresh) {
309                         listener.onMessage(this, msg);
310                     } else if (msg instanceof Update) {
311                         listener.onMessage(this, msg);
312                         sync.updReceived((Update) msg);
313                     } else {
314                         LOG.warn("Ignoring unhandled message: {}.", msg.getClass());
315                     }
316
317                     sessionState.messageReceived(msg);
318                 } catch (final BGPDocumentedException e) {
319                     terminate(e);
320                 }
321             }
322         }
323     }
324
325     @Holding({"this.listener", "this"})
326     private void notifyTerminationReasonAndCloseWithoutMessage(final BGPError error) {
327         terminationReasonNotified = true;
328         closeWithoutMessage();
329         listener.onSessionTerminated(this, new BGPTerminationReason(error));
330     }
331
332     @Holding({"this.listener", "this"})
333     private void notifyTerminationReasonAndCloseWithoutMessage(final Uint8 errorCode, final Uint8 errorSubcode) {
334         terminationReasonNotified = true;
335         closeWithoutMessage();
336         listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(errorCode, errorSubcode)));
337     }
338
339     void endOfInput() {
340         // synchronize on listener and then on this object to ensure correct order of locking
341         synchronized (listener) {
342             synchronized (this) {
343                 if (state == State.UP) {
344                     LOG.info(END_OF_INPUT);
345                     listener.onSessionDown(this, new IOException(END_OF_INPUT));
346                 }
347             }
348         }
349     }
350
351     @Holding("this")
352     private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification<?> msg) {
353         future.addListener((ChannelFutureListener) f -> {
354             if (f.isSuccess()) {
355                 LOG.trace("Message {} sent to socket {}", msg, channel);
356             } else {
357                 LOG.warn("Failed to send message {} to socket {}", msg, channel, f.cause());
358             }
359         });
360         lastMessageSentAt = System.nanoTime();
361         sessionState.messageSent(msg);
362         return future;
363     }
364
365     void flush() {
366         channel.flush();
367     }
368
369     @SuppressWarnings("checkstyle:illegalCatch")
370     synchronized void write(final Notification<?> msg) {
371         try {
372             writeEpilogue(channel.write(msg), msg);
373         } catch (final Exception e) {
374             LOG.warn("Message {} was not sent.", msg, e);
375         }
376     }
377
378     synchronized ChannelFuture writeAndFlush(final Notification<?> msg) {
379         if (channel.isWritable()) {
380             return writeEpilogue(channel.writeAndFlush(msg), msg);
381         }
382         return channel.newFailedFuture(new NonWritableChannelException());
383     }
384
385     @Override
386     public synchronized void closeWithoutMessage() {
387         if (state == State.IDLE) {
388             return;
389         }
390         LOG.info("Closing session: {}", this);
391         channel.close().addListener((ChannelFutureListener) future -> {
392             if (future.isSuccess()) {
393                 LOG.debug("Channel {} closed successfully", channel);
394             } else {
395                 LOG.warn("Channel {} failed to close", channel, future.cause());
396             }
397         });
398
399         state = State.IDLE;
400         removePeerSession();
401         sessionState.setSessionState(state);
402     }
403
404     /**
405      * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
406      * modified, because he initiated the closing. (To prevent concurrent modification exception).
407      *
408      * @param cause BGPDocumentedException
409      */
410     @VisibleForTesting
411     @Holding({"this.listener", "this"})
412     void terminate(final BGPDocumentedException cause) {
413         final BGPError error = cause.getError();
414         final byte[] data = cause.getData();
415         final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode())
416                 .setErrorSubcode(error.getSubcode());
417         if (data != null && data.length != 0) {
418             builder.setData(data);
419         }
420         writeAndFlush(builder.build());
421         notifyTerminationReasonAndCloseWithoutMessage(error);
422     }
423
424     private void removePeerSession() {
425         if (peerRegistry != null) {
426             peerRegistry.removePeerSession(StrictBGPPeerRegistry.getIpAddress(channel.remoteAddress()));
427         }
428     }
429
430     /**
431      * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer
432      * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at
433      * which the message was received. If the session was closed by the time this method starts to execute (the session
434      * state will become IDLE), then rescheduling won't occur.
435      */
436     private void handleHoldTimer() {
437         // synchronize on listener and then on this object to ensure correct order of locking
438         synchronized (listener) {
439             synchronized (this) {
440                 if (state == State.IDLE) {
441                     return;
442                 }
443
444                 final long ct = System.nanoTime();
445                 final long nextHold = lastMessageReceivedAt + holdTimerNanos;
446
447                 if (ct >= nextHold) {
448                     LOG.debug("HoldTimer expired. {}", new Date());
449                     terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED));
450                 } else {
451                     channel.eventLoop().schedule(this::handleHoldTimer, nextHold - ct, TimeUnit.NANOSECONDS);
452                 }
453             }
454         }
455     }
456
457     /**
458      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
459      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
460      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
461      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
462      */
463     private synchronized void handleKeepaliveTimer() {
464         if (state == State.IDLE) {
465             LOG.debug("Skipping keepalive on session idle {}", this);
466             return;
467         }
468
469         final long ct = System.nanoTime();
470         final long nextKeepalive = lastMessageSentAt + keepAliveNanos;
471         long nextNanos = nextKeepalive - ct;
472
473         if (nextNanos <= 0) {
474             final ChannelFuture future = writeAndFlush(KEEP_ALIVE);
475             LOG.debug("Enqueued session {} keepalive as {}", this, future);
476             nextNanos = keepAliveNanos;
477             if (LOG.isDebugEnabled()) {
478                 future.addListener(compl -> LOG.debug("Session {} keepalive completed as {}", this, compl));
479             }
480         } else {
481             LOG.debug("Skipping keepalive on session {}", this);
482         }
483
484         LOG.debug("Scheduling next keepalive on {} in {} nanos", this, nextNanos);
485         channel.eventLoop().schedule(this::handleKeepaliveTimer, nextNanos, TimeUnit.NANOSECONDS);
486     }
487
488     @Override
489     public final String toString() {
490         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
491     }
492
493     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
494         toStringHelper.add("channel", channel);
495         toStringHelper.add("state", getState());
496         return toStringHelper;
497     }
498
499     @Override
500     public Set<BgpTableType> getAdvertisedTableTypes() {
501         return tableTypes;
502     }
503
504     @Override
505     public List<AddressFamilies> getAdvertisedAddPathTableTypes() {
506         return addPathTypes;
507     }
508
509     @Override
510     public GracefulRestartCapability getAdvertisedGracefulRestartCapability() {
511         return gracefulCapability;
512     }
513
514     @Override
515     public LlGracefulRestartCapability getAdvertisedLlGracefulRestartCapability() {
516         return llGracefulCapability;
517     }
518
519     @VisibleForTesting
520     @SuppressWarnings("checkstyle:illegalCatch")
521     void sessionUp() {
522         // synchronize on listener and then on this object to ensure correct order of locking
523         synchronized (listener) {
524             synchronized (this) {
525                 state = State.UP;
526                 try {
527                     sessionState.setSessionState(state);
528                     listener.onSessionUp(this);
529                 } catch (final Exception e) {
530                     handleException(e);
531                     throw e;
532                 }
533             }
534         }
535     }
536
537     public synchronized State getState() {
538         return state;
539     }
540
541     @Override
542     public final Ipv4Address getBgpId() {
543         return bgpId;
544     }
545
546     @Override
547     public final AsNumber getAsNumber() {
548         return asNumber;
549     }
550
551     public ChannelOutputLimiter getLimiter() {
552         return limiter;
553     }
554
555     @Override
556     public final void channelInactive(final ChannelHandlerContext ctx) throws Exception {
557         LOG.debug("Channel {} inactive.", ctx.channel());
558         endOfInput();
559         super.channelInactive(ctx);
560     }
561
562     @Override
563     protected final void channelRead0(final ChannelHandlerContext ctx, final Notification<?> msg) {
564         LOG.trace("Message was received: {} from {}", msg, channel.remoteAddress());
565         handleMessage(msg);
566     }
567
568     @Override
569     public final void handlerAdded(final ChannelHandlerContext ctx) {
570         sessionUp();
571     }
572
573     @Override
574     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
575         // synchronize on listener and then on this object to ensure correct order of locking
576         synchronized (listener) {
577             synchronized (this) {
578                 handleException(cause);
579             }
580         }
581     }
582
583     /**
584      * Handle exception occurred in the BGP session. The session in error state should be closed
585      * properly so that it can be restored later.
586      */
587     @Holding({"this.listener", "this"})
588     @VisibleForTesting
589     void handleException(final Throwable cause) {
590         // We have two things to do here:
591         // - log a warning with appropriate context
592         final Throwable toLog;
593         // - terminate the session with the appropriate error
594         final BGPDocumentedException toReport;
595
596         if (cause instanceof BGPDocumentedException bde) {
597             // Easy case
598             toLog = toReport = bde;
599         } else if (cause.getCause() instanceof BGPDocumentedException bde) {
600             // we are going to report the cause, that's for sure
601             toReport = bde;
602
603             // if this is a DecoderException, assume it is coming from ByteToMessageDecoder.callDecode() and the context
604             // is just Netty thread call stack starting at a io.netty.util.internal.ThreadExecutorMap Runnable.
605             // Trim such context unless we have trace enabled.
606             toLog = cause instanceof DecoderException && !LOG.isTraceEnabled() ? bde : cause;
607         } else {
608             // if we can include the causal chain for terminate(), great, but only log cause, i.e. without us in
609             // in the picture.
610             toReport = cause instanceof Exception ex ? new BGPDocumentedException(null, BGPError.CEASE, ex)
611                 : new BGPDocumentedException(BGPError.CEASE);
612             toLog = cause;
613         }
614
615         LOG.warn("BGP session encountered error", toLog);
616         terminate(toReport);
617     }
618
619     @Override
620     public BGPSessionState getBGPSessionState() {
621         return sessionState;
622     }
623
624     @Override
625     public BGPTimersState getBGPTimersState() {
626         return sessionState;
627     }
628
629     @Override
630     public BGPTransportState getBGPTransportState() {
631         return sessionState;
632     }
633
634     @Override
635     public void registerMessagesCounter(final BGPMessagesListener bgpMessagesListener) {
636         sessionState.registerMessagesCounter(bgpMessagesListener);
637     }
638
639     @Override
640     public <T extends PeerConstraint> void addDecoderConstraint(final Class<T> constraintClass, final T constraint) {
641         channel.pipeline().get(BGPByteToMessageDecoder.class).addDecoderConstraint(constraintClass, constraint);
642     }
643
644     @Override
645     public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
646         return channel.eventLoop().schedule(command, delay, unit);
647     }
648 }