Bump upstreams
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
index 746f7f86a0fb965f15cde9fd986013f21b1bdc7c..2ecdcad958c4f8199b2bf0f7a1196d700c8a02ea 100644 (file)
@@ -14,8 +14,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.bgpcep.topology.TopologyReference;
@@ -25,9 +25,7 @@ import org.opendaylight.mdsal.binding.api.DataObjectModification;
 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
-import org.opendaylight.mdsal.binding.api.Transaction;
 import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -44,13 +42,14 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
-    TopologyReference, TransactionChainListener {
+public abstract class AbstractTopologyBuilder<T extends Route>
+        implements ClusteredDataTreeChangeListener<T>, TopologyReference, FutureCallback<Empty> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
     // we limit the listener reset interval to be 5 min at most
     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
@@ -58,15 +57,15 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     private final InstanceIdentifier<Topology> topology;
     private final RibReference locRibReference;
     private final DataBroker dataProvider;
-    private final Class<? extends AddressFamily> afi;
-    private final Class<? extends SubsequentAddressFamily> safi;
+    private final AddressFamily afi;
+    private final SubsequentAddressFamily safi;
     private final TopologyKey topologyKey;
     private final TopologyTypes topologyTypes;
     private final long listenerResetLimitInMillsec;
     private final int listenerResetEnforceCounter;
 
     @GuardedBy("this")
-    private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
+    private Registration listenerRegistration = null;
     @GuardedBy("this")
     private TransactionChain chain = null;
     private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -79,31 +78,32 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     protected boolean networkTopologyTransaction = true;
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-            final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
-        final int listenerResetEnforceCounter) {
+            final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+            final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
+            final int listenerResetEnforceCounter) {
         this.dataProvider = dataProvider;
         this.locRibReference = requireNonNull(locRibReference);
-        this.topologyKey = new TopologyKey(requireNonNull(topologyId));
-        this.topologyTypes = types;
+        topologyKey = new TopologyKey(requireNonNull(topologyId));
+        topologyTypes = types;
         this.afi = afi;
         this.safi = safi;
         this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
         this.listenerResetEnforceCounter = listenerResetEnforceCounter;
-        this.topology = InstanceIdentifier.builder(NetworkTopology.class)
-                .child(Topology.class, this.topologyKey).build();
+        topology = InstanceIdentifier.builder(NetworkTopology.class)
+                .child(Topology.class, topologyKey)
+                .build();
     }
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-        final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi) {
+        final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+        final SubsequentAddressFamily safi) {
         this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
                 LISTENER_RESET_ENFORCE_COUNTER);
     }
 
     public final synchronized void start() {
-        LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology,
-                this.afi, this.safi);
+        LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", locRibReference, topology,
+                afi, safi);
         initTransactionChain();
         initOperationalTopology();
         registerDataChangeListener();
@@ -113,27 +113,27 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      * Register to data tree change listener.
      */
     private synchronized void registerDataChangeListener() {
-        Preconditions.checkState(this.listenerRegistration == null,
+        Preconditions.checkState(listenerRegistration == null,
                 "Topology Listener on topology %s has been registered before.",
                 this.getInstanceIdentifier());
-        final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
-                .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
+        final InstanceIdentifier<Tables> tablesId = locRibReference.getInstanceIdentifier()
+                .child(LocRib.class).child(Tables.class, new TablesKey(afi, safi));
         final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
                 getRouteWildcard(tablesId));
 
-        this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
+        listenerRegistration = dataProvider.registerDataTreeChangeListener(id, this);
         LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
-                this.listenerScheduledRestartTime);
+                listenerScheduledRestartTime);
     }
 
     /**
      * Unregister to data tree change listener.
      */
     private synchronized void unregisterDataChangeListener() {
-        if (this.listenerRegistration != null) {
+        if (listenerRegistration != null) {
             LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
-            this.listenerRegistration.close();
-            this.listenerRegistration = null;
+            listenerRegistration.close();
+            listenerRegistration = null;
         }
     }
 
@@ -147,11 +147,11 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 
     @Override
     public final InstanceIdentifier<Topology> getInstanceIdentifier() {
-        return this.topology;
+        return topology;
     }
 
     public final synchronized FluentFuture<? extends CommitInfo> close() {
-        if (this.closed.getAndSet(true)) {
+        if (closed.getAndSet(true)) {
             LOG.trace("Transaction chain was already closed.");
             return CommitInfo.emptyFluentFuture();
         }
@@ -164,9 +164,9 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
+    public synchronized void onDataTreeChanged(final List<DataTreeModification<T>> changes) {
         if (networkTopologyTransaction) {
-            if (this.closed.get()) {
+            if (closed.get()) {
                 LOG.trace("Transaction chain was already closed, skipping update.");
                 return;
             }
@@ -175,7 +175,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
                 LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
                 return;
             }
-            final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
+            final ReadWriteTransaction trans = chain.newReadWriteTransaction();
             LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
             final AtomicBoolean transactionInError = new AtomicBoolean(false);
             for (final DataTreeModification<T> change : changes) {
@@ -244,12 +244,12 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     }
 
     private synchronized void initOperationalTopology() {
-        requireNonNull(this.chain, "A valid transaction chain must be provided.");
-        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
-        trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, this.topology,
-                new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
-                        .setTopologyTypes(this.topologyTypes)
-                        .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build());
+        requireNonNull(chain, "A valid transaction chain must be provided.");
+        final WriteTransaction trans = chain.newWriteOnlyTransaction();
+        trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, topology,
+                new TopologyBuilder().withKey(topologyKey).setServerProvided(Boolean.TRUE)
+                        .setTopologyTypes(topologyTypes)
+                        .setLink(Map.of()).setNode(Map.of()).build());
         trans.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
             public void onSuccess(final CommitInfo result) {
@@ -269,8 +269,8 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      * Destroy the current operational topology data. Note a valid transaction must be provided.
      */
     private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
-        requireNonNull(this.chain, "A valid transaction chain must be provided.");
-        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+        requireNonNull(chain, "A valid transaction chain must be provided.");
+        final WriteTransaction trans = chain.newWriteOnlyTransaction();
         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
         final FluentFuture<? extends CommitInfo> future = trans.commit();
         future.addCallback(new FutureCallback<CommitInfo>() {
@@ -294,16 +294,17 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      */
     private synchronized void initTransactionChain() {
         LOG.debug("Initializing transaction chain for topology {}", this);
-        Preconditions.checkState(this.chain == null,
+        Preconditions.checkState(chain == null,
                 "Transaction chain has to be closed before being initialized");
-        this.chain = this.dataProvider.createMergingTransactionChain(this);
+        chain = dataProvider.createMergingTransactionChain();
+        chain.addCallback(this);
     }
 
     /**
      * Destroy the current transaction chain.
      */
     private synchronized void destroyTransactionChain() {
-        if (this.chain != null) {
+        if (chain != null) {
             LOG.debug("Destroy transaction chain for topology {}", this);
             // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
             // and the transaction factory cannot be reopen even if we recreate the transaction chain
@@ -317,7 +318,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 //                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
 //                  getInstanceIdentifier());
 //            }
-            this.chain = null;
+            chain = null;
         }
     }
 
@@ -327,7 +328,7 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      */
     @VisibleForTesting
     protected synchronized void resetListener() {
-        requireNonNull(this.listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
+        requireNonNull(listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
         LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
         // unregister current listener to prevent incoming data tree change first
         unregisterDataChangeListener();
@@ -370,13 +371,13 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      */
     @VisibleForTesting
     protected synchronized boolean restartTransactionChainOnDemand() {
-        if (this.listenerScheduledRestartTime > 0) {
+        if (listenerScheduledRestartTime > 0) {
             // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
             // otherwise we should only reset the transaction chain
-            if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
+            if (System.currentTimeMillis() > listenerScheduledRestartTime) {
                 // reset the the restart timer
-                this.listenerScheduledRestartTime = 0;
-                this.listenerScheduledRestartEnforceCounter = 0;
+                listenerScheduledRestartTime = 0;
+                listenerScheduledRestartEnforceCounter = 0;
                 resetListener();
                 return true;
             }
@@ -388,29 +389,27 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 
     @VisibleForTesting
     protected synchronized void scheduleListenerRestart() {
-        if (0 == this.listenerScheduledRestartTime) {
-            this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
-        } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
-            && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
+        if (0 == listenerScheduledRestartTime) {
+            listenerScheduledRestartTime = System.currentTimeMillis() + listenerResetLimitInMillsec;
+        } else if (System.currentTimeMillis() > listenerScheduledRestartTime
+            && ++listenerScheduledRestartEnforceCounter < listenerResetEnforceCounter) {
             // if the transaction failure happens again, we will delay the listener restart up to
             // #LISTENER_RESET_LIMIT_IN_MILLSEC times
-            this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
+            listenerScheduledRestartTime += listenerResetLimitInMillsec;
         }
         LOG.debug("A listener restart was scheduled at {} (current system time is {})",
-                this.listenerScheduledRestartTime, System.currentTimeMillis());
+                listenerScheduledRestartTime, System.currentTimeMillis());
     }
 
     @Override
-    public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
-            final Transaction transaction, final Throwable cause) {
-        LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
-                transaction != null ? transaction.getIdentifier() : null, cause);
+    public final synchronized void onFailure(final Throwable cause) {
+        LOG.error("Topology builder for {} failed", getInstanceIdentifier(), cause);
         scheduleListenerRestart();
         restartTransactionChainOnDemand();
     }
 
     @Override
-    public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
+    public final void onSuccess(final Empty value) {
         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
     }
 }