Added synchronized to close().
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / RIBImpl.java
index 43087a58d6e7e94bcd0f56ebdd5b8ead029b3cb8..029bd55d56eb55189e0bb6a2adf524dcb5010e77 100644 (file)
@@ -11,11 +11,12 @@ import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -24,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -61,7 +61,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mult
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.reach.nlri.AdvertizedRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.unreach.nlri.WithdrawnRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRib;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRibBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.RibId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.Rib;
@@ -84,15 +83,34 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     private static final Logger LOG = LoggerFactory.getLogger(RIBImpl.class);
     private static final Update EOR = new UpdateBuilder().build();
     private static final TablesKey IPV4_UNICAST_TABLE = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+
+    /*
+     * FIXME: performance: this needs to be turned into a Peer->offset map.
+     *        The offset is used to locate a the per-peer state entry in the
+     *        RIB tables.
+     *
+     *        For the first release, that map is updated whenever configuration
+     *        changes and remains constant on peer flaps. On re-configuration
+     *        a resize task is scheduled, so large tables may take some time
+     *        before they continue reacting to updates.
+     *
+     *        For subsequent releases, if we make the reformat process concurrent,
+     *        we can trigger reformats when Graceful Restart Time expires for a
+     *        particular peer.
+     */
     private final ConcurrentMap<Peer, AdjRIBsOut> ribOuts = new ConcurrentHashMap<>();
     private final ReconnectStrategyFactory tcpStrategyFactory;
     private final ReconnectStrategyFactory sessionStrategyFactory;
+
+    /**
+     * BGP Best Path selection comparator for ingress best path selection.
+     */
     private final BGPObjectComparator comparator;
     private final BGPDispatcher dispatcher;
     private final BindingTransactionChain chain;
     private final AsNumber localAs;
     private final Ipv4Address bgpIdentifier;
-    private final List<BgpTableType> localTables;
+    private final Set<BgpTableType> localTables;
     private final RIBTables tables;
     private final BlockingQueue<Peer> peers;
     private final DataBroker dataBroker;
@@ -104,19 +122,22 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 final Peer peer = RIBImpl.this.peers.take();
                 LOG.debug("Advertizing loc-rib to new peer {}.", peer);
                 for (final BgpTableType key : RIBImpl.this.localTables) {
-                    final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
-                    final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
-                    adj.addAllEntries(trans);
-                    Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
-                        }
-                        @Override
-                        public void onFailure(final Throwable t) {
-                            LOG.error("Failed to update peer {} with RIB {}", peer, t);
-                        }
-                    });
+
+                    synchronized (RIBImpl.this) {
+                        final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
+                        final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
+                        adj.addAllEntries(trans);
+                        Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
+                            }
+                            @Override
+                            public void onFailure(final Throwable t) {
+                                LOG.error("Failed to update peer {} with RIB {}", peer, t);
+                            }
+                        });
+                    }
                 }
             } catch (final InterruptedException e) {
                 LOG.info("Scheduler thread was interrupted.", e);
@@ -127,7 +148,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
         final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
         final ReconnectStrategyFactory sessionStrategyFactory, final DataBroker dps, final List<BgpTableType> localTables) {
-        super(InstanceIdentifier.builder(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(ribId))).toInstance());
+        super(InstanceIdentifier.builder(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(ribId))).build());
         this.chain = dps.createTransactionChain(this);
         this.localAs = Preconditions.checkNotNull(localAs);
         this.comparator = new BGPObjectComparator(localAs);
@@ -135,26 +156,18 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.sessionStrategyFactory = Preconditions.checkNotNull(sessionStrategyFactory);
         this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
-        this.localTables = ImmutableList.copyOf(localTables);
+        this.localTables = ImmutableSet.copyOf(localTables);
         this.tables = new RIBTables(extensions);
         this.peers = new LinkedBlockingQueue<>();
         this.dataBroker = dps;
 
         LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
 
-        final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
-        Optional<Rib> o;
-        try {
-            o = trans.read(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier()).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Failed to read topology", e);
-        }
-        Preconditions.checkState(!o.isPresent(), "Data provider conflict detected on object {}", getInstanceIdentifier());
+        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
 
         // put empty BgpRib if not exists
-        trans.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(BgpRib.class).build(), new BgpRibBuilder().build());
         trans.put(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier(), new RibBuilder().setKey(new RibKey(ribId)).setId(ribId).setLocRib(
-            new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build());
+            new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build(), true);
 
         for (final BgpTableType t : localTables) {
             final TablesKey key = new TablesKey(t.getAfi(), t.getSafi());
@@ -208,12 +221,16 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 final PathAttributes2 mpu = attrs.getAugmentation(PathAttributes2.class);
                 if (mpu != null) {
                     final MpUnreachNlri nlri = mpu.getMpUnreachNlri();
-
                     final AdjRIBsIn<?, ?> ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
-                    if (ari != null) {
-                        ari.removeRoutes(trans, peer, nlri);
+                    // EOR messages do not contain withdrawn routes
+                    if (nlri.getWithdrawnRoutes() != null) {
+                        if (ari != null) {
+                            ari.removeRoutes(trans, peer, nlri);
+                        } else {
+                            LOG.debug("Not removing objects from unhandled NLRI {}", nlri);
+                        }
                     } else {
-                        LOG.debug("Not removing objects from unhandled NLRI {}", nlri);
+                        ari.markUptodate(trans, peer);
                     }
                 }
             }
@@ -315,7 +332,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     }
 
     @Override
-    public void close() throws InterruptedException, ExecutionException {
+    public synchronized void close() throws InterruptedException, ExecutionException {
         final WriteTransaction t = this.chain.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
         t.submit().get();
@@ -333,7 +350,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     }
 
     @Override
-    public List<? extends BgpTableType> getLocalTables() {
+    public Set<? extends BgpTableType> getLocalTables() {
         return this.localTables;
     }