BGPCEP-754: Start using default import policy
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / EffectiveRibInWriter.java
index 45a32878649c981d8da3fe7ebb87598ddad565d8..e39e6f9e015cd935f12bd52d7181cb237c212542 100644 (file)
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionConsumerContext;
+import org.opendaylight.protocol.bgp.rib.impl.spi.AbstractImportPolicy;
+import org.opendaylight.protocol.bgp.rib.impl.spi.ImportPolicyPeerTracker;
+import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
+import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContext;
+import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContextRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.state.peer.PrefixesInstalledCounters;
+import org.opendaylight.protocol.bgp.rib.impl.state.peer.PrefixesReceivedCounters;
+import org.opendaylight.protocol.bgp.rib.spi.BGPPeerTracker;
+import org.opendaylight.protocol.bgp.rib.spi.IdentifierUtils;
 import org.opendaylight.protocol.bgp.rib.spi.RIBSupport;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerRole;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.Peer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
+import org.opendaylight.protocol.bgp.rib.spi.entry.AttributeBindingCodecSerializer;
+import org.opendaylight.protocol.bgp.rib.spi.policy.BGPRibRoutingPolicy;
+import org.opendaylight.protocol.bgp.rib.spi.policy.BGPRouteEntryImportParameters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev171207.path.attributes.Attributes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.PeerId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.PeerRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.bgp.rib.rib.peer.AdjRibIn;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.bgp.rib.rib.peer.EffectiveRibIn;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.rib.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.rib.TablesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev171207.rib.tables.Routes;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of the BGP import policy. Listens on all Adj-RIB-In, inspects all inbound
+ * Implementation of the BGP import policy. Listens on peer's Adj-RIB-In, inspects all inbound
  * routes in the context of the advertising peer's role and applies the inbound policy.
- *
+ * <p>
  * Inbound policy is applied as follows:
- *
+ * <p>
  * 1) if the peer is an eBGP peer, perform attribute replacement and filtering
  * 2) check if a route is admissible based on attributes attached to it, as well as the
- *    advertising peer's role
+ * advertising peer's role
  * 3) output admitting routes with edited attributes into /bgp-rib/rib/peer/effective-rib-in/tables/routes
- *
- * Note that we maintain the peer roles using a DCL, even if we could look up our internal
- * structures. This is done so we maintain causality and loose coupling.
  */
 @NotThreadSafe
-final class EffectiveRibInWriter {
-    private static final Predicate<PathArgument> IS_PEER = new Predicate<PathArgument>() {
-        @Override
-        public boolean apply(final PathArgument input) {
-            return input.getNodeType().equals(Peer.QNAME);
-        }
-    };
-    private static final Predicate<PathArgument> IS_TABLES = new Predicate<PathArgument>() {
-        @Override
-        public boolean apply(final PathArgument input) {
-            return input.getNodeType().equals(Tables.QNAME);
-        }
-    };
+final class EffectiveRibInWriter implements PrefixesReceivedCounters, PrefixesInstalledCounters, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(EffectiveRibInWriter.class);
-    private static final QName PEER_ID = QName.create(Peer.QNAME, "peer-id");
+    static final NodeIdentifier TABLE_ROUTES = new NodeIdentifier(Routes.QNAME);
+
+    private final class AdjInTracker implements PrefixesReceivedCounters, PrefixesInstalledCounters, AutoCloseable,
+            ClusteredDOMDataTreeChangeListener {
+        private final RIBSupportContextRegistry registry;
+        private final YangInstanceIdentifier peerIId;
+        private final YangInstanceIdentifier effRibTables;
+        private final ListenerRegistration<?> reg;
+        private final DOMTransactionChain chain;
+        private final Map<TablesKey, LongAdder> prefixesReceived;
+        private final Map<TablesKey, LongAdder> prefixesInstalled;
+        private final BGPRibRoutingPolicy ribPolicies;
+        private final BGPPeerTracker peerTracker;
+        private final AttributeBindingCodecSerializer attBindingCodecSerializer;
+        private final PeerId peerId;
 
-    // FIXME: implement as id.firstIdentifierOf(IS_PEER), null indicating not found
-    private static final NodeIdentifierWithPredicates firstKeyOf(final YangInstanceIdentifier id, final Predicate<PathArgument> match) {
-        final PathArgument ret = Iterables.find(id.getPathArguments(), IS_PEER);
-        Preconditions.checkArgument(ret instanceof NodeIdentifierWithPredicates, "Non-key peer identifier %s", ret);
-        return (NodeIdentifierWithPredicates) ret;
-    }
+        AdjInTracker(final RIB rib,
+                final DOMTransactionChain chain,
+                final YangInstanceIdentifier peerIId,
+                @Nonnull final Set<TablesKey> tables) {
+            this.registry = requireNonNull(rib.getRibSupportContext());
+            this.chain = requireNonNull(chain);
+            this.peerIId = requireNonNull(peerIId);
+            this.effRibTables = this.peerIId.node(EffectiveRibIn.QNAME).node(Tables.QNAME);
+            this.prefixesInstalled = buildPrefixesTables(tables);
+            this.prefixesReceived = buildPrefixesTables(tables);
+            this.ribPolicies = requireNonNull(rib.getRibPolicies());
+            this.peerTracker = requireNonNull(rib.getPeerTracker());
+            this.attBindingCodecSerializer = rib;
+            this.peerId = IdentifierUtils.peerId((NodeIdentifierWithPredicates) peerIId.getLastPathArgument());
+            final DOMDataTreeIdentifier treeId = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+                    this.peerIId.node(AdjRibIn.QNAME).node(Tables.QNAME));
+            LOG.debug("Registered Effective RIB on {}", this.peerIId);
+            this.reg = requireNonNull(rib.getService()).registerDataTreeChangeListener(treeId, this);
+        }
 
-    static final NodeIdentifierWithPredicates peerKey(final YangInstanceIdentifier id) {
-        return firstKeyOf(id, IS_PEER);
-    }
+        private Map<TablesKey, LongAdder> buildPrefixesTables(final Set<TablesKey> tables) {
+            final ImmutableMap.Builder<TablesKey, LongAdder> b = ImmutableMap.builder();
+            tables.forEach(table -> b.put(table, new LongAdder()));
+            return b.build();
+        }
 
-    static final PeerId peerId(final NodeIdentifierWithPredicates peerKey) {
-        return (PeerId) peerKey.getKeyValues().get(PEER_ID);
-    }
+        private void processRoute(final DOMDataWriteTransaction tx, final RIBSupport ribSupport, final AbstractImportPolicy policy,
+                final YangInstanceIdentifier routesPath, final DataTreeCandidateNode route) {
+            LOG.debug("Process route {}", route.getIdentifier());
+            final YangInstanceIdentifier routeId = ribSupport.routePath(routesPath, route.getIdentifier());
+            final TablesKey tablesKey = new TablesKey(ribSupport.getAfi(), ribSupport.getSafi());
+            switch (route.getModificationType()) {
+                case DELETE:
+                case DISAPPEARED:
+                    tx.delete(LogicalDatastoreType.OPERATIONAL, routeId);
+                    LOG.debug("Route deleted. routeId={}", routeId);
+                    CountersUtil.decrement(this.prefixesInstalled.get(tablesKey), tablesKey);
+                    break;
+                case UNMODIFIED:
+                    // No-op
+                    break;
+                case APPEARED:
+                case SUBTREE_MODIFIED:
+                case WRITE:
+                    final NormalizedNode<?, ?> advRoute = route.getDataAfter().get();
+                    tx.put(LogicalDatastoreType.OPERATIONAL, routeId, advRoute);
+                    CountersUtil.increment(this.prefixesReceived.get(tablesKey), tablesKey);
+                    // Lookup per-table attributes from RIBSupport
+
+                    final NodeIdentifierWithPredicates routeIdentifier = ribSupport
+                            .createRouteKeyPathArgument((NodeIdentifierWithPredicates) route.getIdentifier());
+                    Optional<Attributes> advertisedAttrs = this.attBindingCodecSerializer
+                            .getAttributes(ribSupport, routeIdentifier, advRoute);
+
+                    final Optional<Attributes> effectiveAttrs;
+                    if (advertisedAttrs.isPresent()) {
+                        final PeerRole peerRole = this.peerTracker.getRole(this.peerId);
+                        final BGPRouteEntryImportParameters ribPolicyParameters =
+                                new BGPRouteEntryImportParametersImpl(
+                                        (NodeIdentifierWithPredicates) route.getIdentifier(), this.peerId, peerRole);
+                        effectiveAttrs = this.ribPolicies
+                                .applyImportPolicies(ribPolicyParameters, advertisedAttrs.get());
+                        LOG.debug("Route {} effective attributes {} towards {}", route.getIdentifier(), effectiveAttrs,
+                                routeId);
 
-    private static final NodeIdentifierWithPredicates tableKey(final YangInstanceIdentifier id) {
-        return firstKeyOf(id, IS_TABLES);
-    }
+                    } else {
+                        effectiveAttrs = Optional.empty();
+                    }
 
-    /**
-     * Maintains the mapping of PeerId -> Role inside. We are subscribed to our target leaf,
-     * but that is a wildcard:
-     *     /bgp-rib/rib/peer/peer-role
-     *
-     * MD-SAL assumption: we are getting one {@link DataTreeCandidate} for each expanded
-     *                    wildcard path, so are searching for a particular key.
-     */
-    private final class PeerRoleListener implements DOMDataTreeChangeListener {
-        @Override
-        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
-            synchronized (policies) {
-                for (DataTreeCandidate tc : changes) {
-                    // Obtain the peer's key
-                    final NodeIdentifierWithPredicates peerKey = peerKey(tc.getRootPath());
-
-                    // Check for removal
-                    final Optional<NormalizedNode<?, ?>> maybePeerRole = tc.getRootNode().getDataAfter();
-                    if (maybePeerRole.isPresent()) {
-                        final LeafNode<?> peerRoleLeaf = (LeafNode<?>) maybePeerRole.get();
-                        // FIXME: need codec here
-                        final PeerRole peerRole = (PeerRole) peerRoleLeaf.getValue();
-
-                        // Lookup policy based on role
-                        final AbstractImportPolicy policy = AbstractImportPolicy.forRole(peerRole);
-
-                        // Update lookup map
-                        policies.put(peerId(peerKey), policy);
+                    LOG.debug("Route {} effective attributes {} towards {}", route.getIdentifier(), effectiveAttrs, routeId);
+
+                    final Optional<ContainerNode> normEffAtt = this.attBindingCodecSerializer
+                            .toNormalizedNodeAttribute(ribSupport, routeIdentifier, effectiveAttrs);
+                    if (normEffAtt.isPresent()) {
+                        tx.put(LogicalDatastoreType.OPERATIONAL,
+                                routeId.node(ribSupport.routeAttributesIdentifier()), normEffAtt.get());
+                        if (route.getModificationType() == ModificationType.WRITE) {
+                            CountersUtil.increment(this.prefixesInstalled.get(tablesKey), tablesKey);
+                        }
                     } else {
-                        policies.remove(peerId(peerKey));
+                        LOG.warn("Route {} advertised empty attributes", routeId);
+                        tx.delete(LogicalDatastoreType.OPERATIONAL, routeId);
                     }
-                }
+                    break;
+                default:
+                    LOG.warn("Ignoring unhandled route {}", route);
+                    break;
             }
         }
-    }
 
-    /**
-     * Maintains the individual routes for a particular table's routes under:
-     *     /bgp-rib/rib/peer/adj-rib-in/tables/routes
-     */
-    private final class TableRouteListener implements DOMDataTreeChangeListener {
-        private final NodeIdentifierWithPredicates tableKey;
-        private final YangInstanceIdentifier target;
-        private final RIBSupport ribSupport;
-        private final PeerId peerId;
+        private void processTableChildren(final DOMDataWriteTransaction tx, final RIBSupport ribSupport, final YangInstanceIdentifier tablePath, final Collection<DataTreeCandidateNode> children) {
+            for (final DataTreeCandidateNode child : children) {
+                final PathArgument childIdentifier = child.getIdentifier();
+                final Optional<NormalizedNode<?, ?>> childDataAfter = child.getDataAfter();
+                final TablesKey tablesKey = new TablesKey(ribSupport.getAfi(), ribSupport.getSafi());
+                LOG.debug("Process table {} type {}, dataAfter {}, dataBefore {}", childIdentifier, child
+                        .getModificationType(), childDataAfter, child.getDataBefore());
+                final YangInstanceIdentifier childPath = tablePath.node(childIdentifier);
+                switch (child.getModificationType()) {
+                    case DELETE:
+                    case DISAPPEARED:
+                        tx.delete(LogicalDatastoreType.OPERATIONAL, childPath);
+                        LOG.debug("Route deleted. routeId={}", childPath);
+                        CountersUtil.decrement(this.prefixesInstalled.get(tablesKey), tablesKey);
+                        break;
+                    case UNMODIFIED:
+                        // No-op
+                        break;
+                    case SUBTREE_MODIFIED:
+                        processModifiedRouteTables(child, childIdentifier, tx, ribSupport, EffectiveRibInWriter.this.importPolicy, childPath, childDataAfter);
+                        break;
+                    case APPEARED:
+                    case WRITE:
+                        writeRouteTables(child, childIdentifier, tx, ribSupport, EffectiveRibInWriter.this.importPolicy, childPath, childDataAfter);
+
+                        break;
+                    default:
+                        LOG.warn("Ignoring unhandled child {}", child);
+                        break;
+                }
+            }
+        }
 
-        TableRouteListener(final RIBSupport ribSupport, final NodeIdentifierWithPredicates peerKey, final NodeIdentifierWithPredicates tableKey) {
-            this.ribSupport = Preconditions.checkNotNull(ribSupport);
-            this.tableKey = Preconditions.checkNotNull(tableKey);
+        private void processModifiedRouteTables(final DataTreeCandidateNode child, final PathArgument childIdentifier, final DOMDataWriteTransaction tx,
+                final RIBSupport ribSupport, final AbstractImportPolicy policy, final YangInstanceIdentifier childPath, final Optional<NormalizedNode<?, ?>> childDataAfter) {
+            if (TABLE_ROUTES.equals(childIdentifier)) {
+                for (final DataTreeCandidateNode route : ribSupport.changedRoutes(child)) {
+                    processRoute(tx, ribSupport, policy, childPath, route);
+                }
+            } else {
+                tx.put(LogicalDatastoreType.OPERATIONAL, childPath, childDataAfter.get());
+            }
+        }
 
-            // Lookup peer ID
-            this.peerId = (PeerId) Preconditions.checkNotNull(peerKey.getKeyValues().get(PEER_ID));
+        private void writeRouteTables(final DataTreeCandidateNode child, final PathArgument childIdentifier, final DOMDataWriteTransaction tx, final RIBSupport ribSupport, final AbstractImportPolicy policy, final YangInstanceIdentifier childPath, final Optional<NormalizedNode<?, ?>> childDataAfter) {
+            if (TABLE_ROUTES.equals(childIdentifier)) {
+                final Collection<DataTreeCandidateNode> changedRoutes = ribSupport.changedRoutes(child);
+                if (!changedRoutes.isEmpty()) {
+                    tx.put(LogicalDatastoreType.OPERATIONAL, childPath, childDataAfter.get());
+                    // Routes are special, as they may end up being filtered. The previous put conveniently
+                    // ensured that we have them in at target, so a subsequent delete will not fail :)
+                    for (final DataTreeCandidateNode route : changedRoutes) {
+                        processRoute(tx, ribSupport, policy, childPath, route);
+                    }
+                }
+            }
+        }
 
-            // FIXME: need target table ID
-            target = null;
+        private RIBSupportContext getRibSupport(final NodeIdentifierWithPredicates tableKey) {
+            return this.registry.getRIBSupportContext(tableKey);
         }
 
-        private void updateRoutes(final DOMDataWriteTransaction tx, final DataTreeCandidateNode routes, final ContainerNode effectiveAttrs) {
-            final YangInstanceIdentifier routeId = target.node(routes.getIdentifier());
+        private YangInstanceIdentifier effectiveTablePath(final NodeIdentifierWithPredicates tableKey) {
+            return this.effRibTables.node(tableKey);
+        }
 
-            if (effectiveAttrs != null) {
-                tx.put(LogicalDatastoreType.OPERATIONAL, routeId, routes.getDataAfter().get());
-                tx.put(LogicalDatastoreType.OPERATIONAL, routeId.node(ribSupport.routeAttributes()), effectiveAttrs);
-            } else if (routes.getDataBefore().isPresent()) {
-                tx.delete(LogicalDatastoreType.OPERATIONAL, routeId);
-            }
+        private void modifyTable(final DOMDataWriteTransaction tx, final NodeIdentifierWithPredicates tableKey, final DataTreeCandidateNode table) {
+            final RIBSupportContext ribSupport = getRibSupport(tableKey);
+            final YangInstanceIdentifier tablePath = effectiveTablePath(tableKey);
 
+            processTableChildren(tx, ribSupport.getRibSupport(), tablePath, table.getChildNodes());
         }
 
-        @Override
-        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
-            // FIXME: note that we need to detect table clears efficiently and propagate them
+        private void writeTable(final DOMDataWriteTransaction tx, final NodeIdentifierWithPredicates tableKey, final DataTreeCandidateNode table) {
+            final RIBSupportContext ribSupport = getRibSupport(tableKey);
+            final YangInstanceIdentifier tablePath = effectiveTablePath(tableKey);
 
-            final DOMDataWriteTransaction tx = chain.newWriteOnlyTransaction();
+            // Create an empty table
+            LOG.trace("Create Empty table", tablePath);
+            ribSupport.createEmptyTableStructure(tx, tablePath);
 
-            for (DataTreeCandidate tc : changes) {
-                // Lookup per-table attributes from RIBSupport
-                final ContainerNode adverisedAttrs = (ContainerNode) NormalizedNodes.findNode(tc.getRootNode().getDataAfter(), ribSupport.routeAttributes()).orNull();
-                final ContainerNode effectiveAttrs;
+            processTableChildren(tx, ribSupport.getRibSupport(), tablePath, table.getChildNodes());
+        }
 
-                if (adverisedAttrs != null && tc.getRootNode().getDataAfter().isPresent()) {
-                    synchronized (policies) {
-                        final AbstractImportPolicy policy = policies.get(peerId);
-                        effectiveAttrs = policy.effectiveAttributes(adverisedAttrs);
+        @Override
+        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+            LOG.trace("Data changed called to effective RIB. Change : {}", changes);
+
+            // we have a lot of transactions created for 'nothing' because a lot of changes
+            // are skipped, so ensure we only create one transaction when we really need it
+            DOMDataWriteTransaction tx = null;
+            for (final DataTreeCandidate tc : changes) {
+                final YangInstanceIdentifier rootPath = tc.getRootPath();
+
+                final DataTreeCandidateNode root = tc.getRootNode();
+                for (final DataTreeCandidateNode table : root.getChildNodes()) {
+                    if (tx == null) {
+                        tx = this.chain.newWriteOnlyTransaction();
                     }
-                } else {
-                    effectiveAttrs = null;
+                    changeDataTree(tx, rootPath, root, table);
                 }
+            }
+            if (tx != null) {
+                tx.submit();
+            }
+        }
 
-                LOG.debug("Route change {} effective attributes {}", tc.getRootPath(), effectiveAttrs);
+        private void changeDataTree(final DOMDataWriteTransaction tx, final YangInstanceIdentifier rootPath,
+                final DataTreeCandidateNode root, final DataTreeCandidateNode table) {
+            final PathArgument lastArg = table.getIdentifier();
+            Verify.verify(lastArg instanceof NodeIdentifierWithPredicates, "Unexpected type %s in path %s", lastArg.getClass(), rootPath);
+            final NodeIdentifierWithPredicates tableKey = (NodeIdentifierWithPredicates) lastArg;
+            final RIBSupport ribSupport = getRibSupport(tableKey).getRibSupport();
+            final ModificationType modificationType = root.getModificationType();
+            switch (modificationType) {
+                case DELETE:
+                case DISAPPEARED:
+                    final YangInstanceIdentifier effectiveTablePath = effectiveTablePath(tableKey);
+                    LOG.debug("Delete Effective Table {} modification type {}, ", effectiveTablePath, modificationType);
+
+                    // delete the corresponding effective table
+                    tx.delete(LogicalDatastoreType.OPERATIONAL, effectiveTablePath);
+                    final TablesKey tk = new TablesKey(ribSupport.getAfi(), ribSupport.getSafi());
+                    CountersUtil.decrement(this.prefixesInstalled.get(tk), tk);
+                    break;
+                case SUBTREE_MODIFIED:
+                    modifyTable(tx, tableKey, table);
+                    break;
+                case UNMODIFIED:
+                    LOG.info("Ignoring spurious notification on {} data {}", rootPath, table);
+                    break;
+                case APPEARED:
+                case WRITE:
+                    writeTable(tx, tableKey, table);
+                    break;
+                default:
+                    LOG.warn("Ignoring unhandled root {}", root);
+                    break;
+            }
+        }
 
-                updateRoutes(tx, tc.getRootNode(), effectiveAttrs);
+        @Override
+        public synchronized void close() {
+            this.reg.close();
+            this.prefixesReceived.values().forEach(LongAdder::reset);
+            this.prefixesInstalled.values().forEach(LongAdder::reset);
+        }
+
+        @Override
+        public long getPrefixedReceivedCount(final TablesKey tablesKey) {
+            final LongAdder counter = this.prefixesReceived.get(tablesKey);
+            if (counter == null) {
+                return 0;
             }
+            return counter.longValue();
+        }
 
-            tx.submit();
+        @Override
+        public Set<TablesKey> getTableKeys() {
+            return ImmutableSet.copyOf(this.prefixesReceived.keySet());
         }
-    }
 
-    /**
-     * Maintains {@link TableRouteListener} instances.
-     */
-    private final class TableListener implements DOMDataTreeChangeListener {
-        private final Map<YangInstanceIdentifier, ListenerRegistration<?>> routeListeners = new HashMap<>();
-        private final RIBExtensionConsumerContext registry;
-        private final DOMDataTreeChangeService service;
+        @Override
+        public boolean isSupported(final TablesKey tablesKey) {
+            return this.prefixesReceived.containsKey(tablesKey);
+        }
 
-        TableListener(final DOMDataTreeChangeService service, final RIBExtensionConsumerContext registry) {
-            this.registry = Preconditions.checkNotNull(registry);
-            this.service = Preconditions.checkNotNull(service);
+        @Override
+        public long getPrefixedInstalledCount(final TablesKey tablesKey) {
+            final LongAdder counter = this.prefixesInstalled.get(tablesKey);
+            if (counter == null) {
+                return 0;
+            }
+            return counter.longValue();
         }
 
         @Override
-        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+        public long getTotalPrefixesInstalled() {
+            return this.prefixesInstalled.values().stream().mapToLong(LongAdder::longValue).sum();
+        }
+    }
 
-            for (DataTreeCandidate tc : changes) {
-                // Obtain the peer's key
-                final NodeIdentifierWithPredicates peerKey = peerKey(tc.getRootPath());
+    private final AdjInTracker adjInTracker;
+    private final AbstractImportPolicy importPolicy;
 
-                // Lookup
-                final NodeIdentifierWithPredicates tableKey = tableKey(tc.getRootPath());
+    static EffectiveRibInWriter create(@Nonnull final RIB rib,
+            @Nonnull final DOMTransactionChain chain,
+            @Nonnull final YangInstanceIdentifier peerIId,
+            @Nonnull final ImportPolicyPeerTracker importPolicyPeerTracker,
+            final PeerRole peerRole,
+            @Nonnull final Set<TablesKey> tables) {
+        return new EffectiveRibInWriter(rib, chain, peerIId, importPolicyPeerTracker, peerRole, tables);
+    }
 
-                switch (tc.getRootNode().getModificationType()) {
-                case DELETE:
-                    final ListenerRegistration<?> reg = routeListeners.remove(tc.getRootPath());
-                    if (reg != null) {
-                        reg.close();
-                    }
-                    break;
-                case WRITE:
-                    // FIXME: use codec to translate
-                    final RIBSupport ribSupport = registry.getRIBSupport(null);
-                    if (ribSupport != null) {
-                        final TableRouteListener routeListener = new TableRouteListener(ribSupport, peerKey, tableKey);
-                        final ListenerRegistration<?> r = service.registerDataTreeChangeListener(
-                            new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,  tc.getRootPath()), routeListener);
-
-                        routeListeners.put(tc.getRootPath(), r);
-                    } else {
-                        LOG.warn("No RIB support for table {}, ignoring advertisements from peer %s", tableKey, peerKey);
-                    }
-                    break;
-                default:
-                    // No-op
-                    break;
-                }
-            }
-        }
+    private EffectiveRibInWriter(final RIB rib, final DOMTransactionChain chain, final YangInstanceIdentifier peerIId,
+            final ImportPolicyPeerTracker importPolicyPeerTracker, final PeerRole peerRole, @Nonnull final Set<TablesKey> tables) {
+        importPolicyPeerTracker.peerRoleChanged(peerIId, peerRole);
+        this.importPolicy = importPolicyPeerTracker.policyFor(IdentifierUtils.peerId((NodeIdentifierWithPredicates) peerIId.getLastPathArgument()));
+        this.adjInTracker = new AdjInTracker(rib, chain, peerIId, tables);
     }
 
-    private final Map<PeerId, AbstractImportPolicy> policies = new HashMap<>();
-    private final DOMTransactionChain chain;
+    @Override
+    public void close() {
+        this.adjInTracker.close();
+    }
+
+    @Override
+    public long getPrefixedReceivedCount(final TablesKey tablesKey) {
+        return this.adjInTracker.getPrefixedReceivedCount(tablesKey);
+    }
 
-    private EffectiveRibInWriter(final DOMTransactionChain chain) {
-        this.chain = Preconditions.checkNotNull(chain);
+    @Override
+    public Set<TablesKey> getTableKeys() {
+        return this.adjInTracker.getTableKeys();
+    }
+
+    @Override
+    public boolean isSupported(final TablesKey tablesKey) {
+        return this.adjInTracker.isSupported(tablesKey);
+    }
+
+    @Override
+    public long getPrefixedInstalledCount(@Nonnull final TablesKey tablesKey) {
+        return this.adjInTracker.getPrefixedInstalledCount(tablesKey);
+    }
 
-        // FIXME: subscribe peerRoleListener, tableListener
+    @Override
+    public long getTotalPrefixesInstalled() {
+        return this.adjInTracker.getTotalPrefixesInstalled();
     }
 }