Split out EffectiveRibInWriter.processModifications()
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / EffectiveRibInWriter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.collect.ImmutableSet;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.atomic.LongAdder;
25 import javax.annotation.Nonnull;
26 import javax.annotation.concurrent.GuardedBy;
27 import javax.annotation.concurrent.NotThreadSafe;
28 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
29 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
32 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
33 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.common.api.CommitInfo;
37 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
38 import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
39 import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContextRegistry;
40 import org.opendaylight.protocol.bgp.rib.impl.spi.RibOutRefresh;
41 import org.opendaylight.protocol.bgp.rib.impl.state.peer.PrefixesInstalledCounters;
42 import org.opendaylight.protocol.bgp.rib.impl.state.peer.PrefixesReceivedCounters;
43 import org.opendaylight.protocol.bgp.rib.spi.RIBSupport;
44 import org.opendaylight.protocol.bgp.rib.spi.policy.BGPRibRoutingPolicy;
45 import org.opendaylight.protocol.bgp.rib.spi.policy.BGPRouteEntryImportParameters;
46 import org.opendaylight.protocol.bgp.route.targetcontrain.spi.ClientRouteTargetContrainCache;
47 import org.opendaylight.protocol.bgp.route.targetcontrain.spi.RouteTargetMembeshipUtil;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev180329.path.attributes.Attributes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerRole;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.Peer;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.PeerKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.peer.AdjRibIn;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.peer.EffectiveRibIn;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.tables.Routes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.Ipv4AddressFamily;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.Ipv6AddressFamily;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.MplsLabeledVpnSubsequentAddressFamily;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.RouteTarget;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.binding.ChildOf;
65 import org.opendaylight.yangtools.yang.binding.ChoiceIn;
66 import org.opendaylight.yangtools.yang.binding.DataObject;
67 import org.opendaylight.yangtools.yang.binding.Identifiable;
68 import org.opendaylight.yangtools.yang.binding.Identifier;
69 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.IdentifiableItem;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
72 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
/**
 * Implementation of the BGP import policy. Listens on peer's Adj-RIB-In, inspects all inbound
 * routes in the context of the advertising peer's role and applies the inbound policy.
 * <p>
 * Inbound policy is applied as follows:
 * <p>
 * 1) if the peer is an eBGP peer, perform attribute replacement and filtering
 * 2) check if a route is admissible based on attributes attached to it, as well as the
 * advertising peer's role
 * 3) output admitted routes with edited attributes into /bgp-rib/rib/peer/effective-rib-in/tables/routes
 */
88 @NotThreadSafe
89 final class EffectiveRibInWriter implements PrefixesReceivedCounters, PrefixesInstalledCounters,
90         AutoCloseable, ClusteredDataTreeChangeListener<Tables> {
91
92     private static final Logger LOG = LoggerFactory.getLogger(EffectiveRibInWriter.class);
93     static final NodeIdentifier TABLE_ROUTES = new NodeIdentifier(Routes.QNAME);
94     private static final TablesKey IVP4_VPN_TABLE_KEY = new TablesKey(Ipv4AddressFamily.class,
95             MplsLabeledVpnSubsequentAddressFamily.class);
96     private static final TablesKey IVP6_VPN_TABLE_KEY = new TablesKey(Ipv6AddressFamily.class,
97             MplsLabeledVpnSubsequentAddressFamily.class);
98     private final RIBSupportContextRegistry registry;
99     private final KeyedInstanceIdentifier<Peer, PeerKey> peerIId;
100     private final InstanceIdentifier<EffectiveRibIn> effRibTables;
101     private final DataBroker databroker;
102     private final List<RouteTarget> rtMemberships;
103     private final RibOutRefresh vpnTableRefresher;
104     private final ClientRouteTargetContrainCache rtCache;
105     private ListenerRegistration<?> reg;
106     private BindingTransactionChain chain;
107     private final Map<TablesKey, LongAdder> prefixesReceived;
108     private final Map<TablesKey, LongAdder> prefixesInstalled;
109     private final BGPRibRoutingPolicy ribPolicies;
110     private final BGPRouteEntryImportParameters peerImportParameters;
111     private final BGPTableTypeRegistryConsumer tableTypeRegistry;
112     @GuardedBy("this")
113     private FluentFuture<? extends CommitInfo> submitted;
114     private boolean rtMembershipsUpdated;
115
116     EffectiveRibInWriter(
117             final BGPRouteEntryImportParameters peer,
118             final RIB rib,
119             final BindingTransactionChain chain,
120             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId,
121             final Set<TablesKey> tables,
122             final BGPTableTypeRegistryConsumer tableTypeRegistry,
123             final List<RouteTarget> rtMemberships,
124             final ClientRouteTargetContrainCache rtCache) {
125         this.registry = requireNonNull(rib.getRibSupportContext());
126         this.chain = requireNonNull(chain);
127         this.peerIId = requireNonNull(peerIId);
128         this.effRibTables = this.peerIId.child(EffectiveRibIn.class);
129         this.prefixesInstalled = buildPrefixesTables(tables);
130         this.prefixesReceived = buildPrefixesTables(tables);
131         this.ribPolicies = requireNonNull(rib.getRibPolicies());
132         this.databroker = requireNonNull(rib.getDataBroker());
133         this.tableTypeRegistry = requireNonNull(tableTypeRegistry);
134         this.peerImportParameters = peer;
135         this.rtMemberships = rtMemberships;
136         this.rtCache = rtCache;
137         this.vpnTableRefresher = rib;
138     }
139
140     public void init() {
141         final DataTreeIdentifier<Tables> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
142                 this.peerIId.child(AdjRibIn.class).child(Tables.class));
143         LOG.debug("Registered Effective RIB on {}", this.peerIId);
144         this.reg = requireNonNull(this.databroker).registerDataTreeChangeListener(treeId, this);
145     }
146
147     private static Map<TablesKey, LongAdder> buildPrefixesTables(final Set<TablesKey> tables) {
148         final ImmutableMap.Builder<TablesKey, LongAdder> b = ImmutableMap.builder();
149         tables.forEach(table -> b.put(table, new LongAdder()));
150         return b.build();
151     }
152
153     @Override
154     public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Tables>> changes) {
155         if (this.chain == null) {
156             LOG.trace("Chain closed. Ignoring Changes : {}", changes);
157             return;
158         }
159
160         LOG.trace("Data changed called to effective RIB. Change : {}", changes);
161         if (!changes.isEmpty()) {
162             processModifications(changes);
163         }
164
165         //Refresh VPN Table if RT Memberships were updated
166         if (this.rtMembershipsUpdated) {
167             this.vpnTableRefresher.refreshTable(IVP4_VPN_TABLE_KEY, this.peerImportParameters.getFromPeerId());
168             this.vpnTableRefresher.refreshTable(IVP6_VPN_TABLE_KEY, this.peerImportParameters.getFromPeerId());
169             this.rtMembershipsUpdated = false;
170         }
171     }
172
173     @GuardedBy("this")
174     @SuppressWarnings("unchecked")
175     private void processModifications(final Collection<DataTreeModification<Tables>> changes) {
176         final WriteTransaction tx = this.chain.newWriteOnlyTransaction();
177         for (final DataTreeModification<Tables> tc : changes) {
178             final DataObjectModification<Tables> table = tc.getRootNode();
179             final DataObjectModification.ModificationType modificationType = table.getModificationType();
180             switch (modificationType) {
181                 case DELETE:
182                     final Tables removeTable = table.getDataBefore();
183                     final TablesKey tableKey = removeTable.key();
184                     final KeyedInstanceIdentifier<Tables, TablesKey> effectiveTablePath
185                             = this.effRibTables.child(Tables.class, tableKey);
186                     LOG.debug("Delete Effective Table {} modification type {}, "
187                             , effectiveTablePath, modificationType);
188                     tx.delete(LogicalDatastoreType.OPERATIONAL, effectiveTablePath);
189                     CountersUtil.decrement(this.prefixesInstalled.get(tableKey), tableKey);
190                     break;
191                 case SUBTREE_MODIFIED:
192                     final Tables before = table.getDataBefore();
193                     final Tables after = table.getDataAfter();
194                     final TablesKey tk = after.key();
195                     LOG.debug("Process table {} type {}, dataAfter {}, dataBefore {}",
196                             tk, modificationType, after, before);
197
198                     final KeyedInstanceIdentifier<Tables, TablesKey> tablePath
199                             = this.effRibTables.child(Tables.class, tk);
200                     final RIBSupport ribSupport = this.registry.getRIBSupport(tk);
201                     if (ribSupport == null) {
202                         break;
203                     }
204
205                     final DataObjectModification<org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp
206                         .rib.rev180329.rib.tables.Attributes> adjRibAttrsChanged = table.getModifiedChildContainer(
207                             org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329
208                                 .rib.tables.Attributes.class);
209                     if (adjRibAttrsChanged != null) {
210                         tx.put(LogicalDatastoreType.OPERATIONAL,
211                             tablePath.child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp
212                                 .rib.rev180329.rib.tables.Attributes.class), adjRibAttrsChanged.getDataAfter());
213                     }
214
215                     final DataObjectModification routesChangesContainer = table.getModifiedChildContainer(
216                         ribSupport.routesCaseClass(), ribSupport.routesContainerClass());
217
218                     if (routesChangesContainer == null) {
219                         break;
220                     }
221                     updateRoutes(tx, tk, ribSupport, tablePath, routesChangesContainer.getModifiedChildren());
222                     break;
223                 case WRITE:
224                     writeTable(tx, table);
225                     break;
226                 default:
227                     LOG.warn("Ignoring unhandled root {}", table);
228                     break;
229             }
230         }
231
232         final FluentFuture<? extends CommitInfo> future = tx.commit();
233         this.submitted = future;
234         future.addCallback(new FutureCallback<CommitInfo>() {
235             @Override
236             public void onSuccess(final CommitInfo result) {
237                 LOG.trace("Successful commit");
238             }
239
240             @Override
241             public void onFailure(final Throwable trw) {
242                 LOG.error("Failed commit", trw);
243             }
244         }, MoreExecutors.directExecutor());
245     }
246
247     @SuppressWarnings("unchecked")
248     private <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>,
249         R extends Route & ChildOf<? super S> & Identifiable<I>, I extends Identifier<R>> void updateRoutes(
250             final WriteTransaction tx,
251             final TablesKey tableKey, final RIBSupport<C, S, R, I> ribSupport,
252             final KeyedInstanceIdentifier<Tables, TablesKey> tablePath,
253             final Collection<DataObjectModification<R>> routeChanges) {
254         for (final DataObjectModification<R> routeChanged : routeChanges) {
255             final PathArgument routeChangeId = routeChanged.getIdentifier();
256             verify(routeChangeId instanceof IdentifiableItem, "Route change %s has invalid identifier %s",
257                 routeChanged, routeChangeId);
258             final I routeKey = ((IdentifiableItem<R, I>) routeChangeId).getKey();
259
260             switch (routeChanged.getModificationType()) {
261                 case SUBTREE_MODIFIED:
262                 case WRITE:
263                     writeRoutes(tx, tableKey, ribSupport, tablePath, routeKey, routeChanged.getDataAfter());
264                     break;
265                 case DELETE:
266                     final InstanceIdentifier<R> routeIID = ribSupport.createRouteIdentifier(tablePath, routeKey);
267                     deleteRoutes(routeIID, routeChanged.getDataBefore(), tx);
268                     break;
269             }
270         }
271     }
272
273     private <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>,
274         R extends Route & ChildOf<? super S> & Identifiable<I>, I extends Identifier<R>> void writeRoutes(
275                 final WriteTransaction tx, final TablesKey tk, final RIBSupport<C, S, R, I> ribSupport,
276             final KeyedInstanceIdentifier<Tables, TablesKey> tablePath, final I routeKey,
277             final R route) {
278         final InstanceIdentifier<R> routeIID = ribSupport.createRouteIdentifier(tablePath, routeKey);
279         CountersUtil.increment(this.prefixesReceived.get(tk), tk);
280         final Optional<Attributes> effAtt = this.ribPolicies
281                 .applyImportPolicies(this.peerImportParameters, route.getAttributes(),
282                         tableTypeRegistry.getAfiSafiType(ribSupport.getTablesKey()).get());
283         if (effAtt.isPresent()) {
284             final Optional<RouteTarget> rtMembership = RouteTargetMembeshipUtil.getRT(route);
285             if (rtMembership.isPresent()) {
286                 final RouteTarget rt = rtMembership.get();
287                 if(PeerRole.Ebgp != this.peerImportParameters.getFromPeerRole()) {
288                     this.rtCache.cacheRoute(route);
289                 }
290                 this.rtMemberships.add(rt);
291                 this.rtMembershipsUpdated = true;
292             }
293             CountersUtil.increment(this.prefixesInstalled.get(tk), tk);
294             tx.put(LogicalDatastoreType.OPERATIONAL, routeIID, route);
295             tx.put(LogicalDatastoreType.OPERATIONAL, routeIID.child(Attributes.class), effAtt.get());
296         } else {
297             deleteRoutes(routeIID, route, tx);
298
299         }
300     }
301
302     private <R extends Route> void deleteRoutes(final InstanceIdentifier<R> routeIID,
303             final R route, final WriteTransaction tx) {
304         final Optional<RouteTarget> rtMembership = RouteTargetMembeshipUtil.getRT(route);
305         if (rtMembership.isPresent()) {
306             if(PeerRole.Ebgp != this.peerImportParameters.getFromPeerRole()) {
307                 this.rtCache.uncacheRoute(route);
308             }
309             this.rtMemberships.remove(rtMembership.get());
310             this.rtMembershipsUpdated = true;
311         }
312         tx.delete(LogicalDatastoreType.OPERATIONAL, routeIID);
313     }
314
315     @SuppressWarnings("unchecked")
316     private void writeTable(final WriteTransaction tx, final DataObjectModification<Tables> table) {
317         final Tables newTable = table.getDataAfter();
318         if (newTable == null) {
319             return;
320         }
321         final TablesKey tableKey = newTable.key();
322         final KeyedInstanceIdentifier<Tables, TablesKey> tablePath
323                 = this.effRibTables.child(Tables.class, tableKey);
324
325         // Create an empty table
326         LOG.trace("Create Empty table at {}", tablePath);
327         if (table.getDataBefore() == null) {
328             tx.put(LogicalDatastoreType.OPERATIONAL, tablePath, new TablesBuilder()
329                     .setAfi(tableKey.getAfi()).setSafi(tableKey.getSafi())
330                     .setAttributes(newTable.getAttributes()).build());
331         }
332
333         final RIBSupport ribSupport = this.registry.getRIBSupport(tableKey);
334         final Routes routes = newTable.getRoutes();
335         if (ribSupport == null || routes == null) {
336             return;
337         }
338
339         final DataObjectModification routesChangesContainer =
340                 table.getModifiedChildContainer(ribSupport.routesCaseClass(), ribSupport.routesContainerClass());
341
342         if (routesChangesContainer == null) {
343             return;
344         }
345         updateRoutes(tx, tableKey, ribSupport, tablePath, routesChangesContainer.getModifiedChildren());
346     }
347
348     @Override
349     public synchronized void close() {
350         if (this.reg != null) {
351             this.reg.close();
352             this.reg = null;
353         }
354         if (this.submitted != null) {
355             try {
356                 this.submitted.get();
357             } catch (final InterruptedException | ExecutionException throwable) {
358                 LOG.error("Write routes failed", throwable);
359             }
360         }
361         if (this.chain != null) {
362             this.chain.close();
363             this.chain = null;
364         }
365         this.prefixesReceived.values().forEach(LongAdder::reset);
366         this.prefixesInstalled.values().forEach(LongAdder::reset);
367     }
368
369     @Override
370     public long getPrefixedReceivedCount(final TablesKey tablesKey) {
371         final LongAdder counter = this.prefixesReceived.get(tablesKey);
372         if (counter == null) {
373             return 0;
374         }
375         return counter.longValue();
376     }
377
378     @Override
379     public Set<TablesKey> getTableKeys() {
380         return ImmutableSet.copyOf(this.prefixesReceived.keySet());
381     }
382
383     @Override
384     public boolean isSupported(final TablesKey tablesKey) {
385         return this.prefixesReceived.containsKey(tablesKey);
386     }
387
388     @Override
389     public long getPrefixedInstalledCount(final TablesKey tablesKey) {
390         final LongAdder counter = this.prefixesInstalled.get(tablesKey);
391         if (counter == null) {
392             return 0;
393         }
394         return counter.longValue();
395     }
396
397     @Override
398     public long getTotalPrefixesInstalled() {
399         return this.prefixesInstalled.values().stream().mapToLong(LongAdder::longValue).sum();
400     }
401 }