Added synchronized to close().
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / RIBImpl.java
index 43087a58d6e7e94bcd0f56ebdd5b8ead029b3cb8..029bd55d56eb55189e0bb6a2adf524dcb5010e77 100644 (file)
@@ -11,11 +11,12 @@ import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -24,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -61,7 +61,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mult
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.reach.nlri.AdvertizedRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.unreach.nlri.WithdrawnRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.reach.nlri.AdvertizedRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.path.attributes.mp.unreach.nlri.WithdrawnRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRib;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRibBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.RibId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.Rib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.RibId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.Rib;
@@ -84,15 +83,34 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     private static final Logger LOG = LoggerFactory.getLogger(RIBImpl.class);
     private static final Update EOR = new UpdateBuilder().build();
     private static final TablesKey IPV4_UNICAST_TABLE = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
     private static final Logger LOG = LoggerFactory.getLogger(RIBImpl.class);
     private static final Update EOR = new UpdateBuilder().build();
     private static final TablesKey IPV4_UNICAST_TABLE = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+
+    /*
+     * FIXME: performance: this needs to be turned into a Peer->offset map.
+     *        The offset is used to locate a the per-peer state entry in the
+     *        RIB tables.
+     *
+     *        For the first release, that map is updated whenever configuration
+     *        changes and remains constant on peer flaps. On re-configuration
+     *        a resize task is scheduled, so large tables may take some time
+     *        before they continue reacting to updates.
+     *
+     *        For subsequent releases, if we make the reformat process concurrent,
+     *        we can trigger reformats when Graceful Restart Time expires for a
+     *        particular peer.
+     */
     private final ConcurrentMap<Peer, AdjRIBsOut> ribOuts = new ConcurrentHashMap<>();
     private final ReconnectStrategyFactory tcpStrategyFactory;
     private final ReconnectStrategyFactory sessionStrategyFactory;
     private final ConcurrentMap<Peer, AdjRIBsOut> ribOuts = new ConcurrentHashMap<>();
     private final ReconnectStrategyFactory tcpStrategyFactory;
     private final ReconnectStrategyFactory sessionStrategyFactory;
+
+    /**
+     * BGP Best Path selection comparator for ingress best path selection.
+     */
     private final BGPObjectComparator comparator;
     private final BGPDispatcher dispatcher;
     private final BindingTransactionChain chain;
     private final AsNumber localAs;
     private final Ipv4Address bgpIdentifier;
     private final BGPObjectComparator comparator;
     private final BGPDispatcher dispatcher;
     private final BindingTransactionChain chain;
     private final AsNumber localAs;
     private final Ipv4Address bgpIdentifier;
-    private final List<BgpTableType> localTables;
+    private final Set<BgpTableType> localTables;
     private final RIBTables tables;
     private final BlockingQueue<Peer> peers;
     private final DataBroker dataBroker;
     private final RIBTables tables;
     private final BlockingQueue<Peer> peers;
     private final DataBroker dataBroker;
@@ -104,19 +122,22 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 final Peer peer = RIBImpl.this.peers.take();
                 LOG.debug("Advertizing loc-rib to new peer {}.", peer);
                 for (final BgpTableType key : RIBImpl.this.localTables) {
                 final Peer peer = RIBImpl.this.peers.take();
                 LOG.debug("Advertizing loc-rib to new peer {}.", peer);
                 for (final BgpTableType key : RIBImpl.this.localTables) {
-                    final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
-                    final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
-                    adj.addAllEntries(trans);
-                    Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
-                        }
-                        @Override
-                        public void onFailure(final Throwable t) {
-                            LOG.error("Failed to update peer {} with RIB {}", peer, t);
-                        }
-                    });
+
+                    synchronized (RIBImpl.this) {
+                        final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
+                        final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
+                        adj.addAllEntries(trans);
+                        Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
+                            }
+                            @Override
+                            public void onFailure(final Throwable t) {
+                                LOG.error("Failed to update peer {} with RIB {}", peer, t);
+                            }
+                        });
+                    }
                 }
             } catch (final InterruptedException e) {
                 LOG.info("Scheduler thread was interrupted.", e);
                 }
             } catch (final InterruptedException e) {
                 LOG.info("Scheduler thread was interrupted.", e);
@@ -127,7 +148,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
         final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
         final ReconnectStrategyFactory sessionStrategyFactory, final DataBroker dps, final List<BgpTableType> localTables) {
     public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
         final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
         final ReconnectStrategyFactory sessionStrategyFactory, final DataBroker dps, final List<BgpTableType> localTables) {
-        super(InstanceIdentifier.builder(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(ribId))).toInstance());
+        super(InstanceIdentifier.builder(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(ribId))).build());
         this.chain = dps.createTransactionChain(this);
         this.localAs = Preconditions.checkNotNull(localAs);
         this.comparator = new BGPObjectComparator(localAs);
         this.chain = dps.createTransactionChain(this);
         this.localAs = Preconditions.checkNotNull(localAs);
         this.comparator = new BGPObjectComparator(localAs);
@@ -135,26 +156,18 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.sessionStrategyFactory = Preconditions.checkNotNull(sessionStrategyFactory);
         this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.sessionStrategyFactory = Preconditions.checkNotNull(sessionStrategyFactory);
         this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
-        this.localTables = ImmutableList.copyOf(localTables);
+        this.localTables = ImmutableSet.copyOf(localTables);
         this.tables = new RIBTables(extensions);
         this.peers = new LinkedBlockingQueue<>();
         this.dataBroker = dps;
 
         LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
 
         this.tables = new RIBTables(extensions);
         this.peers = new LinkedBlockingQueue<>();
         this.dataBroker = dps;
 
         LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
 
-        final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
-        Optional<Rib> o;
-        try {
-            o = trans.read(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier()).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Failed to read topology", e);
-        }
-        Preconditions.checkState(!o.isPresent(), "Data provider conflict detected on object {}", getInstanceIdentifier());
+        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
 
         // put empty BgpRib if not exists
 
         // put empty BgpRib if not exists
-        trans.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(BgpRib.class).build(), new BgpRibBuilder().build());
         trans.put(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier(), new RibBuilder().setKey(new RibKey(ribId)).setId(ribId).setLocRib(
         trans.put(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier(), new RibBuilder().setKey(new RibKey(ribId)).setId(ribId).setLocRib(
-            new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build());
+            new LocRibBuilder().setTables(Collections.<Tables> emptyList()).build()).build(), true);
 
         for (final BgpTableType t : localTables) {
             final TablesKey key = new TablesKey(t.getAfi(), t.getSafi());
 
         for (final BgpTableType t : localTables) {
             final TablesKey key = new TablesKey(t.getAfi(), t.getSafi());
@@ -208,12 +221,16 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
                 final PathAttributes2 mpu = attrs.getAugmentation(PathAttributes2.class);
                 if (mpu != null) {
                     final MpUnreachNlri nlri = mpu.getMpUnreachNlri();
                 final PathAttributes2 mpu = attrs.getAugmentation(PathAttributes2.class);
                 if (mpu != null) {
                     final MpUnreachNlri nlri = mpu.getMpUnreachNlri();
-
                     final AdjRIBsIn<?, ?> ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
                     final AdjRIBsIn<?, ?> ari = this.tables.get(new TablesKey(nlri.getAfi(), nlri.getSafi()));
-                    if (ari != null) {
-                        ari.removeRoutes(trans, peer, nlri);
+                    // EOR messages do not contain withdrawn routes
+                    if (nlri.getWithdrawnRoutes() != null) {
+                        if (ari != null) {
+                            ari.removeRoutes(trans, peer, nlri);
+                        } else {
+                            LOG.debug("Not removing objects from unhandled NLRI {}", nlri);
+                        }
                     } else {
                     } else {
-                        LOG.debug("Not removing objects from unhandled NLRI {}", nlri);
+                        ari.markUptodate(trans, peer);
                     }
                 }
             }
                     }
                 }
             }
@@ -315,7 +332,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     }
 
     @Override
     }
 
     @Override
-    public void close() throws InterruptedException, ExecutionException {
+    public synchronized void close() throws InterruptedException, ExecutionException {
         final WriteTransaction t = this.chain.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
         t.submit().get();
         final WriteTransaction t = this.chain.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
         t.submit().get();
@@ -333,7 +350,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     }
 
     @Override
     }
 
     @Override
-    public List<? extends BgpTableType> getLocalTables() {
+    public Set<? extends BgpTableType> getLocalTables() {
         return this.localTables;
     }
 
         return this.localTables;
     }