Update RIB-based constants
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / AdjRibInWriter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.ADJRIBIN_NID;
12 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.ADJRIBOUT_NID;
13 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.ATTRIBUTES_NID;
14 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.EFFRIBIN_NID;
15 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.TABLES_NID;
16 import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.UPTODATE_NID;
17
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.ImmutableMap.Builder;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Locale;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutionException;
36 import java.util.stream.Collectors;
37 import javax.annotation.Nonnull;
38 import javax.annotation.Nullable;
39 import javax.annotation.concurrent.GuardedBy;
40 import javax.annotation.concurrent.NotThreadSafe;
41 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
44 import org.opendaylight.mdsal.common.api.CommitInfo;
45 import org.opendaylight.protocol.bgp.rib.impl.ApplicationPeer.RegisterAppPeerListener;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.PeerTransactionChain;
47 import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContext;
48 import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContextRegistry;
49 import org.opendaylight.protocol.bgp.rib.spi.IdentifierUtils;
50 import org.opendaylight.protocol.bgp.rib.spi.PeerRoleUtil;
51 import org.opendaylight.protocol.bgp.rib.spi.RIBNormalizedNodes;
52 import org.opendaylight.protocol.bgp.rib.spi.RIBQNames;
53 import org.opendaylight.protocol.bgp.rib.spi.RibSupportUtils;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.SendReceive;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.update.attributes.MpReachNlri;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev180329.update.attributes.MpUnreachNlri;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerRole;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.Peer;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.peer.SupportedTables;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
62 import org.opendaylight.yangtools.yang.common.QName;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.InstanceIdentifierBuilder;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
67 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
72 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
73 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
74 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
75 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  * Writer of Adjacency-RIB-In for a single peer. An instance of this object
81  * is attached to each {@link BGPPeer} and {@link ApplicationPeer}.
82  */
83 @NotThreadSafe
84 final class AdjRibInWriter {
85
86     private static final Logger LOG = LoggerFactory.getLogger(AdjRibInWriter.class);
87
88     private static final QName PEER_ROLE_QNAME = QName.create(Peer.QNAME, "peer-role").intern();
89     private static final NodeIdentifier PEER_ID = NodeIdentifier.create(RIBQNames.PEER_ID_QNAME);
90     private static final NodeIdentifier PEER_ROLE = NodeIdentifier.create(PEER_ROLE_QNAME);
91     private static final NodeIdentifier PEER_TABLES = NodeIdentifier.create(SupportedTables.QNAME);
92     private static final QName SEND_RECEIVE = QName.create(SupportedTables.QNAME, "send-receive").intern();
93
94     // FIXME: is there a utility method to construct this?
95     private static final MapNode EMPTY_TABLES = ImmutableNodes.mapNodeBuilder(TABLES_NID).build();
96     private static final ContainerNode EMPTY_ADJRIBIN = Builders.containerBuilder()
97             .withNodeIdentifier(ADJRIBIN_NID).addChild(EMPTY_TABLES).build();
98     private static final ContainerNode EMPTY_EFFRIBIN = Builders.containerBuilder()
99             .withNodeIdentifier(EFFRIBIN_NID).addChild(EMPTY_TABLES).build();
100     private static final ContainerNode EMPTY_ADJRIBOUT = Builders.containerBuilder()
101             .withNodeIdentifier(ADJRIBOUT_NID).addChild(EMPTY_TABLES).build();
102
103     private final Map<TablesKey, TableContext> tables;
104     private final YangInstanceIdentifier ribPath;
105     private final PeerTransactionChain chain;
106     private final PeerRole role;
107     @GuardedBy("this")
108     private final Map<TablesKey, Collection<NodeIdentifierWithPredicates>> staleRoutesRegistry = new HashMap<>();
109     @GuardedBy("this")
110     private FluentFuture<? extends CommitInfo> submitted;
111
112     private AdjRibInWriter(final YangInstanceIdentifier ribPath, final PeerTransactionChain chain, final PeerRole role,
113             final Map<TablesKey, TableContext> tables) {
114         this.ribPath = requireNonNull(ribPath);
115         this.chain = requireNonNull(chain);
116         this.tables = requireNonNull(tables);
117         this.role = requireNonNull(role);
118     }
119
120     /**
121      * Create a new writer using a transaction chain.
122      *
123      * @param role                peer's role
124      * @param chain               transaction chain  @return A fresh writer instance
125      */
126     static AdjRibInWriter create(@Nonnull final YangInstanceIdentifier ribId, @Nonnull final PeerRole role,
127             @Nonnull final PeerTransactionChain chain) {
128         return new AdjRibInWriter(ribId, chain, role, Collections.emptyMap());
129     }
130
131     /**
132      * Transform this writer to a new writer, which is in charge of specified tables.
133      * Empty tables are created for new entries and old tables are deleted. Once this
134      * method returns, the old instance must not be reasonably used.
135      *
136      * @param newPeerId         new peer BGP identifier
137      * @param peerPath          path of the peer in the datastore
138      * @param registry          RIB extension registry
139      * @param tableTypes        New tables, must not be null
140      * @param addPathTablesType supported add path tables
141      * @return New writer
142      */
143     AdjRibInWriter transform(final PeerId newPeerId, final YangInstanceIdentifier peerPath,
144             final RIBSupportContextRegistry registry,
145             final Set<TablesKey> tableTypes, final Map<TablesKey, SendReceive> addPathTablesType) {
146         return transform(newPeerId, peerPath, registry, tableTypes, addPathTablesType, null);
147     }
148
149     AdjRibInWriter transform(final PeerId newPeerId, final YangInstanceIdentifier peerPath,
150             final RIBSupportContextRegistry registry, final Set<TablesKey> tableTypes,
151             final Map<TablesKey, SendReceive> addPathTablesType,
152             @Nullable final RegisterAppPeerListener registerAppPeerListener) {
153         final DOMDataWriteTransaction tx = this.chain.getDomChain().newWriteOnlyTransaction();
154
155         createEmptyPeerStructure(newPeerId, peerPath, tx);
156         final ImmutableMap<TablesKey, TableContext> tb = createNewTableInstances(peerPath, registry, tableTypes,
157                 addPathTablesType, tx);
158
159         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
160             @Override
161             public void onSuccess(final CommitInfo result) {
162                 if (registerAppPeerListener != null) {
163                     LOG.trace("Application Peer Listener registered");
164                     registerAppPeerListener.register();
165                 }
166             }
167
168             @Override
169             public void onFailure(final Throwable throwable) {
170                 if (registerAppPeerListener != null) {
171                     LOG.error("Failed to create Empty Structure, Application Peer Listener won't be registered",
172                             throwable);
173                 } else {
174                     LOG.error("Failed to create Empty Structure", throwable);
175                 }
176             }
177         }, MoreExecutors.directExecutor());
178         return new AdjRibInWriter(this.ribPath, this.chain, this.role, tb);
179     }
180
181     /**
182      * Create new table instances, potentially creating their empty entries.
183      */
184     private static ImmutableMap<TablesKey, TableContext> createNewTableInstances(
185             final YangInstanceIdentifier newPeerPath, final RIBSupportContextRegistry registry,
186             final Set<TablesKey> tableTypes, final Map<TablesKey, SendReceive> addPathTablesType,
187             final DOMDataWriteTransaction tx) {
188
189         final Builder<TablesKey, TableContext> tb = ImmutableMap.builder();
190         for (final TablesKey tableKey : tableTypes) {
191             final RIBSupportContext rs = registry.getRIBSupportContext(tableKey);
192             // TODO: Use returned value once Instance Identifier builder allows for it.
193             final NodeIdentifierWithPredicates instanceIdentifierKey = RibSupportUtils.toYangTablesKey(tableKey);
194             if (rs == null) {
195                 LOG.warn("No support for table type {}, skipping it", tableKey);
196                 continue;
197             }
198             installAdjRibsOutTables(newPeerPath, rs, instanceIdentifierKey, tableKey,
199                     addPathTablesType.get(tableKey), tx);
200             installAdjRibInTables(newPeerPath, tableKey, rs, instanceIdentifierKey, tx, tb);
201         }
202         return tb.build();
203     }
204
205     private static void installAdjRibInTables(final YangInstanceIdentifier newPeerPath, final TablesKey tableKey,
206             final RIBSupportContext rs, final NodeIdentifierWithPredicates instanceIdentifierKey,
207             final DOMDataWriteTransaction tx, final Builder<TablesKey, TableContext> tb) {
208         // We will use table keys very often, make sure they are optimized
209         final InstanceIdentifierBuilder idb = YangInstanceIdentifier.builder(newPeerPath
210                 .node(EMPTY_ADJRIBIN.getIdentifier()).node(TABLES_NID));
211         idb.nodeWithKey(instanceIdentifierKey.getNodeType(), instanceIdentifierKey.getKeyValues());
212
213         final TableContext ctx = new TableContext(rs, idb.build());
214         ctx.createEmptyTableStructure(tx);
215
216         tx.merge(LogicalDatastoreType.OPERATIONAL, ctx.getTableId().node(ATTRIBUTES_NID).node(UPTODATE_NID),
217             RIBNormalizedNodes.ATTRIBUTES_UPTODATE_FALSE);
218         LOG.debug("Created table instance {}", ctx.getTableId());
219         tb.put(tableKey, ctx);
220     }
221
222     private static void installAdjRibsOutTables(final YangInstanceIdentifier newPeerPath, final RIBSupportContext rs,
223             final NodeIdentifierWithPredicates instanceIdentifierKey, final TablesKey tableKey,
224             final SendReceive sendReceive, final DOMDataWriteTransaction tx) {
225         final NodeIdentifierWithPredicates supTablesKey = RibSupportUtils.toYangKey(SupportedTables.QNAME, tableKey);
226         final DataContainerNodeAttrBuilder<NodeIdentifierWithPredicates, MapEntryNode> tt =
227                 Builders.mapEntryBuilder().withNodeIdentifier(supTablesKey);
228         for (final Entry<QName, Object> e : supTablesKey.getKeyValues().entrySet()) {
229             tt.withChild(ImmutableNodes.leafNode(e.getKey(), e.getValue()));
230         }
231         if (sendReceive != null) {
232             tt.withChild(ImmutableNodes.leafNode(SEND_RECEIVE, sendReceive.toString().toLowerCase(Locale.ENGLISH)));
233         }
234         tx.put(LogicalDatastoreType.OPERATIONAL, newPeerPath.node(PEER_TABLES).node(supTablesKey), tt.build());
235         rs.createEmptyTableStructure(tx, newPeerPath.node(EMPTY_ADJRIBOUT.getIdentifier())
236                 .node(TABLES_NID).node(instanceIdentifierKey));
237     }
238
239     private void createEmptyPeerStructure(final PeerId newPeerId,
240             final YangInstanceIdentifier peerPath, final DOMDataWriteTransaction tx) {
241         final NodeIdentifierWithPredicates peerKey = IdentifierUtils.domPeerId(newPeerId);
242
243         tx.put(LogicalDatastoreType.OPERATIONAL, peerPath, peerSkeleton(peerKey, newPeerId.getValue()));
244         LOG.debug("New peer {} structure installed.", peerPath);
245     }
246
247     @VisibleForTesting
248     MapEntryNode peerSkeleton(final NodeIdentifierWithPredicates peerKey, final String peerId) {
249         final DataContainerNodeBuilder<NodeIdentifierWithPredicates, MapEntryNode> pb = Builders.mapEntryBuilder();
250         pb.withNodeIdentifier(peerKey);
251         pb.withChild(ImmutableNodes.leafNode(PEER_ID, peerId));
252         pb.withChild(ImmutableNodes.leafNode(PEER_ROLE, PeerRoleUtil.roleForString(this.role)));
253         pb.withChild(ImmutableMapNodeBuilder.create().withNodeIdentifier(PEER_TABLES).build());
254         pb.withChild(EMPTY_ADJRIBIN);
255         pb.withChild(EMPTY_EFFRIBIN);
256         pb.withChild(EMPTY_ADJRIBOUT);
257         return pb.build();
258     }
259
260     void markTableUptodate(final TablesKey tableTypes) {
261         final DOMDataWriteTransaction tx = this.chain.getDomChain().newWriteOnlyTransaction();
262         final TableContext ctx = this.tables.get(tableTypes);
263         tx.merge(LogicalDatastoreType.OPERATIONAL, ctx.getTableId().node(ATTRIBUTES_NID).node(UPTODATE_NID),
264             RIBNormalizedNodes.ATTRIBUTES_UPTODATE_TRUE);
265         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
266             @Override
267             public void onSuccess(final CommitInfo result) {
268                 LOG.trace("Write Attributes uptodate, succeed");
269             }
270
271             @Override
272             public void onFailure(final Throwable throwable) {
273                 LOG.error("Write Attributes uptodate failed", throwable);
274             }
275         }, MoreExecutors.directExecutor());
276     }
277
278     void updateRoutes(final MpReachNlri nlri, final org.opendaylight.yang.gen.v1.urn.opendaylight.params
279             .xml.ns.yang.bgp.message.rev180329.path.attributes.Attributes attributes) {
280         final TablesKey key = new TablesKey(nlri.getAfi(), nlri.getSafi());
281         final TableContext ctx = this.tables.get(key);
282         if (ctx == null) {
283             LOG.debug("No table for {}, not accepting NLRI {}", key, nlri);
284             return;
285         }
286
287         final DOMDataWriteTransaction tx = this.chain.getDomChain().newWriteOnlyTransaction();
288         final Collection<NodeIdentifierWithPredicates> routeKeys = ctx.writeRoutes(tx, nlri, attributes);
289         final Collection<NodeIdentifierWithPredicates> staleRoutes = this.staleRoutesRegistry.get(key);
290         if (staleRoutes != null) {
291             staleRoutes.removeAll(routeKeys);
292         }
293         LOG.trace("Write routes {}", nlri);
294         final FluentFuture<? extends CommitInfo> future = tx.commit();
295         this.submitted = future;
296         future.addCallback(new FutureCallback<CommitInfo>() {
297             @Override
298             public void onSuccess(final CommitInfo result) {
299                 LOG.trace("Write routes {}, succeed", nlri);
300             }
301
302             @Override
303             public void onFailure(final Throwable throwable) {
304                 LOG.error("Write routes failed", throwable);
305             }
306         }, MoreExecutors.directExecutor());
307     }
308
309     void removeRoutes(final MpUnreachNlri nlri) {
310         final TablesKey key = new TablesKey(nlri.getAfi(), nlri.getSafi());
311         final TableContext ctx = this.tables.get(key);
312         if (ctx == null) {
313             LOG.debug("No table for {}, not accepting NLRI {}", key, nlri);
314             return;
315         }
316         LOG.trace("Removing routes {}", nlri);
317         final DOMDataWriteTransaction tx = this.chain.getDomChain().newWriteOnlyTransaction();
318         ctx.removeRoutes(tx, nlri);
319         final FluentFuture<? extends CommitInfo> future = tx.commit();
320         this.submitted = future;
321         future.addCallback(new FutureCallback<CommitInfo>() {
322             @Override
323             public void onSuccess(final CommitInfo result) {
324                 LOG.trace("Removing routes {}, succeed", nlri);
325             }
326
327             @Override
328             public void onFailure(final Throwable throwable) {
329                 LOG.error("Removing routes failed", throwable);
330             }
331         }, MoreExecutors.directExecutor());
332     }
333
334     void releaseChain() {
335         if (this.submitted != null) {
336             try {
337                 this.submitted.get();
338             } catch (final InterruptedException | ExecutionException throwable) {
339                 LOG.error("Write routes failed", throwable);
340             }
341         }
342     }
343
344     void storeStaleRoutes(final Set<TablesKey> gracefulTables) {
345         final CountDownLatch latch = new CountDownLatch(gracefulTables.size());
346
347         try (DOMDataReadOnlyTransaction tx = this.chain.getDomChain().newReadOnlyTransaction()) {
348             for (TablesKey tablesKey : gracefulTables) {
349                 final TableContext ctx = this.tables.get(tablesKey);
350                 if (ctx == null) {
351                     LOG.warn("Missing table for address family {}", tablesKey);
352                     latch.countDown();
353                     continue;
354                 }
355
356                 Futures.addCallback(tx.read(LogicalDatastoreType.OPERATIONAL, ctx.routesPath()),
357                     new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
358                         @Override
359                         public void onSuccess(final Optional<NormalizedNode<?, ?>> routesOptional) {
360                             try {
361                                 if (routesOptional.isPresent()) {
362                                     synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
363                                         final MapNode routesNode = (MapNode) routesOptional.get();
364                                         final List<NodeIdentifierWithPredicates> routes = routesNode.getValue().stream()
365                                                 .map(MapEntryNode::getIdentifier)
366                                                 .collect(Collectors.toList());
367                                         if (!routes.isEmpty()) {
368                                             AdjRibInWriter.this.staleRoutesRegistry.put(tablesKey, routes);
369                                         }
370                                     }
371                                 }
372                             } finally {
373                                 latch.countDown();
374                             }
375                         }
376
377                         @Override
378                         public void onFailure(final Throwable throwable) {
379                             LOG.warn("Failed to store stale routes for table {}", tablesKey, throwable);
380                             latch.countDown();
381                         }
382                     }, MoreExecutors.directExecutor());
383             }
384         }
385
386         try {
387             latch.await();
388         } catch (InterruptedException e) {
389             LOG.warn("Interrupted while waiting to store stale routes with {} tasks of {} to finish", latch.getCount(),
390                 gracefulTables, e);
391         }
392     }
393
394     void removeStaleRoutes(final TablesKey tableKey) {
395         final TableContext ctx = this.tables.get(tableKey);
396         if (ctx == null) {
397             LOG.debug("No table for {}, not removing any stale routes", tableKey);
398             return;
399         }
400         final Collection<NodeIdentifierWithPredicates> routeKeys = this.staleRoutesRegistry.get(tableKey);
401         if (routeKeys == null || routeKeys.isEmpty()) {
402             LOG.debug("No stale routes present in table {}", tableKey);
403             return;
404         }
405         LOG.trace("Removing routes {}", routeKeys);
406         final DOMDataWriteTransaction tx = this.chain.getDomChain().newWriteOnlyTransaction();
407         routeKeys.forEach(routeKey -> {
408             tx.delete(LogicalDatastoreType.OPERATIONAL, ctx.routePath(routeKey));
409         });
410         final FluentFuture<? extends CommitInfo> future = tx.commit();
411         this.submitted = future;
412         future.addCallback(new FutureCallback<CommitInfo>() {
413             @Override
414             public void onSuccess(final CommitInfo result) {
415                 LOG.trace("Removing routes {}, succeed", routeKeys);
416                 synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
417                     staleRoutesRegistry.remove(tableKey);
418                 }
419             }
420
421             @Override
422             public void onFailure(final Throwable throwable) {
423                 LOG.warn("Removing routes {}, failed", routeKeys, throwable);
424             }
425         }, MoreExecutors.directExecutor());
426     }
427
428     FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
429         if (tablesToClear == null || tablesToClear.isEmpty()) {
430             return CommitInfo.emptyFluentFuture();
431         }
432
433         final DOMDataWriteTransaction wtx = this.chain.getDomChain().newWriteOnlyTransaction();
434         tablesToClear.forEach(tableKey -> {
435             final TableContext ctx = this.tables.get(tableKey);
436             wtx.delete(LogicalDatastoreType.OPERATIONAL, ctx.routesPath().getParent());
437         });
438         return wtx.commit();
439     }
440 }