Bump upstreams
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / AdjRibOutListener.java
index beb1177e645a496a100a434047f6c700ec2479c9..f57c877acc2826f9905abe7edd6ded682a9f7c3f 100644 (file)
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import static java.util.Objects.requireNonNull;
+import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.ADJRIBOUT_NID;
+import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.PEER_NID;
+import static org.opendaylight.protocol.bgp.rib.spi.RIBNodeIdentifiers.TABLES_NID;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker.DataTreeChangeExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.protocol.bgp.rib.impl.spi.Codecs;
 import org.opendaylight.protocol.bgp.rib.impl.spi.CodecsRegistry;
-import org.opendaylight.protocol.bgp.rib.impl.stats.UnsignedInt32Counter;
+import org.opendaylight.protocol.bgp.rib.impl.state.peer.PrefixesSentCounters;
 import org.opendaylight.protocol.bgp.rib.spi.IdentifierUtils;
 import org.opendaylight.protocol.bgp.rib.spi.RIBSupport;
-import org.opendaylight.protocol.bgp.rib.spi.RibSupportUtils;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.ipv4.routes.ipv4.routes.Ipv4Route;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Update;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.UpdateBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.path.attributes.Attributes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.update.message.NlriBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.update.message.WithdrawnRoutesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.Peer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.peer.AdjRibOut;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.ipv4.routes.ipv4.routes.Ipv4Route;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.PathId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.Update;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.UpdateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.path.attributes.Attributes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.update.message.NlriBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev200120.update.message.WithdrawnRoutesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerId;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Instantiated for each peer and table, listens on a particular peer's adj-rib-out,
- * performs transcoding to BA form (message) and sends it down the channel.
+ * Instantiated for each peer and table, listens on a particular peer's adj-rib-out, performs transcoding to BA form
+ * (message) and sends it down the channel. This class is NOT thread-safe.
  */
-@NotThreadSafe
-final class AdjRibOutListener implements ClusteredDOMDataTreeChangeListener {
-
+final class AdjRibOutListener implements DOMDataTreeChangeListener, PrefixesSentCounters {
     private static final Logger LOG = LoggerFactory.getLogger(AdjRibOutListener.class);
-
-    static final QName PREFIX_QNAME = QName.create(Ipv4Route.QNAME, "prefix").intern();
-    private final YangInstanceIdentifier.NodeIdentifier routeKeyLeaf = new YangInstanceIdentifier.NodeIdentifier(PREFIX_QNAME);
+    private static final QName PREFIX_QNAME = QName.create(Ipv4Route.QNAME, "prefix").intern();
+    private static final QName PATHID_QNAME = QName.create(Ipv4Route.QNAME, "path-id").intern();
+    private static final NodeIdentifier ROUTE_KEY_PREFIX_LEAF = NodeIdentifier.create(PREFIX_QNAME);
+    private static final NodeIdentifier ROUTE_KEY_PATHID_LEAF = NodeIdentifier.create(PATHID_QNAME);
 
     private final ChannelOutputLimiter session;
     private final Codecs codecs;
-    private final RIBSupport support;
+    private final RIBSupport<?, ?> support;
+    // FIXME: this field needs to be eliminated: either subclass this class or create a filtering ribsupport
     private final boolean mpSupport;
-    private final ListenerRegistration<AdjRibOutListener> registerDataTreeChangeListener;
-    private final UnsignedInt32Counter routeCounter;
-
-    private AdjRibOutListener(final PeerId peerId, final TablesKey tablesKey, final YangInstanceIdentifier ribId,
-        final CodecsRegistry registry, final RIBSupport support, final DOMDataTreeChangeService service,
-        final ChannelOutputLimiter session, final boolean mpSupport, final UnsignedInt32Counter routeCounter) {
-        this.session = Preconditions.checkNotNull(session);
-        this.support = Preconditions.checkNotNull(support);
-        this.codecs = registry.getCodecs(this.support);
+    private final Registration registerDataTreeChangeListener;
+    private final LongAdder prefixesSentCounter = new LongAdder();
+    private boolean initalState;
+
+    private AdjRibOutListener(final PeerId peerId, final YangInstanceIdentifier ribId, final CodecsRegistry registry,
+            final RIBSupport<?, ?> support, final DataTreeChangeExtension service, final ChannelOutputLimiter session,
+            final boolean mpSupport) {
+        this.session = requireNonNull(session);
+        this.support = requireNonNull(support);
+        codecs = registry.getCodecs(this.support);
         this.mpSupport = mpSupport;
-        final YangInstanceIdentifier adjRibOutId =  ribId.node(Peer.QNAME).node(IdentifierUtils.domPeerId(peerId)).node(AdjRibOut.QNAME).node(Tables.QNAME).node(RibSupportUtils.toYangTablesKey(tablesKey));
-        this.registerDataTreeChangeListener = service.registerDataTreeChangeListener(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, adjRibOutId), this);
-        this.routeCounter = routeCounter;
+        final YangInstanceIdentifier adjRibOutId = ribId.node(PEER_NID).node(IdentifierUtils.domPeerId(peerId))
+                .node(ADJRIBOUT_NID).node(TABLES_NID).node(support.tablesKey());
+        /*
+         *  After listener registration should always be executed ODTC. Even when empty table is present
+         *  in data store. Within this first ODTC execution we should advertise present routes and than
+         *  send EOR marker. initialState flag is distinguishing between first ODTC execution and the rest.
+         */
+        initalState = true;
+        registerDataTreeChangeListener = service.registerTreeChangeListener(
+                DOMDataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL, adjRibOutId), this);
+    }
+
+    static AdjRibOutListener create(
+            final @NonNull PeerId peerId,
+            final @NonNull YangInstanceIdentifier ribId,
+            final @NonNull CodecsRegistry registry,
+            final @NonNull RIBSupport<?, ?> support,
+            final @NonNull DataTreeChangeExtension service,
+            final @NonNull ChannelOutputLimiter session,
+            final boolean mpSupport) {
+        return new AdjRibOutListener(peerId, ribId, registry, support, service, session, mpSupport);
     }
 
-    static AdjRibOutListener create(@Nonnull final PeerId peerId, @Nonnull final TablesKey tablesKey, @Nonnull final YangInstanceIdentifier ribId,
-        @Nonnull final CodecsRegistry registry, @Nonnull final RIBSupport support, @Nonnull final DOMDataTreeChangeService service,
-        @Nonnull final ChannelOutputLimiter session, @Nonnull final boolean mpSupport, @Nonnull final UnsignedInt32Counter routeCounter
-    ) {
-        return new AdjRibOutListener(peerId, tablesKey, ribId, registry, support, service, session, mpSupport, routeCounter);
+    @Override
+    public void onInitialData() {
+        // FIXME: flush initial state
     }
 
     @Override
-    public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+    public void onDataTreeChanged(final List<DataTreeCandidate> changes) {
         LOG.debug("Data change received for AdjRibOut {}", changes);
-        for (final DataTreeCandidate tc : changes) {
-            LOG.trace("Change {} type {}", tc.getRootNode(), tc.getRootNode().getModificationType());
-            for (final DataTreeCandidateNode child : tc.getRootNode().getChildNodes()) {
-                processSupportedFamilyRoutes(child);
+        for (var tc : changes) {
+            LOG.trace("Change {} type {}", tc.getRootNode(), tc.getRootNode().modificationType());
+            for (var child : tc.getRootNode().childNodes()) {
+                for (var route : support.changedRoutes(child)) {
+                    processRouteChange(route);
+                }
             }
         }
-        this.session.flush();
-    }
-
-    private void processSupportedFamilyRoutes(final DataTreeCandidateNode child) {
-        for (final DataTreeCandidateNode route : this.support.changedRoutes(child)) {
-            processRouteChange(route);
+        if (initalState) {
+            session.write(BgpPeerUtil.createEndOfRib(support.getTablesKey()));
+            initalState = false;
         }
+        session.flush();
     }
 
     private void processRouteChange(final DataTreeCandidateNode route) {
         final Update update;
-        switch (route.getModificationType()) {
-        case UNMODIFIED:
-            LOG.debug("Skipping unmodified route {}", route.getIdentifier());
-            return;
-        case DELETE:
-        case DISAPPEARED:
-            // FIXME: we can batch deletions into a single batch
-            update = withdraw((MapEntryNode) route.getDataBefore().get());
-            LOG.debug("Withdrawing routes {}", update);
-            break;
-        case APPEARED:
-        case SUBTREE_MODIFIED:
-        case WRITE:
-            update = advertise((MapEntryNode) route.getDataAfter().get());
-            LOG.debug("Advertising routes {}", update);
-            break;
-        default:
-            LOG.warn("Ignoring unhandled modification type {}", route.getModificationType());
-            return;
+        switch (route.modificationType()) {
+            case UNMODIFIED:
+                LOG.debug("Skipping unmodified route {}", route.name());
+                return;
+            case DELETE:
+            case DISAPPEARED:
+                // FIXME: we can batch deletions into a single batch
+                update = withdraw((MapEntryNode) route.getDataBefore());
+                LOG.debug("Withdrawing routes {}", update);
+                break;
+            case APPEARED:
+            case SUBTREE_MODIFIED:
+            case WRITE:
+                update = advertise((MapEntryNode) route.getDataAfter());
+                LOG.debug("Advertising routes {}", update);
+                break;
+            default:
+                LOG.warn("Ignoring unhandled modification type {}", route.modificationType());
+                return;
         }
-        this.session.write(update);
+        session.write(update);
     }
 
     private Attributes routeAttributes(final MapEntryNode route) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("AdjRibOut parsing route {}", NormalizedNodes.toStringTree(route));
         }
-        final ContainerNode advertisedAttrs = (ContainerNode) NormalizedNodes.findNode(route, this.support.routeAttributesIdentifier()).orNull();
-        return this.codecs.deserializeAttributes(advertisedAttrs);
+        final ContainerNode advertisedAttrs = (ContainerNode) NormalizedNodes.findNode(route,
+                support.routeAttributesIdentifier()).orElse(null);
+        return codecs.deserializeAttributes(advertisedAttrs);
     }
 
     private Update withdraw(final MapEntryNode route) {
-        this.routeCounter.decreaseCount();
-        if (!this.mpSupport) {
-            return buildUpdate(Collections.<MapEntryNode>emptyList(), Collections.singleton(route), routeAttributes(route));
-        }
-        return this.support.buildUpdate(Collections.<MapEntryNode>emptyList(), Collections.singleton(route), routeAttributes(route));
+        return mpSupport
+            ? support.buildUpdate(Collections.emptyList(), Collections.singleton(route), routeAttributes(route))
+                : buildUpdate(Collections.emptyList(), Collections.singleton(route), routeAttributes(route));
     }
 
     private Update advertise(final MapEntryNode route) {
-        this.routeCounter.increaseCount();
-        if (!this.mpSupport) {
-            return buildUpdate(Collections.singleton(route), Collections.<MapEntryNode>emptyList(), routeAttributes(route));
-        }
-        return this.support.buildUpdate(Collections.singleton(route), Collections.<MapEntryNode>emptyList(), routeAttributes(route));
+        prefixesSentCounter.increment();
+        return mpSupport
+            ? support.buildUpdate(Collections.singleton(route), Collections.emptyList(), routeAttributes(route))
+                : buildUpdate(Collections.singleton(route), Collections.emptyList(), routeAttributes(route));
     }
 
-    private Update buildUpdate(@Nonnull final Collection<MapEntryNode> advertised, @Nonnull final Collection<MapEntryNode> withdrawn, @Nonnull final Attributes attr) {
-        final UpdateBuilder ub = new UpdateBuilder()
-            .setWithdrawnRoutes(new WithdrawnRoutesBuilder().setWithdrawnRoutes(extractPrefixes(withdrawn)).build())
-            .setNlri(new NlriBuilder().setNlri(extractPrefixes(advertised)).build());
-        ub.setAttributes(attr);
-        return ub.build();
+    private static Update buildUpdate(
+            final @NonNull Collection<MapEntryNode> advertised,
+            final @NonNull Collection<MapEntryNode> withdrawn,
+            final @NonNull Attributes attr) {
+        return new UpdateBuilder()
+            .setWithdrawnRoutes(withdrawn.stream()
+                .map(ipv4Route -> new WithdrawnRoutesBuilder()
+                    .setPrefix(extractPrefix(ipv4Route))
+                    .setPathId(extractPathId(ipv4Route))
+                    .build())
+                .collect(Collectors.toList()))
+            .setNlri(advertised.stream()
+                .map(ipv4Route -> new NlriBuilder()
+                    .setPrefix(extractPrefix(ipv4Route))
+                    .setPathId(extractPathId(ipv4Route)).build())
+                .collect(Collectors.toList()))
+            .setAttributes(attr).build();
     }
 
-    private List<Ipv4Prefix> extractPrefixes(final Collection<MapEntryNode> routes) {
-        final List<Ipv4Prefix> prefs = new ArrayList<>(routes.size());
-        for (final MapEntryNode ipv4Route : routes) {
-            final String prefix = (String) ipv4Route.getChild(this.routeKeyLeaf).get().getValue();
-            prefs.add(new Ipv4Prefix(prefix));
-        }
-        return prefs;
+    private static Ipv4Prefix extractPrefix(final MapEntryNode ipv4Route) {
+        return new Ipv4Prefix((String) ipv4Route.getChildByArg(ROUTE_KEY_PREFIX_LEAF).body());
+    }
+
+    private static PathId extractPathId(final MapEntryNode ipv4Route) {
+        final var pathId = ipv4Route.childByArg(ROUTE_KEY_PATHID_LEAF);
+        return pathId == null ? null : new PathId((Uint32) pathId.body());
     }
 
     public void close() {
-        this.registerDataTreeChangeListener.close();
+        registerDataTreeChangeListener.close();
     }
 
     boolean isMpSupported() {
-        return this.mpSupport;
+        return mpSupport;
+    }
+
+    @Override
+    public long getPrefixesSentCount() {
+        return prefixesSentCounter.longValue();
     }
 }