2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.rib.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.buffer.ByteBufUtil;
17 import io.netty.channel.Channel;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.SimpleChannelInboundHandler;
22 import io.netty.handler.codec.DecoderException;
23 import io.netty.util.concurrent.ScheduledFuture;
24 import java.io.IOException;
25 import java.nio.channels.NonWritableChannelException;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.HashSet;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Function;
33 import org.checkerframework.checker.lock.qual.GuardedBy;
34 import org.checkerframework.checker.lock.qual.Holding;
35 import org.opendaylight.protocol.bgp.parser.AsNumberUtil;
36 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
37 import org.opendaylight.protocol.bgp.parser.BGPError;
38 import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil;
39 import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
40 import org.opendaylight.protocol.bgp.parser.GracefulRestartUtil;
41 import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport;
42 import org.opendaylight.protocol.bgp.parser.spi.PeerConstraint;
43 import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPMessagesListener;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
47 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateImpl;
48 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateProvider;
49 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
50 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
51 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
52 import org.opendaylight.protocol.bgp.rib.spi.State;
53 import org.opendaylight.protocol.bgp.rib.spi.state.BGPSessionState;
54 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTimersState;
55 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTransportState;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Keepalive;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.KeepaliveBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Notify;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.NotifyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Open;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Update;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.BgpParameters;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.bgp.parameters.OptionalCapabilities;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.bgp.parameters.optional.capabilities.CParameters;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.BgpTableType;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.CParameters1;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.MpCapabilities;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.RouteRefresh;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.AddPathCapability;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.GracefulRestartCapability;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.LlGracefulRestartCapability;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.MultiprotocolCapability;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.add.path.capability.AddressFamilies;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
77 import org.opendaylight.yangtools.yang.binding.ChildOf;
78 import org.opendaylight.yangtools.yang.binding.Notification;
79 import org.opendaylight.yangtools.yang.common.Uint8;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
84 public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification<?>> implements BGPSession,
85 BGPSessionStateProvider, AutoCloseable {
87 private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
89 private static final Keepalive KEEP_ALIVE = new KeepaliveBuilder().build();
91 private static final int KA_TO_DEADTIMER_RATIO = 3;
93 static final String END_OF_INPUT = "End of input detected. Close the session.";
96 * System.nanoTime value about when was sent the last message.
99 private long lastMessageSentAt;
102 * System.nanoTime value about when was received the last message.
104 private long lastMessageReceivedAt;
106 private final BGPSessionListener listener;
108 private final BGPSynchronization sync;
110 private int kaCounter = 0;
112 private final Channel channel;
115 private State state = State.OPEN_CONFIRM;
117 private final Set<BgpTableType> tableTypes;
118 private final List<AddressFamilies> addPathTypes;
119 private final long holdTimerNanos;
120 private final long keepAliveNanos;
121 private final AsNumber asNumber;
122 private final Ipv4Address bgpId;
123 private final BGPPeerRegistry peerRegistry;
124 private final ChannelOutputLimiter limiter;
125 private final BGPSessionStateImpl sessionState;
126 private final GracefulRestartCapability gracefulCapability;
127 private final LlGracefulRestartCapability llGracefulCapability;
128 private boolean terminationReasonNotified;
130 public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
131 final BGPSessionPreferences localPreferences, final BGPPeerRegistry peerRegistry) {
132 this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry);
135 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
136 justification = "Class not final for mocking and SpotBugs is confused by lambdas around line 200")
137 public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen,
138 final int localHoldTimer, final BGPPeerRegistry peerRegistry) {
139 this.listener = requireNonNull(listener);
140 this.channel = requireNonNull(channel);
141 limiter = new ChannelOutputLimiter(this);
142 this.channel.pipeline().addLast(limiter);
144 final int remoteHoldTimer = remoteOpen.getHoldTimer().toJava();
145 final int holdTimerValue = Math.min(remoteHoldTimer, localHoldTimer);
146 LOG.info("BGP HoldTimer new value: {}", holdTimerValue);
147 holdTimerNanos = TimeUnit.SECONDS.toNanos(holdTimerValue);
148 keepAliveNanos = TimeUnit.SECONDS.toNanos(holdTimerValue / KA_TO_DEADTIMER_RATIO);
150 asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen);
151 this.peerRegistry = peerRegistry;
152 sessionState = new BGPSessionStateImpl();
154 final Set<TablesKey> tts = new HashSet<>();
155 final Set<BgpTableType> tats = new HashSet<>();
156 final List<AddressFamilies> addPathCapabilitiesList = new ArrayList<>();
157 final List<BgpParameters> bgpParameters = remoteOpen.getBgpParameters();
158 if (bgpParameters != null) {
159 for (final BgpParameters param : bgpParameters) {
160 for (final OptionalCapabilities optCapa : param.nonnullOptionalCapabilities()) {
161 final CParameters cParam = optCapa.getCParameters();
162 final CParameters1 cParam1 = cParam.augmentation(CParameters1.class);
163 if (cParam1 != null) {
164 final MultiprotocolCapability multi = cParam1.getMultiprotocolCapability();
166 final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi());
167 LOG.trace("Added table type to sync {}", tt);
169 tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi()));
171 final AddPathCapability addPathCap = cParam1.getAddPathCapability();
172 if (addPathCap != null) {
173 addPathCapabilitiesList.addAll(addPathCap.getAddressFamilies());
179 gracefulCapability = findSingleCapability(bgpParameters, "Graceful Restart",
180 CParameters1::getGracefulRestartCapability, GracefulRestartUtil.EMPTY_GR_CAPABILITY);
181 llGracefulCapability = findSingleCapability(bgpParameters, "Long-lived Graceful Restart",
182 CParameters1::getLlGracefulRestartCapability, GracefulRestartUtil.EMPTY_LLGR_CAPABILITY);
184 gracefulCapability = GracefulRestartUtil.EMPTY_GR_CAPABILITY;
185 llGracefulCapability = GracefulRestartUtil.EMPTY_LLGR_CAPABILITY;
188 sync = new BGPSynchronization(this.listener, tts);
190 addPathTypes = addPathCapabilitiesList;
192 if (!addPathTypes.isEmpty()) {
193 addDecoderConstraint(MultiPathSupport.class,
194 MultiPathSupportImpl.createParserMultiPathSupport(addPathTypes));
197 if (holdTimerValue != 0) {
198 channel.eventLoop().schedule(this::handleHoldTimer, holdTimerNanos, TimeUnit.NANOSECONDS);
199 channel.eventLoop().schedule(this::handleKeepaliveTimer, keepAliveNanos, TimeUnit.NANOSECONDS);
201 bgpId = remoteOpen.getBgpIdentifier();
202 sessionState.advertizeCapabilities(holdTimerValue, channel.remoteAddress(), channel.localAddress(),
203 tableTypes, bgpParameters);
206 private static <T extends ChildOf<MpCapabilities>> T findSingleCapability(
207 final List<BgpParameters> bgpParameters, final String name, final Function<CParameters1, T> extractor,
209 final var found = new ArrayList<T>(1);
210 for (var bgpParams : bgpParameters) {
211 for (var optCapability : bgpParams.nonnullOptionalCapabilities()) {
212 final var cparam = optCapability.getCParameters();
213 if (cparam != null) {
214 final var augment = cparam.augmentation(CParameters1.class);
215 if (augment != null) {
216 final T capa = extractor.apply(augment);
225 return switch (found.size()) {
228 LOG.debug("{} capability not advertised.", name);
233 case 1 -> found.get(0);
237 final var set = Set.copyOf(found);
238 if (set.size() != 1) {
239 LOG.warn("Multiple instances of {} capability advertised: {}, ignoring.", name, set);
249 * Set the extend message coder for current channel.
250 * The reason for separating this part from constructor is, in #channel.pipeline().replace(..), the
251 * invokeChannelRead() will be invoked after the original message coder handler got removed. And there
252 * is chance that before the session instance is fully initiated (constructor returns), a KeepAlive
253 * message arrived already in the channel buffer. Thus #AbstractBGPSessionNegotiator.handleMessage(..)
254 * gets invoked again and a deadlock is caused. A BGP final state machine error will happen as BGP
255 * negotiator is still in OPEN_SENT state as the session constructor hasn't returned yet.
257 public synchronized void setChannelExtMsgCoder(final Open remoteOpen) {
258 final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen);
260 BGPMessageHeaderDecoder.enableExtendedMessages(channel);
265 public synchronized void close() {
266 if (state != State.IDLE) {
267 if (!terminationReasonNotified) {
268 writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode())
269 .setErrorSubcode(BGPError.CEASE.getSubcode()).build());
271 closeWithoutMessage();
276 * Handles incoming message based on their type.
278 * @param msg incoming message
280 void handleMessage(final Notification<?> msg) {
281 // synchronize on listener and then on this object to ensure correct order of locking
282 synchronized (listener) {
283 synchronized (this) {
284 if (state == State.IDLE) {
288 // Update last reception time
289 lastMessageReceivedAt = System.nanoTime();
291 if (msg instanceof Open) {
292 // Open messages should not be present here
293 terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR));
294 } else if (msg instanceof Notify notify) {
295 // Notifications are handled internally
296 LOG.info("Session closed because Notification message received: {} / {}, data={}",
297 notify.getErrorCode(),
298 notify.getErrorSubcode(),
299 notify.getData() != null ? ByteBufUtil.hexDump(notify.getData()) : null);
300 notifyTerminationReasonAndCloseWithoutMessage(notify.getErrorCode(), notify.getErrorSubcode());
301 } else if (msg instanceof Keepalive) {
302 // Keepalives are handled internally
303 LOG.trace("Received KeepAlive message.");
305 if (kaCounter >= 2) {
308 } else if (msg instanceof RouteRefresh) {
309 listener.onMessage(this, msg);
310 } else if (msg instanceof Update) {
311 listener.onMessage(this, msg);
312 sync.updReceived((Update) msg);
314 LOG.warn("Ignoring unhandled message: {}.", msg.getClass());
317 sessionState.messageReceived(msg);
318 } catch (final BGPDocumentedException e) {
325 @Holding({"this.listener", "this"})
326 private void notifyTerminationReasonAndCloseWithoutMessage(final BGPError error) {
327 terminationReasonNotified = true;
328 closeWithoutMessage();
329 listener.onSessionTerminated(this, new BGPTerminationReason(error));
332 @Holding({"this.listener", "this"})
333 private void notifyTerminationReasonAndCloseWithoutMessage(final Uint8 errorCode, final Uint8 errorSubcode) {
334 terminationReasonNotified = true;
335 closeWithoutMessage();
336 listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(errorCode, errorSubcode)));
340 // synchronize on listener and then on this object to ensure correct order of locking
341 synchronized (listener) {
342 synchronized (this) {
343 if (state == State.UP) {
344 LOG.info(END_OF_INPUT);
345 listener.onSessionDown(this, new IOException(END_OF_INPUT));
352 private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification<?> msg) {
353 future.addListener((ChannelFutureListener) f -> {
355 LOG.trace("Message {} sent to socket {}", msg, channel);
357 LOG.warn("Failed to send message {} to socket {}", msg, channel, f.cause());
360 lastMessageSentAt = System.nanoTime();
361 sessionState.messageSent(msg);
369 @SuppressWarnings("checkstyle:illegalCatch")
370 synchronized void write(final Notification<?> msg) {
372 writeEpilogue(channel.write(msg), msg);
373 } catch (final Exception e) {
374 LOG.warn("Message {} was not sent.", msg, e);
378 synchronized ChannelFuture writeAndFlush(final Notification<?> msg) {
379 if (channel.isWritable()) {
380 return writeEpilogue(channel.writeAndFlush(msg), msg);
382 return channel.newFailedFuture(new NonWritableChannelException());
386 public synchronized void closeWithoutMessage() {
387 if (state == State.IDLE) {
390 LOG.info("Closing session: {}", this);
391 channel.close().addListener((ChannelFutureListener) future -> {
392 if (future.isSuccess()) {
393 LOG.debug("Channel {} closed successfully", channel);
395 LOG.warn("Channel {} failed to close", channel, future.cause());
401 sessionState.setSessionState(state);
405 * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
406 * modified, because he initiated the closing. (To prevent concurrent modification exception).
408 * @param cause BGPDocumentedException
411 @Holding({"this.listener", "this"})
412 void terminate(final BGPDocumentedException cause) {
413 final BGPError error = cause.getError();
414 final byte[] data = cause.getData();
415 final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode())
416 .setErrorSubcode(error.getSubcode());
417 if (data != null && data.length != 0) {
418 builder.setData(data);
420 writeAndFlush(builder.build());
421 notifyTerminationReasonAndCloseWithoutMessage(error);
424 private void removePeerSession() {
425 if (peerRegistry != null) {
426 peerRegistry.removePeerSession(StrictBGPPeerRegistry.getIpAddress(channel.remoteAddress()));
431 * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer
432 * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at
433 * which the message was received. If the session was closed by the time this method starts to execute (the session
434 * state will become IDLE), then rescheduling won't occur.
436 private void handleHoldTimer() {
437 // synchronize on listener and then on this object to ensure correct order of locking
438 synchronized (listener) {
439 synchronized (this) {
440 if (state == State.IDLE) {
444 final long ct = System.nanoTime();
445 final long nextHold = lastMessageReceivedAt + holdTimerNanos;
447 if (ct >= nextHold) {
448 LOG.debug("HoldTimer expired. {}", new Date());
449 terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED));
451 channel.eventLoop().schedule(this::handleHoldTimer, nextHold - ct, TimeUnit.NANOSECONDS);
458 * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
459 * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
460 * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
461 * starts to execute (the session state will become IDLE), that rescheduling won't occur.
463 private synchronized void handleKeepaliveTimer() {
464 if (state == State.IDLE) {
465 LOG.debug("Skipping keepalive on session idle {}", this);
469 final long ct = System.nanoTime();
470 final long nextKeepalive = lastMessageSentAt + keepAliveNanos;
471 long nextNanos = nextKeepalive - ct;
473 if (nextNanos <= 0) {
474 final ChannelFuture future = writeAndFlush(KEEP_ALIVE);
475 LOG.debug("Enqueued session {} keepalive as {}", this, future);
476 nextNanos = keepAliveNanos;
477 if (LOG.isDebugEnabled()) {
478 future.addListener(compl -> LOG.debug("Session {} keepalive completed as {}", this, compl));
481 LOG.debug("Skipping keepalive on session {}", this);
484 LOG.debug("Scheduling next keepalive on {} in {} nanos", this, nextNanos);
485 channel.eventLoop().schedule(this::handleKeepaliveTimer, nextNanos, TimeUnit.NANOSECONDS);
489 public final String toString() {
490 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
493 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
494 toStringHelper.add("channel", channel);
495 toStringHelper.add("state", getState());
496 return toStringHelper;
500 public Set<BgpTableType> getAdvertisedTableTypes() {
505 public List<AddressFamilies> getAdvertisedAddPathTableTypes() {
510 public GracefulRestartCapability getAdvertisedGracefulRestartCapability() {
511 return gracefulCapability;
515 public LlGracefulRestartCapability getAdvertisedLlGracefulRestartCapability() {
516 return llGracefulCapability;
520 @SuppressWarnings("checkstyle:illegalCatch")
522 // synchronize on listener and then on this object to ensure correct order of locking
523 synchronized (listener) {
524 synchronized (this) {
527 sessionState.setSessionState(state);
528 listener.onSessionUp(this);
529 } catch (final Exception e) {
537 public synchronized State getState() {
542 public final Ipv4Address getBgpId() {
547 public final AsNumber getAsNumber() {
551 public ChannelOutputLimiter getLimiter() {
556 public final void channelInactive(final ChannelHandlerContext ctx) throws Exception {
557 LOG.debug("Channel {} inactive.", ctx.channel());
559 super.channelInactive(ctx);
563 protected final void channelRead0(final ChannelHandlerContext ctx, final Notification<?> msg) {
564 LOG.trace("Message was received: {} from {}", msg, channel.remoteAddress());
569 public final void handlerAdded(final ChannelHandlerContext ctx) {
574 public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
575 // synchronize on listener and then on this object to ensure correct order of locking
576 synchronized (listener) {
577 synchronized (this) {
578 handleException(cause);
584 * Handle exception occurred in the BGP session. The session in error state should be closed
585 * properly so that it can be restored later.
587 @Holding({"this.listener", "this"})
589 void handleException(final Throwable cause) {
590 // We have two things to do here:
591 // - log a warning with appropriate context
592 final Throwable toLog;
593 // - terminate the session with the appropriate error
594 final BGPDocumentedException toReport;
596 if (cause instanceof BGPDocumentedException bde) {
598 toLog = toReport = bde;
599 } else if (cause.getCause() instanceof BGPDocumentedException bde) {
600 // we are going to report the cause, that's for sure
603 // if this is a DecoderException, assume it is coming from ByteToMessageDecoder.callDecode() and the context
604 // is just Netty thread call stack starting at a io.netty.util.internal.ThreadExecutorMap Runnable.
605 // Trim such context unless we have trace enabled.
606 toLog = cause instanceof DecoderException && !LOG.isTraceEnabled() ? bde : cause;
608 // if we can include the causal chain for terminate(), great, but only log cause, i.e. without us in
610 toReport = cause instanceof Exception ex ? new BGPDocumentedException(null, BGPError.CEASE, ex)
611 : new BGPDocumentedException(BGPError.CEASE);
615 LOG.warn("BGP session encountered error", toLog);
620 public BGPSessionState getBGPSessionState() {
625 public BGPTimersState getBGPTimersState() {
630 public BGPTransportState getBGPTransportState() {
635 public void registerMessagesCounter(final BGPMessagesListener bgpMessagesListener) {
636 sessionState.registerMessagesCounter(bgpMessagesListener);
640 public <T extends PeerConstraint> void addDecoderConstraint(final Class<T> constraintClass, final T constraint) {
641 channel.pipeline().get(BGPByteToMessageDecoder.class).addDecoderConstraint(constraintClass, constraint);
645 public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
646 return channel.eventLoop().schedule(command, delay, unit);