Merge "Fix for bgp feature to use get proper configs"
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / RIBImpl.java
index 5e069e31e7ee439fb9de43db603c7fefc4d37e50..c3786f4bd5dddc00d2b592974c25e886dcd7ed18 100644 (file)
@@ -16,17 +16,28 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.protocol.bgp.rib.DefaultRibReference;
+import org.opendaylight.protocol.bgp.rib.impl.spi.AdjRIBsOut;
+import org.opendaylight.protocol.bgp.rib.impl.spi.AdjRIBsOutRegistration;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
+import org.opendaylight.protocol.bgp.rib.spi.AbstractAdjRIBs;
 import org.opendaylight.protocol.bgp.rib.spi.AdjRIBsIn;
+import org.opendaylight.protocol.bgp.rib.spi.BGPObjectComparator;
 import org.opendaylight.protocol.bgp.rib.spi.Peer;
 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionConsumerContext;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
@@ -51,6 +62,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mult
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRibBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.RibId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.Rib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.RibBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.RibKey;
@@ -60,40 +72,72 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ThreadSafe
-public final class RIBImpl extends DefaultRibReference implements AutoCloseable, RIB {
+public final class RIBImpl extends DefaultRibReference implements AutoCloseable, RIB, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(RIBImpl.class);
     private static final Update EOR = new UpdateBuilder().build();
     private static final TablesKey IPV4_UNICAST_TABLE = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+    private final ConcurrentMap<Peer, AdjRIBsOut> ribOuts = new ConcurrentHashMap<>();
     private final ReconnectStrategyFactory tcpStrategyFactory;
     private final ReconnectStrategyFactory sessionStrategyFactory;
+    private final BGPObjectComparator comparator;
     private final BGPDispatcher dispatcher;
-    private final DataBroker dps;
+    private final BindingTransactionChain chain;
     private final AsNumber localAs;
     private final Ipv4Address bgpIdentifier;
     private final List<BgpTableType> localTables;
     private final RIBTables tables;
+    private final BlockingQueue<Peer> peers;
+    private final Thread scheduler = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+            try {
+                final Peer peer = RIBImpl.this.peers.take();
+                LOG.debug("Advertizing loc-rib to new peer {}.", peer);
+                for (final BgpTableType key : RIBImpl.this.localTables) {
+                    final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
+                    final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
+                    adj.addAllEntries(trans);
+                    Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(final Void result) {
+                            LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t) {
+                            LOG.error("Failed to update peer {} with RIB {}", peer, t);
+                        }
+                    });
+                }
+            } catch (final InterruptedException e) {
+
+            }
+        }
+    });
 
     public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
-            final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
-            final ReconnectStrategyFactory sessionStrategyFactory, final DataBroker dps, final List<BgpTableType> localTables) {
+        final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
+        final ReconnectStrategyFactory sessionStrategyFactory, final DataBroker dps, final List<BgpTableType> localTables) {
         super(InstanceIdentifier.builder(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(ribId))).toInstance());
-        this.dps = Preconditions.checkNotNull(dps);
+        this.chain = dps.createTransactionChain(this);
         this.localAs = Preconditions.checkNotNull(localAs);
+        this.comparator = new BGPObjectComparator(localAs);
         this.bgpIdentifier = Preconditions.checkNotNull(localBgpId);
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.sessionStrategyFactory = Preconditions.checkNotNull(sessionStrategyFactory);
         this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
         this.localTables = ImmutableList.copyOf(localTables);
         this.tables = new RIBTables(extensions);
+        this.peers = new LinkedBlockingQueue<>();
 
         LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
 
-        final ReadWriteTransaction trans = dps.newReadWriteTransaction();
+        final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
         Optional<Rib> o;
         try {
             o = trans.read(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier()).get();
@@ -105,24 +149,24 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
         // put empty BgpRib if not exists
         trans.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(BgpRib.class).build(), new BgpRibBuilder().build());
         trans.put(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier(), new RibBuilder().setKey(new RibKey(ribId)).setId(ribId).setLocRib(
-                new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build());
+            new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build());
 
-        for (BgpTableType t : localTables) {
+        for (final BgpTableType t : localTables) {
             final TablesKey key = new TablesKey(t.getAfi(), t.getSafi());
             if (this.tables.create(trans, this, key) == null) {
                 LOG.debug("Did not create local table for unhandled table type {}", t);
             }
         }
 
-        Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
+            public void onSuccess(final Void result) {
                 LOG.trace("Change committed successfully");
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.error("Failed to initiate RIB {}", getInstanceIdentifier());
+                LOG.error("Failed to initiate RIB {}", getInstanceIdentifier(), t);
             }
         });
     }
@@ -132,20 +176,23 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
 
     @Override
     public synchronized void updateTables(final Peer peer, final Update message) {
-        final WriteTransaction trans = this.dps.newWriteOnlyTransaction();
+        final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(this.ribOuts, this.comparator, this.chain.newWriteOnlyTransaction());
 
         if (!EOR.equals(message)) {
             final WithdrawnRoutes wr = message.getWithdrawnRoutes();
             if (wr != null) {
-                final AdjRIBsIn ari = this.tables.get(IPV4_UNICAST_TABLE);
+                final AdjRIBsIn<?, ?> ari = this.tables.get(IPV4_UNICAST_TABLE);
                 if (ari != null) {
+                    /*
+                     * create MPUnreach for the routes to be handled in the same way as linkstate routes
+                     */
                     ari.removeRoutes(
-                            trans,
-                            peer,
-                            new MpUnreachNlriBuilder().setAfi(Ipv4AddressFamily.class).setSafi(UnicastSubsequentAddressFamily.class).setWithdrawnRoutes(
-                                    new WithdrawnRoutesBuilder().setDestinationType(
-                                            new DestinationIpv4CaseBuilder().setDestinationIpv4(
-                                                    new DestinationIpv4Builder().setIpv4Prefixes(wr.getWithdrawnRoutes()).build()).build()).build()).build());
+                        trans,
+                        peer,
+                        new MpUnreachNlriBuilder().setAfi(Ipv4AddressFamily.class).setSafi(UnicastSubsequentAddressFamily.class).setWithdrawnRoutes(
+                            new WithdrawnRoutesBuilder().setDestinationType(
+                                new DestinationIpv4CaseBuilder().setDestinationIpv4(
+                                    new DestinationIpv4Builder().setIpv4Prefixes(wr.getWithdrawnRoutes()).build()).build()).build()).build());
                 } else {
                     LOG.debug("Not removing objects from unhandled IPv4 Unicast");
                 }
@@ -157,7 +204,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 if (mpu != null) {
                     final MpUnreachNlri nlri = mpu.getMpUnreachNlri();
 
-                    final AdjRIBsIn ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
+                    final AdjRIBsIn<?, ?> ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
                     if (ari != null) {
                         ari.removeRoutes(trans, peer, nlri);
                     } else {
@@ -168,13 +215,16 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
 
             final Nlri ar = message.getNlri();
             if (ar != null) {
-                final AdjRIBsIn ari = this.tables.get(IPV4_UNICAST_TABLE);
+                final AdjRIBsIn<?, ?> ari = this.tables.get(IPV4_UNICAST_TABLE);
                 if (ari != null) {
+                    /*
+                     * create MPReach for the routes to be handled in the same way as linkstate routes
+                     */
                     final MpReachNlriBuilder b = new MpReachNlriBuilder().setAfi(Ipv4AddressFamily.class).setSafi(
-                            UnicastSubsequentAddressFamily.class).setAdvertizedRoutes(
-                                    new AdvertizedRoutesBuilder().setDestinationType(
-                                            new DestinationIpv4CaseBuilder().setDestinationIpv4(
-                                                    new DestinationIpv4Builder().setIpv4Prefixes(ar.getNlri()).build()).build()).build());
+                        UnicastSubsequentAddressFamily.class).setAdvertizedRoutes(
+                            new AdvertizedRoutesBuilder().setDestinationType(
+                                new DestinationIpv4CaseBuilder().setDestinationIpv4(
+                                    new DestinationIpv4Builder().setIpv4Prefixes(ar.getNlri()).build()).build()).build());
                     if (attrs != null) {
                         b.setCNextHop(attrs.getCNextHop());
                     }
@@ -190,7 +240,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 if (mpr != null) {
                     final MpReachNlri nlri = mpr.getMpReachNlri();
 
-                    final AdjRIBsIn ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
+                    final AdjRIBsIn<?, ?> ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
                     if (ari != null) {
                         if (message.equals(ari.endOfRib())) {
                             ari.markUptodate(trans, peer);
@@ -203,7 +253,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 }
             }
         } else {
-            final AdjRIBsIn ari = this.tables.get(IPV4_UNICAST_TABLE);
+            final AdjRIBsIn<?, ?> ari = this.tables.get(IPV4_UNICAST_TABLE);
             if (ari != null) {
                 ari.markUptodate(trans, peer);
             } else {
@@ -211,9 +261,9 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
             }
         }
 
-        Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
+            public void onSuccess(final Void result) {
                 LOG.debug("RIB modification successfully committed.");
             }
 
@@ -226,20 +276,20 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
 
     @Override
     public synchronized void clearTable(final Peer peer, final TablesKey key) {
-        final AdjRIBsIn ari = this.tables.get(key);
+        final AdjRIBsIn<?, ?> ari = this.tables.get(key);
         if (ari != null) {
-            final WriteTransaction trans = this.dps.newWriteOnlyTransaction();
+            final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(this.ribOuts, this.comparator, this.chain.newWriteOnlyTransaction());
             ari.clear(trans, peer);
 
-            Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+            Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(final RpcResult<TransactionStatus> result) {
-                    // Nothing to do
+                public void onSuccess(final Void result) {
+                    LOG.trace("Table {} cleared successfully", key);
                 }
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.error("Failed to commit RIB modification", t);
+                    LOG.error("Failed to clear table {}", key, t);
                 }
             });
         }
@@ -254,11 +304,17 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
         return toStringHelper;
     }
 
+    @SuppressWarnings("unchecked")
+    protected <K, V extends Route> AdjRIBsIn<K, V> getTable(final TablesKey key) {
+        return (AdjRIBsIn<K, V>) this.tables.get(key);
+    }
+
     @Override
     public void close() throws InterruptedException, ExecutionException {
-        final WriteTransaction t = this.dps.newWriteOnlyTransaction();
+        final WriteTransaction t = this.chain.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
-        t.commit().get();
+        t.submit().get();
+        this.chain.close();
     }
 
     @Override
@@ -295,4 +351,34 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     public void initTable(final Peer bgpPeer, final TablesKey key) {
         // FIXME: BUG-196: support graceful restart
     }
+
+    @Override
+    public AdjRIBsOutRegistration registerRIBsOut(final Peer peer, final AdjRIBsOut aro) {
+        final AdjRIBsOutRegistration reg = new AdjRIBsOutRegistration(aro) {
+            @Override
+            protected void removeRegistration() {
+                RIBImpl.this.ribOuts.remove(peer, aro);
+            }
+        };
+
+        this.ribOuts.put(peer, aro);
+        LOG.debug("Registering this peer {} to RIB-Out {}", peer, this.ribOuts);
+        try {
+            this.peers.put(peer);
+            this.scheduler.run();
+        } catch (final InterruptedException e) {
+            //
+        }
+        return reg;
+    }
+
+    @Override
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+        LOG.error("Broken chain in RIB {} transaction {}", getInstanceIdentifier(), transaction.getIdentifier(), cause);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+        LOG.info("RIB {} closed successfully", getInstanceIdentifier());
+    }
 }