Bump upstreams
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
index 6bf77c715577257963ece631240914612524c545..2ecdcad958c4f8199b2bf0f7a1196d700c8a02ea 100644 (file)
@@ -7,50 +7,49 @@
  */
 package org.opendaylight.bgpcep.bgp.topology.provider;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.bgpcep.topology.TopologyReference;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.protocol.bgp.rib.RibReference;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.LocRib;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.SubsequentAddressFamily;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
-    TopologyReference, TransactionChainListener {
+public abstract class AbstractTopologyBuilder<T extends Route>
+        implements ClusteredDataTreeChangeListener<T>, TopologyReference, FutureCallback<Empty> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
     // we limit the listener reset interval to be 5 min at most
     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
@@ -58,74 +57,83 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     private final InstanceIdentifier<Topology> topology;
     private final RibReference locRibReference;
     private final DataBroker dataProvider;
-    private final Class<? extends AddressFamily> afi;
-    private final Class<? extends SubsequentAddressFamily> safi;
+    private final AddressFamily afi;
+    private final SubsequentAddressFamily safi;
     private final TopologyKey topologyKey;
     private final TopologyTypes topologyTypes;
     private final long listenerResetLimitInMillsec;
     private final int listenerResetEnforceCounter;
 
     @GuardedBy("this")
-    private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
-    @GuardedBy("this")
-    private BindingTransactionChain chain = null;
+    private Registration listenerRegistration = null;
     @GuardedBy("this")
-    private boolean closed = false;
+    private TransactionChain chain = null;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
     @GuardedBy("this")
     @VisibleForTesting
     protected long listenerScheduledRestartTime = 0;
     @GuardedBy("this")
     @VisibleForTesting
     protected int listenerScheduledRestartEnforceCounter = 0;
+    protected boolean networkTopologyTransaction = true;
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-            final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
-        final int listenerResetEnforceCounter) {
+            final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+            final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
+            final int listenerResetEnforceCounter) {
         this.dataProvider = dataProvider;
-        this.locRibReference = Preconditions.checkNotNull(locRibReference);
-        this.topologyKey = new TopologyKey(Preconditions.checkNotNull(topologyId));
-        this.topologyTypes = types;
+        this.locRibReference = requireNonNull(locRibReference);
+        topologyKey = new TopologyKey(requireNonNull(topologyId));
+        topologyTypes = types;
         this.afi = afi;
         this.safi = safi;
         this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
         this.listenerResetEnforceCounter = listenerResetEnforceCounter;
-        this.topology = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, this.topologyKey).build();
+        topology = InstanceIdentifier.builder(NetworkTopology.class)
+                .child(Topology.class, topologyKey)
+                .build();
     }
 
     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
-        final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
-        final Class<? extends SubsequentAddressFamily> safi) {
-        this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC, LISTENER_RESET_ENFORCE_COUNTER);
+        final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+        final SubsequentAddressFamily safi) {
+        this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
+                LISTENER_RESET_ENFORCE_COUNTER);
     }
 
     public final synchronized void start() {
-        LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology, this.afi, this.safi);
+        LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", locRibReference, topology,
+                afi, safi);
         initTransactionChain();
         initOperationalTopology();
         registerDataChangeListener();
     }
 
     /**
-     * Register to data tree change listener
+     * Register to data tree change listener.
      */
     private synchronized void registerDataChangeListener() {
-        Preconditions.checkState(this.listenerRegistration == null, "Topology Listener on topology %s has been registered before.", this.getInstanceIdentifier());
-        final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
-        final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getRouteWildcard(tablesId));
+        Preconditions.checkState(listenerRegistration == null,
+                "Topology Listener on topology %s has been registered before.",
+                this.getInstanceIdentifier());
+        final InstanceIdentifier<Tables> tablesId = locRibReference.getInstanceIdentifier()
+                .child(LocRib.class).child(Tables.class, new TablesKey(afi, safi));
+        final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
+                getRouteWildcard(tablesId));
 
-        this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
-        LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(), this.listenerScheduledRestartTime);
+        listenerRegistration = dataProvider.registerDataTreeChangeListener(id, this);
+        LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
+                listenerScheduledRestartTime);
     }
 
     /**
-     * Unregister to data tree change listener
+     * Unregister to data tree change listener.
      */
-    private final synchronized void unregisterDataChangeListener() {
-        if (this.listenerRegistration != null) {
+    private synchronized void unregisterDataChangeListener() {
+        if (listenerRegistration != null) {
             LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
-            this.listenerRegistration.close();
-            this.listenerRegistration = null;
+            listenerRegistration.close();
+            listenerRegistration = null;
         }
     }
 
@@ -139,129 +147,142 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 
     @Override
     public final InstanceIdentifier<Topology> getInstanceIdentifier() {
-        return this.topology;
+        return topology;
     }
 
-    public final synchronized ListenableFuture<Void> close() {
-        if (this.closed) {
+    public final synchronized FluentFuture<? extends CommitInfo> close() {
+        if (closed.getAndSet(true)) {
             LOG.trace("Transaction chain was already closed.");
-            Futures.immediateFuture(null);
+            return CommitInfo.emptyFluentFuture();
         }
-        this.closed = true;
         LOG.info("Shutting down builder for {}", getInstanceIdentifier());
         unregisterDataChangeListener();
-        final ListenableFuture<Void> future = destroyOperationalTopology();
+        final FluentFuture<? extends CommitInfo> future = destroyOperationalTopology();
         destroyTransactionChain();
         return future;
     }
 
     @Override
-    public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
-        if (this.closed) {
-            LOG.trace("Transaction chain was already closed, skipping update.");
-            return;
-        }
-        // check if the transaction chain needed to be restarted due to a previous error
-        if (restartTransactionChainOnDemand()) {
-            LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
-            return;
-        }
-        final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
-        LOG.debug("Received data change {} event with transaction {}", changes, trans.getIdentifier());
-        final AtomicBoolean transactionInError = new AtomicBoolean(false);
-        for (final DataTreeModification<T> change : changes) {
-            try {
-                routeChanged(change, trans);
-            } catch (final RuntimeException e) {
-                LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change, trans.getIdentifier(), this, e);
-                // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
-                // trans.submit() must be called first to unlock the current transaction chain, to make the chain closable
-                // so we cannot exit the #onDataTreeChanged() yet
-                transactionInError.set(true);
-                break;
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public synchronized void onDataTreeChanged(final List<DataTreeModification<T>> changes) {
+        if (networkTopologyTransaction) {
+            if (closed.get()) {
+                LOG.trace("Transaction chain was already closed, skipping update.");
+                return;
             }
-        }
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                // as we are enforcing trans.submit(), in some cases the transaction execution actually could be successfully even when an
-                // exception is captured, thus #onTransactionChainFailed() never get invoked. Though the transaction chain remains usable,
-                // the data loss will not be able to be recovered. Thus we schedule a listener restart here
-                if (transactionInError.get()) {
-                    LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart of listener {}", trans
-                        .getIdentifier(), AbstractTopologyBuilder.this);
-                    scheduleListenerRestart();
-                } else {
-                    LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+            // check if the transaction chain needed to be restarted due to a previous error
+            if (restartTransactionChainOnDemand()) {
+                LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
+                return;
+            }
+            final ReadWriteTransaction trans = chain.newReadWriteTransaction();
+            LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
+            final AtomicBoolean transactionInError = new AtomicBoolean(false);
+            for (final DataTreeModification<T> change : changes) {
+                try {
+                    routeChanged(change, trans);
+                } catch (final RuntimeException exc) {
+                    LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
+                            trans.getIdentifier(), this, exc);
+                    // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic
+                    // change.
+                    // trans.commit() must be called first to unlock the current transaction chain, to make the chain
+                    // closable so we cannot exit the #onDataTreeChanged() yet
+                    transactionInError.set(true);
+                    break;
                 }
             }
+            trans.commit().addCallback(new FutureCallback<CommitInfo>() {
+                @Override
+                public void onSuccess(final CommitInfo result) {
+                    // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
+                    // successfully even when an exception is captured, thus #onTransactionChainFailed() never get
+                    // invoked. Though the transaction chain remains usable,
+                    // the data loss will not be able to be recovered. Thus we schedule a listener restart here
+                    if (transactionInError.get()) {
+                        LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a"
+                                + " restart of listener {}", trans
+                            .getIdentifier(), AbstractTopologyBuilder.this);
+                        scheduleListenerRestart();
+                    } else {
+                        LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+                    }
+                }
 
-            @Override
-            public void onFailure(final Throwable t) {
-                // we do nothing but print out the log. Transaction chain restart will be done in #onTransactionChainFailed()
-                LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(), AbstractTopologyBuilder.this, t);
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    // we do nothing but print out the log. Transaction chain restart will be done in
+                    // #onTransactionChainFailed()
+                    LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
+                            AbstractTopologyBuilder.this, throwable);
+                }
+            }, MoreExecutors.directExecutor());
+        } else {
+            for (final DataTreeModification<T> change : changes) {
+                routeChanged(change, null);
             }
-        }, MoreExecutors.directExecutor());
+        }
     }
 
     @VisibleForTesting
     protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
         final DataObjectModification<T> root = change.getRootNode();
         switch (root.getModificationType()) {
-        case DELETE:
-            removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
-            break;
-        case SUBTREE_MODIFIED:
-        case WRITE:
-            if (root.getDataBefore() != null) {
+            case DELETE:
                 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
-            }
-            createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
-            break;
-        default:
-            throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
+                break;
+            case SUBTREE_MODIFIED:
+            case WRITE:
+                if (root.getDataBefore() != null) {
+                    removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
+                }
+                createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
         }
     }
 
     private synchronized void initOperationalTopology() {
-        Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
-        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
-            new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE).setTopologyTypes(this.topologyTypes)
-                                 .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+        requireNonNull(chain, "A valid transaction chain must be provided.");
+        final WriteTransaction trans = chain.newWriteOnlyTransaction();
+        trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, topology,
+                new TopologyBuilder().withKey(topologyKey).setServerProvided(Boolean.TRUE)
+                        .setTopologyTypes(topologyTypes)
+                        .setLink(Map.of()).setNode(Map.of()).build());
+        trans.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
-            public void onSuccess(final Void result) {
+            public void onSuccess(final CommitInfo result) {
                 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
             }
 
             @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to initialize topology {} (transaction {}) by listener {}", AbstractTopologyBuilder.this.topology,
-                    trans.getIdentifier(), AbstractTopologyBuilder.this, t);
+            public void onFailure(final Throwable throwable) {
+                LOG.error("Failed to initialize topology {} (transaction {}) by listener {}",
+                        AbstractTopologyBuilder.this.topology,
+                        trans.getIdentifier(), AbstractTopologyBuilder.this, throwable);
             }
         }, MoreExecutors.directExecutor());
     }
 
     /**
-     * Destroy the current operational topology data. Note a valid transaction must be provided
-     * @throws TransactionCommitFailedException
+     * Destroy the current operational topology data. Note a valid transaction must be provided.
      */
-    private synchronized ListenableFuture<Void> destroyOperationalTopology() {
-        Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
-        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+    private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
+        requireNonNull(chain, "A valid transaction chain must be provided.");
+        final WriteTransaction trans = chain.newWriteOnlyTransaction();
         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
-        final ListenableFuture<Void> future = trans.submit();
-        Futures.addCallback(future, new FutureCallback<Void>() {
+        final FluentFuture<? extends CommitInfo> future = trans.commit();
+        future.addCallback(new FutureCallback<CommitInfo>() {
             @Override
-            public void onSuccess(final Void result) {
+            public void onSuccess(final CommitInfo result) {
                 LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
             }
 
             @Override
-            public void onFailure(final Throwable t) {
+            public void onFailure(final Throwable throwable) {
                 LOG.error("Unable to reset operational topology {} (transaction {})",
-                    AbstractTopologyBuilder.this.topology, trans.getIdentifier(), t);
+                    AbstractTopologyBuilder.this.topology, trans.getIdentifier(), throwable);
             }
         }, MoreExecutors.directExecutor());
         clearTopology();
@@ -269,19 +290,21 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     }
 
     /**
-     * Reset a transaction chain by closing the current chain and starting a new one
+     * Reset a transaction chain by closing the current chain and starting a new one.
      */
     private synchronized void initTransactionChain() {
         LOG.debug("Initializing transaction chain for topology {}", this);
-        Preconditions.checkState(this.chain == null, "Transaction chain has to be closed before being initialized");
-        this.chain = this.dataProvider.createTransactionChain(this);
+        Preconditions.checkState(chain == null,
+                "Transaction chain has to be closed before being initialized");
+        chain = dataProvider.createMergingTransactionChain();
+        chain.addCallback(this);
     }
 
     /**
-     * Destroy the current transaction chain
+     * Destroy the current transaction chain.
      */
     private synchronized void destroyTransactionChain() {
-        if (this.chain != null) {
+        if (chain != null) {
             LOG.debug("Destroy transaction chain for topology {}", this);
             // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
             // and the transaction factory cannot be reopen even if we recreate the transaction chain
@@ -292,19 +315,20 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 //                this.chain.close();
 //            } catch (Exception e) {
 //                // the close() may not succeed when the transaction chain is locked
-//                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain, getInstanceIdentifier());
+//                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
+//                  getInstanceIdentifier());
 //            }
-            this.chain = null;
+            chain = null;
         }
     }
 
     /**
-     * Reset the data change listener to its initial status
+     * Reset the data change listener to its initial status.
      * By resetting the listener we will be able to recover all the data lost before
      */
     @VisibleForTesting
     protected synchronized void resetListener() {
-        Preconditions.checkNotNull(this.listenerRegistration, "Listener on topology %s hasn't been initialized.", this);
+        requireNonNull(listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
         LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
         // unregister current listener to prevent incoming data tree change first
         unregisterDataChangeListener();
@@ -332,14 +356,13 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
     /**
      * There are a few reasons we want to schedule a listener restart in a delayed manner:
      * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
-     *    rebuilding the whole linkstate topology again and again
+     * rebuilding the whole linkstate topology again and again
      * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
-     *    data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
-     *    is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
-     *    will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
-     *    will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
-     *    little overhead and it's okay to be restarted within a small time window
-     *
+     * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
+     * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
+     * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
+     * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
+     * little overhead and it's okay to be restarted within a small time window.
      * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
      * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
      * complete set of existing changes
@@ -348,12 +371,13 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
      */
     @VisibleForTesting
     protected synchronized boolean restartTransactionChainOnDemand() {
-        if (this.listenerScheduledRestartTime > 0) {
-            // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener, otherwise we should only reset the transaction chain
-            if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
+        if (listenerScheduledRestartTime > 0) {
+            // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
+            // otherwise we should only reset the transaction chain
+            if (System.currentTimeMillis() > listenerScheduledRestartTime) {
                 // reset the the restart timer
-                this.listenerScheduledRestartTime = 0;
-                this.listenerScheduledRestartEnforceCounter = 0;
+                listenerScheduledRestartTime = 0;
+                listenerScheduledRestartEnforceCounter = 0;
                 resetListener();
                 return true;
             }
@@ -365,25 +389,27 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements Cluste
 
     @VisibleForTesting
     protected synchronized void scheduleListenerRestart() {
-        if (0 == this.listenerScheduledRestartTime) {
-            this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
-        } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
-            && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
-            // if the transaction failure happens again, we will delay the listener restart up to #LISTENER_RESET_LIMIT_IN_MILLSEC times
-            this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
+        if (0 == listenerScheduledRestartTime) {
+            listenerScheduledRestartTime = System.currentTimeMillis() + listenerResetLimitInMillsec;
+        } else if (System.currentTimeMillis() > listenerScheduledRestartTime
+            && ++listenerScheduledRestartEnforceCounter < listenerResetEnforceCounter) {
+            // if the transaction failure happens again, we will delay the listener restart up to
+            // #LISTENER_RESET_LIMIT_IN_MILLSEC times
+            listenerScheduledRestartTime += listenerResetLimitInMillsec;
         }
-        LOG.debug("A listener restart was scheduled at {} (current system time is {})", this.listenerScheduledRestartTime, System.currentTimeMillis());
+        LOG.debug("A listener restart was scheduled at {} (current system time is {})",
+                listenerScheduledRestartTime, System.currentTimeMillis());
     }
 
     @Override
-    public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-        LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
+    public final synchronized void onFailure(final Throwable cause) {
+        LOG.error("Topology builder for {} failed", getInstanceIdentifier(), cause);
         scheduleListenerRestart();
         restartTransactionChainOnDemand();
     }
 
     @Override
-    public final void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+    public final void onSuccess(final Empty value) {
         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
     }
 }