Bump upstreams to 2022.09
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
index 81ba926f19e7fd612c6bf832c1e4d6272f89dacc..9f6326e027796ea304e56b112e2f69928757e053 100644 (file)
@@ -15,30 +15,29 @@ import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.bgpcep.topology.TopologyReference;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.protocol.bgp.rib.RibReference;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.SubsequentAddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.SubsequentAddressFamily;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -59,8 +58,8 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     private final InstanceIdentifier<Topology> topology;
     private final RibReference locRibReference;
     private final DataBroker dataProvider;
-    private final Class<? extends AddressFamily> afi;
-    private final Class<? extends SubsequentAddressFamily> safi;
+    private final AddressFamily afi;
+    private final SubsequentAddressFamily safi;
     private final TopologyKey topologyKey;
     private final TopologyTypes topologyTypes;
     private final long listenerResetLimitInMillsec;
@@ -69,19 +68,20 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     @GuardedBy("this")
     private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
     @GuardedBy("this")
-    private BindingTransactionChain chain = null;
-    private AtomicBoolean closed = new AtomicBoolean(false);
+    private TransactionChain chain = null;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
     @GuardedBy("this")
     @VisibleForTesting
     protected long listenerScheduledRestartTime = 0;
     @GuardedBy("this")
     @VisibleForTesting
     protected int listenerScheduledRestartEnforceCounter = 0;
+    protected boolean networkTopologyTransaction = true;
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-            final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
-        final int listenerResetEnforceCounter) {
+            final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+            final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
+            final int listenerResetEnforceCounter) {
         this.dataProvider = dataProvider;
         this.locRibReference = requireNonNull(locRibReference);
         this.topologyKey = new TopologyKey(requireNonNull(topologyId));
@@ -95,8 +95,8 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     }
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-        final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi) {
+        final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+        final SubsequentAddressFamily safi) {
         this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
                 LISTENER_RESET_ENFORCE_COUNTER);
     }
@@ -118,7 +118,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
                 this.getInstanceIdentifier());
         final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
                 .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
-        final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+        final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
                 getRouteWildcard(tablesId));
 
         this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
@@ -165,56 +165,63 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
-        if (this.closed.get()) {
-            LOG.trace("Transaction chain was already closed, skipping update.");
-            return;
-        }
-        // check if the transaction chain needed to be restarted due to a previous error
-        if (restartTransactionChainOnDemand()) {
-            LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
-            return;
-        }
-        final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
-        LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
-        final AtomicBoolean transactionInError = new AtomicBoolean(false);
-        for (final DataTreeModification<T> change : changes) {
-            try {
-                routeChanged(change, trans);
-            } catch (final RuntimeException exc) {
-                LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
-                        trans.getIdentifier(), this, exc);
-                // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
-                // trans.commit() must be called first to unlock the current transaction chain, to make the chain
-                // closable so we cannot exit the #onDataTreeChanged() yet
-                transactionInError.set(true);
-                break;
+        if (networkTopologyTransaction) {
+            if (this.closed.get()) {
+                LOG.trace("Transaction chain was already closed, skipping update.");
+                return;
             }
-        }
-        trans.commit().addCallback(new FutureCallback<CommitInfo>() {
-            @Override
-            public void onSuccess(final CommitInfo result) {
-                // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
-                // successfully even when an exception is captured, thus #onTransactionChainFailed() never get invoked.
-                // Though the transaction chain remains usable,
-                // the data loss will not be able to be recovered. Thus we schedule a listener restart here
-                if (transactionInError.get()) {
-                    LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart"
-                            + " of listener {}", trans
-                        .getIdentifier(), AbstractTopologyBuilder.this);
-                    scheduleListenerRestart();
-                } else {
-                    LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+            // check if the transaction chain needed to be restarted due to a previous error
+            if (restartTransactionChainOnDemand()) {
+                LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
+                return;
+            }
+            final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
+            LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
+            final AtomicBoolean transactionInError = new AtomicBoolean(false);
+            for (final DataTreeModification<T> change : changes) {
+                try {
+                    routeChanged(change, trans);
+                } catch (final RuntimeException exc) {
+                    LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
+                            trans.getIdentifier(), this, exc);
+                    // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic
+                    // change.
+                    // trans.commit() must be called first to unlock the current transaction chain, to make the chain
+                    // closable so we cannot exit the #onDataTreeChanged() yet
+                    transactionInError.set(true);
+                    break;
                 }
             }
+            trans.commit().addCallback(new FutureCallback<CommitInfo>() {
+                @Override
+                public void onSuccess(final CommitInfo result) {
+                    // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
+                    // successfully even when an exception is captured, thus #onTransactionChainFailed() never get
+                    // invoked. Though the transaction chain remains usable,
+                    // the data loss will not be able to be recovered. Thus we schedule a listener restart here
+                    if (transactionInError.get()) {
+                        LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a"
+                                + " restart of listener {}", trans
+                            .getIdentifier(), AbstractTopologyBuilder.this);
+                        scheduleListenerRestart();
+                    } else {
+                        LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+                    }
+                }
 
-            @Override
-            public void onFailure(final Throwable throwable) {
-                // we do nothing but print out the log. Transaction chain restart will be done in
-                // #onTransactionChainFailed()
-                LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
-                        AbstractTopologyBuilder.this, throwable);
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    // we do nothing but print out the log. Transaction chain restart will be done in
+                    // #onTransactionChainFailed()
+                    LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
+                            AbstractTopologyBuilder.this, throwable);
+                }
+            }, MoreExecutors.directExecutor());
+        } else {
+            for (final DataTreeModification<T> change : changes) {
+                routeChanged(change, null);
             }
-        }, MoreExecutors.directExecutor());
+        }
     }
 
     @VisibleForTesting
@@ -239,10 +246,10 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     private synchronized void initOperationalTopology() {
         requireNonNull(this.chain, "A valid transaction chain must be provided.");
         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
-                new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE)
+        trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, this.topology,
+                new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
                         .setTopologyTypes(this.topologyTypes)
-                        .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
+                        .setLink(Map.of()).setNode(Map.of()).build());
         trans.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
             public void onSuccess(final CommitInfo result) {
@@ -289,7 +296,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
         LOG.debug("Initializing transaction chain for topology {}", this);
         Preconditions.checkState(this.chain == null,
                 "Transaction chain has to be closed before being initialized");
-        this.chain = this.dataProvider.createTransactionChain(this);
+        this.chain = this.dataProvider.createMergingTransactionChain(this);
     }
 
     /**
@@ -394,8 +401,8 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     }
 
     @Override
-    public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> transactionChain,
-            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+    public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
+            final Transaction transaction, final Throwable cause) {
         LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
                 transaction != null ? transaction.getIdentifier() : null, cause);
         scheduleListenerRestart();
@@ -403,7 +410,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     }
 
     @Override
-    public final void onTransactionChainSuccessful(final TransactionChain<?, ?> transactionChain) {
+    public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
     }
 }