Modernize RPC registration
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPPeer.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.ADJRIBOUT_NID;
12 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.TABLES_NID;
13
14 import com.google.common.base.MoreObjects;
15 import com.google.common.base.Stopwatch;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.CacheLoader;
18 import com.google.common.cache.LoadingCache;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.ImmutableSet;
21 import com.google.common.collect.Sets;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Optional;
35 import java.util.Set;
36 import java.util.concurrent.TimeUnit;
37 import java.util.stream.Collectors;
38 import org.checkerframework.checker.lock.qual.GuardedBy;
39 import org.checkerframework.checker.lock.qual.Holding;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.opendaylight.mdsal.binding.api.RpcProviderService;
42 import org.opendaylight.mdsal.common.api.CommitInfo;
43 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
44 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
45 import org.opendaylight.protocol.bgp.parser.BGPError;
46 import org.opendaylight.protocol.bgp.parser.impl.message.update.LocalPreferenceAttributeParser;
47 import org.opendaylight.protocol.bgp.parser.spi.MessageUtil;
48 import org.opendaylight.protocol.bgp.parser.spi.RevisedErrorHandlingSupport;
49 import org.opendaylight.protocol.bgp.rib.impl.config.BgpPeer;
50 import org.opendaylight.protocol.bgp.rib.impl.config.GracefulRestartUtil;
51 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
52 import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
53 import org.opendaylight.protocol.bgp.rib.impl.state.BGPSessionStateProvider;
54 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
55 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
56 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
57 import org.opendaylight.protocol.bgp.rib.spi.RIBSupport;
58 import org.opendaylight.protocol.bgp.rib.spi.RouterIds;
59 import org.opendaylight.protocol.bgp.rib.spi.state.BGPSessionState;
60 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTimersState;
61 import org.opendaylight.protocol.bgp.rib.spi.state.BGPTransportState;
62 import org.opendaylight.protocol.util.Ipv4Util;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
64 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressNoZone;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.ipv4.prefixes.DestinationIpv4Builder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.ipv4.prefixes.destination.ipv4.Ipv4Prefixes;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.ipv4.prefixes.destination.ipv4.Ipv4PrefixesBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.update.attributes.mp.reach.nlri.advertized.routes.destination.type.DestinationIpv4CaseBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Update;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.open.message.BgpParameters;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.path.attributes.Attributes;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.path.attributes.AttributesBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.update.message.Nlri;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.BgpAddPathTableType;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.BgpTableType;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.RouteRefresh;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.SendReceive;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.reach.MpReachNlri;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.reach.MpReachNlriBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.reach.mp.reach.nlri.AdvertizedRoutesBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.unreach.MpUnreachNlri;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.unreach.MpUnreachNlriBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.attributes.unreach.mp.unreach.nlri.WithdrawnRoutesBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.GracefulRestartCapability;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.mp.capabilities.add.path.capability.AddressFamilies;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.peer.rpc.rev180329.ResetSession;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.peer.rpc.rev180329.RestartGracefully;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.peer.rpc.rev180329.RouteRefreshRequest;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerRole;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.PeerKey;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.ClusterIdentifier;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.Ipv4AddressFamily;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.RouteTarget;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.UnicastSubsequentAddressFamily;
96 import org.opendaylight.yangtools.concepts.Registration;
97 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.binding.Notification;
99 import org.opendaylight.yangtools.yang.common.Empty;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
104
105 /**
106  * Class representing a peer. We have a single instance for each peer, which provides translation from BGP events into
107  * RIB actions.
108  */
109 public class BGPPeer extends AbstractPeer implements BGPSessionListener {
110     private static final Logger LOG = LoggerFactory.getLogger(BGPPeer.class);
111     private static final TablesKey IPV4_UCAST_TABLE_KEY =
112         new TablesKey(Ipv4AddressFamily.VALUE, UnicastSubsequentAddressFamily.VALUE);
113
114     private final RIB rib;
115
116     // FIXME: Alright, this right here is a ton of state which has intertwined initialization and dependencies Split
117     //        these out into separate behavior objects. This also has relationship with state in AbstractPeer -- which
118     //        hints at an obvious layer of indirection. Yeah, yeah, we can always add one of those, but the point
119     //        is that this class is a mutable meeting point, whereas the behaviour has captured invariants.
120     private final LoadingCache<NodeIdentifierWithPredicates, YangInstanceIdentifier> tablesIId =
121         CacheBuilder.newBuilder().build(new CacheLoader<NodeIdentifierWithPredicates, YangInstanceIdentifier>() {
122             @Override
123             public YangInstanceIdentifier load(final NodeIdentifierWithPredicates key) {
124                 return peerRibOutIId.node(TABLES_NID).node(key).toOptimized();
125             }
126         });
127
128     private ImmutableSet<TablesKey> tables = ImmutableSet.of();
129     private final Map<TablesKey, AdjRibOutListener> adjRibOutListenerSet = new HashMap<>();
130     private final List<RouteTarget> rtMemberships = new ArrayList<>();
131     private final RpcProviderService rpcRegistry;
132     private final BGPTableTypeRegistryConsumer tableTypeRegistry;
133     private final BgpPeer bgpPeer;
134
135     // FIXME: This should be a constant co-located with ApplicationPeer.peerId
136     private YangInstanceIdentifier peerRibOutIId;
137     @GuardedBy("this")
138     private Registration trackerRegistration;
139
140     @GuardedBy("this")
141     private BGPSession currentSession;
142     @GuardedBy("this")
143     private AdjRibInWriter ribWriter;
144     @GuardedBy("this")
145     private EffectiveRibInWriter effRibInWriter;
146     private Registration rpcRegistration;
147     private ImmutableMap<TablesKey, SendReceive> addPathTableMaps = ImmutableMap.of();
148     // FIXME: This should be a constant co-located with ApplicationPeer.peerId
149     private YangInstanceIdentifier peerPath;
150     // FIXME: This is for supportsTable() -- a trivial behavior thing, where 'peer-down' type states always return false
151     private boolean sessionUp;
152     private boolean llgrSupport;
153     private Stopwatch peerRestartStopwatch;
154     private long currentSelectionDeferralTimerSeconds;
155     private final List<TablesKey> missingEOT = new ArrayList<>();
156
157     public BGPPeer(
158             final BGPTableTypeRegistryConsumer tableTypeRegistry,
159             final IpAddressNoZone neighborAddress,
160             final String peerGroupName,
161             final RIB rib,
162             final PeerRole role,
163             final ClusterIdentifier clusterId,
164             final AsNumber localAs,
165             final RpcProviderService rpcRegistry,
166             final Set<TablesKey> afiSafisAdvertized,
167             final Set<TablesKey> afiSafisGracefulAdvertized,
168             final Map<TablesKey, Integer> llGracefulTablesAdvertised,
169             final BgpPeer bgpPeer) {
170         super(rib, Ipv4Util.toStringIP(neighborAddress), peerGroupName, role, clusterId,
171                 localAs, neighborAddress, afiSafisAdvertized, afiSafisGracefulAdvertized, llGracefulTablesAdvertised);
172         this.tableTypeRegistry = requireNonNull(tableTypeRegistry);
173         this.rib = requireNonNull(rib);
174         this.rpcRegistry = rpcRegistry;
175         this.bgpPeer = bgpPeer;
176     }
177
178     private static Attributes nextHopToAttribute(final Attributes attrs, final MpReachNlri mpReach) {
179         if (attrs.getCNextHop() == null && mpReach.getCNextHop() != null) {
180             final AttributesBuilder attributesBuilder = new AttributesBuilder(attrs);
181             attributesBuilder.setCNextHop(mpReach.getCNextHop());
182             return attributesBuilder.build();
183         }
184         return attrs;
185     }
186
187     /**
188      * Creates MPReach for the prefixes to be handled in the same way as linkstate routes.
189      *
190      * @param message Update message containing prefixes in NLRI
191      * @return MpReachNlri with prefixes from the nlri field
192      */
193     private static MpReachNlri prefixesToMpReach(final Update message) {
194         final List<Ipv4Prefixes> prefixes = message.getNlri().stream()
195                 .map(n -> new Ipv4PrefixesBuilder().setPrefix(n.getPrefix()).setPathId(n.getPathId()).build())
196                 .collect(Collectors.toList());
197         final MpReachNlriBuilder b = new MpReachNlriBuilder()
198             .setAfi(Ipv4AddressFamily.VALUE)
199             .setSafi(UnicastSubsequentAddressFamily.VALUE)
200             .setAdvertizedRoutes(new AdvertizedRoutesBuilder()
201                 .setDestinationType(new DestinationIpv4CaseBuilder()
202                     .setDestinationIpv4(new DestinationIpv4Builder().setIpv4Prefixes(prefixes).build())
203                     .build())
204                 .build());
205         if (message.getAttributes() != null) {
206             b.setCNextHop(message.getAttributes().getCNextHop());
207         }
208         return b.build();
209     }
210
211     /**
212      * Create MPUnreach for the prefixes to be handled in the same way as linkstate routes.
213      *
214      * @param message            Update message containing withdrawn routes
215      * @param isAnyNlriAnnounced isAnyNlriAnnounced
216      * @return MpUnreachNlri with prefixes from the withdrawn routes field
217      */
218     private static MpUnreachNlri prefixesToMpUnreach(final Update message, final boolean isAnyNlriAnnounced) {
219         final List<Ipv4Prefixes> prefixes = new ArrayList<>();
220         message.getWithdrawnRoutes().forEach(w -> {
221
222             Optional<Nlri> nlriAnounced = Optional.empty();
223             if (isAnyNlriAnnounced) {
224                 nlriAnounced = message.getNlri().stream().filter(n -> Objects.equals(n.getPrefix(), w.getPrefix())
225                         && Objects.equals(n.getPathId(), w.getPathId()))
226                         .findAny();
227             }
228             if (!nlriAnounced.isPresent()) {
229                 prefixes.add(new Ipv4PrefixesBuilder().setPrefix(w.getPrefix()).setPathId(w.getPathId()).build());
230             }
231         });
232         return new MpUnreachNlriBuilder().setAfi(Ipv4AddressFamily.VALUE).setSafi(UnicastSubsequentAddressFamily.VALUE)
233                 .setWithdrawnRoutes(new WithdrawnRoutesBuilder().setDestinationType(new org.opendaylight.yang.gen.v1
234                         .urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.update.attributes.mp.unreach.nlri
235                         .withdrawn.routes.destination.type.DestinationIpv4CaseBuilder().setDestinationIpv4(
236                             new DestinationIpv4Builder().setIpv4Prefixes(prefixes).build()).build()).build()).build();
237     }
238
239     private static ImmutableMap<TablesKey, SendReceive> mapTableTypesFamilies(
240             final List<AddressFamilies> addPathTablesType) {
241         return addPathTablesType.stream().collect(ImmutableMap.toImmutableMap(
242             af -> new TablesKey(af.getAfi(), af.getSafi()), BgpAddPathTableType::getSendReceive));
243     }
244
245     public synchronized void instantiateServiceInstance() {
246         createDomChain();
247         ribWriter = AdjRibInWriter.create(rib.getYangRibId(), getRole(), this);
248         setActive(true);
249     }
250
251     @Override
252     public synchronized FluentFuture<? extends CommitInfo> close() {
253         final FluentFuture<? extends CommitInfo> future = releaseConnection(true);
254         closeDomChain();
255         setActive(false);
256         return future;
257     }
258
259     @Override
260     public void onMessage(final BGPSession session, final Notification<?> msg) throws BGPDocumentedException {
261         if (msg instanceof Update update) {
262             onUpdateMessage(update);
263         } else if (msg instanceof RouteRefresh routeRefresh) {
264             onRouteRefreshMessage(routeRefresh);
265         } else {
266             LOG.info("Ignoring unhandled message class {}", msg.getClass());
267         }
268     }
269
270     private void onRouteRefreshMessage(final RouteRefresh message) {
271         final var rrAfi = message.getAfi();
272         final var rrSafi = message.getSafi();
273
274         final TablesKey key = new TablesKey(rrAfi, rrSafi);
275         synchronized (this) {
276             final AdjRibOutListener listener = adjRibOutListenerSet.remove(key);
277             if (listener != null) {
278                 listener.close();
279                 createAdjRibOutListener(key, listener.isMpSupported());
280             } else {
281                 LOG.info("Ignoring RouteRefresh message. Afi/Safi is not supported: {}, {}.", rrAfi, rrSafi);
282             }
283         }
284     }
285
286     /**
287      * Check for presence of well known mandatory attribute LOCAL_PREF in Update message.
288      *
289      * @param message Update message
290      */
291     private void checkMandatoryAttributesPresence(final Update message) throws BGPDocumentedException {
292         if (MessageUtil.isAnyNlriPresent(message)) {
293             final Attributes attrs = message.getAttributes();
294             if (getRole() == PeerRole.Ibgp && (attrs == null || attrs.getLocalPref() == null)) {
295                 throw new BGPDocumentedException(BGPError.MANDATORY_ATTR_MISSING_MSG + "LOCAL_PREF",
296                         BGPError.WELL_KNOWN_ATTR_MISSING,
297                         new byte[]{LocalPreferenceAttributeParser.TYPE});
298             }
299         }
300     }
301
302     /**
303      * Process Update message received.
304      * Calls {@link #checkMandatoryAttributesPresence(Update)} to check for presence of mandatory attributes.
305      *
306      * @param message Update message
307      */
308     private synchronized void onUpdateMessage(final Update message) throws BGPDocumentedException {
309         checkMandatoryAttributesPresence(message);
310
311         // update AdjRibs
312         final Attributes attrs = message.getAttributes();
313         MpReachNlri mpReach;
314         final boolean isAnyNlriAnnounced = message.getNlri() != null;
315         if (isAnyNlriAnnounced) {
316             mpReach = prefixesToMpReach(message);
317         } else {
318             mpReach = MessageUtil.getMpReachNlri(attrs);
319         }
320         if (mpReach != null) {
321             ribWriter.updateRoutes(mpReach, nextHopToAttribute(attrs, mpReach));
322         }
323         final MpUnreachNlri mpUnreach;
324         if (message.getWithdrawnRoutes() != null) {
325             mpUnreach = prefixesToMpUnreach(message, isAnyNlriAnnounced);
326         } else {
327             mpUnreach = MessageUtil.getMpUnreachNlri(attrs);
328         }
329         final boolean endOfRib = BgpPeerUtil.isEndOfRib(message);
330         if (mpUnreach != null) {
331             if (endOfRib) {
332                 final TablesKey tablesKey = new TablesKey(mpUnreach.getAfi(), mpUnreach.getSafi());
333                 ribWriter.removeStaleRoutes(tablesKey);
334                 missingEOT.remove(tablesKey);
335                 handleGracefulEndOfRib();
336             } else {
337                 ribWriter.removeRoutes(mpUnreach);
338             }
339         } else if (endOfRib) {
340             ribWriter.removeStaleRoutes(IPV4_UCAST_TABLE_KEY);
341             missingEOT.remove(IPV4_UCAST_TABLE_KEY);
342             handleGracefulEndOfRib();
343         }
344     }
345
346     @Holding("this")
347     private void handleGracefulEndOfRib() {
348         if (isLocalRestarting()) {
349             if (missingEOT.isEmpty()) {
350                 createEffRibInWriter();
351                 effRibInWriter.init();
352                 registerPrefixesCounters(effRibInWriter, effRibInWriter);
353                 for (final TablesKey key : getAfiSafisAdvertized()) {
354                     createAdjRibOutListener(key, true);
355                 }
356                 setLocalRestartingState(false);
357                 setGracefulPreferences(false, Collections.emptySet());
358             }
359         }
360     }
361
362     @Override
363     public synchronized void onSessionUp(final BGPSession session) {
364         currentSession = session;
365         sessionUp = true;
366
367         final var chain = rib.createPeerDOMChain();
368         ribOutChain = chain;
369         chain.addCallback(new FutureCallback<Empty>() {
370             @Override
371             public void onSuccess(final Empty result) {
372                 LOG.debug("RibOut transaction chain {} successful.", chain);
373             }
374
375             @Override
376             public void onFailure(final Throwable cause) {
377                 onRibOutChainFailed(cause);
378             }
379         });
380
381         if (currentSession instanceof BGPSessionStateProvider stateProvider) {
382             stateProvider.registerMessagesCounter(this);
383         }
384         final GracefulRestartCapability advertisedGracefulRestartCapability =
385                 session.getAdvertisedGracefulRestartCapability();
386         final var advertisedTables = advertisedGracefulRestartCapability.getTables();
387         final var advertisedLLTables = session.getAdvertisedLlGracefulRestartCapability().getTables();
388
389         final List<AddressFamilies> addPathTablesType = session.getAdvertisedAddPathTableTypes();
390         final Set<BgpTableType> advertizedTableTypes = session.getAdvertisedTableTypes();
391         LOG.info("Session with peer {} went up with tables {} and Add Path tables {}", getName(),
392                 advertizedTableTypes, addPathTablesType);
393         final Set<TablesKey> setTables = advertizedTableTypes.stream().map(t -> new TablesKey(t.getAfi(), t.getSafi()))
394                 .collect(Collectors.toSet());
395         tables = ImmutableSet.copyOf(setTables);
396
397         addPathTableMaps = mapTableTypesFamilies(addPathTablesType);
398         final boolean restartingLocally = isLocalRestarting();
399         if (!restartingLocally) {
400             addBgp4Support();
401         }
402         if (!isRestartingGracefully()) {
403             peerId = RouterIds.createPeerId(session.getBgpId());
404
405             final KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib
406                 .rev180329.bgp.rib.rib.Peer, PeerKey> peerIId = getInstanceIdentifier().child(org.opendaylight.yang.gen
407                     .v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.Peer.class,
408                     new PeerKey(peerId));
409             peerPath = createPeerPath(peerId);
410             peerRibOutIId = peerPath.node(ADJRIBOUT_NID);
411             trackerRegistration = rib.getPeerTracker().registerPeer(this);
412             createEffRibInWriter();
413             registerPrefixesCounters(effRibInWriter, effRibInWriter);
414
415             effRibInWriter.init();
416             ribWriter = ribWriter.transform(peerId, peerPath, rib.getRibSupportContext(),
417                     tables, addPathTableMaps);
418
419             if (rpcRegistry != null) {
420                 final var bgpPeerHandler = new BgpPeerRpc(this, session, tables);
421                 rpcRegistration = rpcRegistry.registerRpcImplementations(List.of(
422                     (ResetSession) bgpPeerHandler::resetSession,
423                     (RestartGracefully) bgpPeerHandler::restartGracefully,
424                     (RouteRefreshRequest) bgpPeerHandler::routeRefreshRequest), ImmutableSet.of(
425                         rib.getInstanceIdentifier().child(
426                             org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib
427                             .rib.Peer.class, new PeerKey(peerId))));
428             }
429         } else {
430             final Set<TablesKey> forwardingTables;
431             if (advertisedTables == null) {
432                 forwardingTables = Collections.emptySet();
433             } else {
434                 forwardingTables = advertisedTables.values().stream()
435                         .filter(table -> table.getAfiFlags() != null)
436                         .filter(table -> table.getAfiFlags().getForwardingState())
437                         .map(table -> new TablesKey(table.getAfi(), table.getSafi()))
438                         .collect(Collectors.toSet());
439             }
440             ribWriter.clearTables(Sets.difference(tables, forwardingTables));
441             if (restartingLocally) {
442                 effRibInWriter.close();
443                 peerRestartStopwatch = Stopwatch.createStarted();
444                 handleSelectionReferralTimer();
445                 missingEOT.addAll(tables);
446             }
447         }
448         if (advertisedTables == null || advertisedTables.isEmpty()) {
449             setAdvertizedGracefulRestartTableTypes(Collections.emptyList());
450         } else {
451             setAdvertizedGracefulRestartTableTypes(advertisedTables.values().stream()
452                     .map(t -> new TablesKey(t.getAfi(), t.getSafi())).collect(Collectors.toList()));
453         }
454         setAfiSafiGracefulRestartState(advertisedGracefulRestartCapability.getRestartTime().toJava(), false,
455             restartingLocally);
456
457         final Map<TablesKey, Integer> llTablesReceived;
458         if (advertisedLLTables != null) {
459             llTablesReceived = new HashMap<>();
460             for (var table : advertisedLLTables.values()) {
461                 llTablesReceived.put(new TablesKey(table.getAfi(), table.getSafi()),
462                     table.getLongLivedStaleTime().getValue().intValue());
463             }
464         } else {
465             llTablesReceived = Collections.emptyMap();
466         }
467         setAdvertizedLlGracefulRestartTableTypes(llTablesReceived);
468
469         if (!llTablesReceived.isEmpty()) {
470             llgrSupport = true;
471             // FIXME: propagate preserved tables
472         } else {
473             // FIXME: clear preserved tables
474             llgrSupport = false;
475         }
476
477         if (!restartingLocally) {
478             if (!setTables.contains(IPV4_UCAST_TABLE_KEY)) {
479                 createAdjRibOutListener(IPV4_UCAST_TABLE_KEY, false);
480             }
481             for (final TablesKey key : getAfiSafisAdvertized()) {
482                 createAdjRibOutListener(key, true);
483             }
484         }
485
486         // SpotBugs does not grok Optional.ifPresent() and thinks we are using unsynchronized access
487         final Optional<RevisedErrorHandlingSupport> errorHandling = bgpPeer.getErrorHandling();
488         if (errorHandling.isPresent()) {
489             currentSession.addDecoderConstraint(RevisedErrorHandlingSupport.class, errorHandling.orElseThrow());
490         }
491     }
492
493     private boolean isRestartingGracefully() {
494         return isLocalRestarting() || isPeerRestarting();
495     }
496
497     private synchronized void createEffRibInWriter() {
498         final var chain = rib.createPeerDOMChain();
499         chain.addCallback(this);
500
501         effRibInWriter = new EffectiveRibInWriter(this, rib, chain, peerPath, tables, tableTypeRegistry, rtMemberships,
502             rtCache);
503     }
504
505     //try to add a support for old-school BGP-4, if peer did not advertise IPv4-Unicast MP capability
506     @Holding("this")
507     private void addBgp4Support() {
508         if (!tables.contains(IPV4_UCAST_TABLE_KEY)) {
509             final HashSet<TablesKey> newSet = new HashSet<>(tables);
510             newSet.add(IPV4_UCAST_TABLE_KEY);
511             tables = ImmutableSet.copyOf(newSet);
512         }
513     }
514
515     @Holding("this")
516     private void createAdjRibOutListener(final TablesKey key, final boolean mpSupport) {
517         final RIBSupport<?, ?> ribSupport = rib.getRibSupportContext().getRIBSupport(key);
518
519         // not particularly nice
520         if (ribSupport != null && currentSession instanceof BGPSessionImpl bgpSession) {
521             final AdjRibOutListener adjRibOut = AdjRibOutListener.create(peerId, rib.getYangRibId(),
522                 rib.getCodecsRegistry(), ribSupport, rib.getService(), bgpSession.getLimiter(), mpSupport);
523             adjRibOutListenerSet.put(key, adjRibOut);
524             registerPrefixesSentCounter(key, adjRibOut);
525         }
526     }
527
528     @Override
529     public synchronized void onSessionDown(final BGPSession session, final Exception exc) {
530         if (exc.getMessage().equals(BGPSessionImpl.END_OF_INPUT)) {
531             LOG.info("Session with peer {} went down", getName());
532         } else {
533             LOG.info("Session with peer {} went down", getName(), exc);
534         }
535         releaseConnectionGracefully();
536     }
537
538     @Override
539     public synchronized void onSessionTerminated(final BGPSession session, final BGPTerminationReason cause) {
540         LOG.info("Session with peer {} terminated: {}", getName(), cause);
541         releaseConnectionGracefully();
542     }
543
544     @Override
545     public String toString() {
546         return MoreObjects.toStringHelper(this).add("name", getName()).add("tables", tables).toString();
547     }
548
549     @Override
550     public synchronized FluentFuture<? extends CommitInfo> releaseConnection() {
551         return releaseConnection(true);
552     }
553
554     /**
555      * On transaction chain failure, we don't want to wait for future.
556      *
557      * @param isWaitForSubmitted if true, wait for submitted future before closing binding chain. if false, don't wait.
558      */
559     @Holding("this")
560     private @NonNull FluentFuture<? extends CommitInfo> releaseConnection(final boolean isWaitForSubmitted) {
561         LOG.info("Closing session with peer");
562         sessionUp = false;
563         adjRibOutListenerSet.values().forEach(AdjRibOutListener::close);
564         adjRibOutListenerSet.clear();
565         final FluentFuture<? extends CommitInfo> future;
566         // FIXME: this is a typical example of something which should be handled by a behavior into which we have
567         //        transitioned way before this method is called. This really begs to be an abstract base class with
568         //        a 'clearTables' or similar callout
569         if (isRestartingGracefully()) {
570             final Set<TablesKey> gracefulTables = getGracefulTables();
571             ribWriter.storeStaleRoutes(gracefulTables);
572             future = ribWriter.clearTables(Sets.difference(tables, gracefulTables));
573             if (isPeerRestarting()) {
574                 peerRestartStopwatch = Stopwatch.createStarted();
575                 handleRestartTimer();
576             }
577         } else {
578             future = terminateConnection();
579         }
580         releaseRibOutChain(isWaitForSubmitted);
581
582         closeSession();
583         return future;
584     }
585
586     @Holding("this")
587     @SuppressWarnings("checkstyle:illegalCatch")
588     private void closeSession() {
589         if (currentSession != null) {
590             try {
591                 if (isRestartingGracefully()) {
592                     currentSession.closeWithoutMessage();
593                 } else {
594                     currentSession.close();
595                 }
596             } catch (final Exception e) {
597                 LOG.warn("Error closing session with peer", e);
598             }
599             currentSession = null;
600         }
601     }
602
603     private Set<TablesKey> getGracefulTables() {
604         return tables.stream()
605                 .filter(this::isGracefulRestartReceived)
606                 .filter(this::isGracefulRestartAdvertized)
607                 .collect(Collectors.toSet());
608     }
609
610     private synchronized FluentFuture<? extends CommitInfo> terminateConnection() {
611         if (trackerRegistration != null) {
612             trackerRegistration.close();
613             trackerRegistration = null;
614         }
615         if (rpcRegistration != null) {
616             rpcRegistration.close();
617         }
618         ribWriter.releaseChain();
619
620         if (effRibInWriter != null) {
621             effRibInWriter.close();
622         }
623         tables = ImmutableSet.of();
624         addPathTableMaps = ImmutableMap.of();
625         final var future = removePeer(peerPath);
626         resetState();
627
628         return future;
629     }
630
631     /**
632      * If Graceful Restart Timer expires, remove all routes advertised by peer.
633      */
634     private synchronized void handleRestartTimer() {
635         if (!isPeerRestarting()) {
636             return;
637         }
638
639         final long peerRestartTimeNanos = TimeUnit.SECONDS.toNanos(getPeerRestartTime());
640         final long elapsedNanos = peerRestartStopwatch.elapsed(TimeUnit.NANOSECONDS);
641         if (elapsedNanos >= peerRestartTimeNanos) {
642             setAfiSafiGracefulRestartState(0, false, false);
643             onSessionTerminated(currentSession, new BGPTerminationReason(BGPError.HOLD_TIMER_EXPIRED));
644         }
645
646         currentSession.schedule(this::handleRestartTimer, peerRestartTimeNanos - elapsedNanos, TimeUnit.NANOSECONDS);
647     }
648
649     private synchronized void handleSelectionReferralTimer() {
650         if (!isLocalRestarting()) {
651             return;
652         }
653
654         final long referalTimerNanos = TimeUnit.SECONDS.toNanos(currentSelectionDeferralTimerSeconds);
655         final long elapsedNanos = peerRestartStopwatch.elapsed(TimeUnit.NANOSECONDS);
656         if (elapsedNanos >= referalTimerNanos) {
657             missingEOT.clear();
658             handleGracefulEndOfRib();
659         }
660         currentSession.schedule(this::handleSelectionReferralTimer, referalTimerNanos - elapsedNanos,
661             TimeUnit.NANOSECONDS);
662     }
663
664     @Holding("this")
665     private void releaseConnectionGracefully() {
666         if (getPeerRestartTime() > 0) {
667             setRestartingState();
668         }
669         releaseConnection(true);
670     }
671
672     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC")
673     @Override
674     public SendReceive getSupportedAddPathTables(final TablesKey tableKey) {
675         return addPathTableMaps.get(tableKey);
676     }
677
678     @Override
679     public boolean supportsTable(final TablesKey tableKey) {
680         return sessionUp && getAfiSafisAdvertized().contains(tableKey) && tables.contains(tableKey);
681     }
682
683     @Override
684     public YangInstanceIdentifier getRibOutIId(final NodeIdentifierWithPredicates tablekey) {
685         return tablesIId.getUnchecked(tablekey);
686     }
687
688     @Override
689     public synchronized void onFailure(final Throwable cause) {
690         LOG.error("Transaction domChain failed.", cause);
691         releaseConnection(true);
692     }
693
694     private synchronized void onRibOutChainFailed(final Throwable cause) {
695         LOG.error("RibOut transaction chain failed.", cause);
696         releaseConnection(false);
697     }
698
699     @Override
700     public synchronized void markUptodate(final TablesKey tablesKey) {
701         ribWriter.markTableUptodate(tablesKey);
702     }
703
704     @Override
705     public synchronized BGPSessionState getBGPSessionState() {
706         if (currentSession instanceof BGPSessionStateProvider stateProvider) {
707             return stateProvider.getBGPSessionState();
708         }
709         return null;
710     }
711
712     @Override
713     public synchronized BGPTimersState getBGPTimersState() {
714         if (currentSession instanceof BGPSessionStateProvider stateProvider) {
715             return stateProvider.getBGPTimersState();
716         }
717         return null;
718     }
719
720     @Override
721     public synchronized BGPTransportState getBGPTransportState() {
722         if (currentSession instanceof BGPSessionStateProvider stateProvider) {
723             return stateProvider.getBGPTransportState();
724         }
725         return null;
726     }
727
728     @Override
729     public List<RouteTarget> getMemberships() {
730         return rtMemberships;
731     }
732
733     @Override
734     public synchronized ListenableFuture<?> restartGracefully(final long selectionDeferralTimerSeconds) {
735         final Set<TablesKey> tablesToPreserve = getGracefulTables();
736         if (tablesToPreserve == null || tablesToPreserve.isEmpty()) {
737             LOG.info("Peer {} is not capable of graceful restart or have no matching graceful tables.", peerId);
738             return Futures.immediateFailedFuture(new UnsupportedOperationException(
739                     "Peer is not capable of graceful restart"));
740         }
741         setGracefulPreferences(true, tablesToPreserve);
742         currentSelectionDeferralTimerSeconds = selectionDeferralTimerSeconds;
743         setLocalRestartingState(true);
744         return releaseConnection(true);
745     }
746
747     @Override
748     boolean supportsLLGR() {
749         return llgrSupport;
750     }
751
752     private synchronized void setGracefulPreferences(final boolean localRestarting,
753                                                      final Set<TablesKey> preservedTables) {
754         final Set<TablesKey> gracefulTables = tables.stream()
755                 .filter(this::isGracefulRestartAdvertized)
756                 .collect(Collectors.toSet());
757         final BgpParameters bgpParameters = GracefulRestartUtil.getGracefulBgpParameters(
758                 bgpPeer.getBgpFixedCapabilities(), gracefulTables, preservedTables,
759                 bgpPeer.getGracefulRestartTimer(), localRestarting, Collections.emptySet());
760         final BGPSessionPreferences oldPrefs = rib.getDispatcher().getBGPPeerRegistry()
761                 .getPeerPreferences(getNeighborAddress());
762         final BGPSessionPreferences newPrefs = new BGPSessionPreferences(
763                 oldPrefs.getMyAs(),
764                 oldPrefs.getHoldTime(),
765                 oldPrefs.getBgpId(),
766                 oldPrefs.getExpectedRemoteAs(),
767                 Collections.singletonList(bgpParameters),
768                 oldPrefs.getMd5Password());
769         rib.getDispatcher().getBGPPeerRegistry()
770                 .updatePeerPreferences(getNeighborAddress(), newPrefs);
771     }
772 }