import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.bgpcep.topology.TopologyReference;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.protocol.bgp.rib.RibReference;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.LocRib;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.SubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@GuardedBy("this")
private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
@GuardedBy("this")
- private BindingTransactionChain chain = null;
- @GuardedBy("this")
- private boolean closed = false;
+ private TransactionChain chain = null;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
@GuardedBy("this")
@VisibleForTesting
protected long listenerScheduledRestartTime = 0;
this.safi = safi;
this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
this.listenerResetEnforceCounter = listenerResetEnforceCounter;
- this.topology = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, this.topologyKey).build();
+ this.topology = InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, this.topologyKey).build();
}
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
final Class<? extends SubsequentAddressFamily> safi) {
- this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC, LISTENER_RESET_ENFORCE_COUNTER);
+ this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
+ LISTENER_RESET_ENFORCE_COUNTER);
}
public final synchronized void start() {
- LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology, this.afi, this.safi);
+ LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology,
+ this.afi, this.safi);
initTransactionChain();
initOperationalTopology();
registerDataChangeListener();
}
/**
- * Register to data tree change listener
+ * Register to data tree change listener.
*/
private synchronized void registerDataChangeListener() {
- Preconditions.checkState(this.listenerRegistration == null, "Topology Listener on topology %s has been registered before.", this.getInstanceIdentifier());
- final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
- final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getRouteWildcard(tablesId));
+ Preconditions.checkState(this.listenerRegistration == null,
+ "Topology Listener on topology %s has been registered before.",
+ this.getInstanceIdentifier());
+ final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
+ .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
+ final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
+ getRouteWildcard(tablesId));
this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
- LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(), this.listenerScheduledRestartTime);
+ LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
+ this.listenerScheduledRestartTime);
}
/**
- * Unregister to data tree change listener
+ * Unregister to data tree change listener.
*/
- private final synchronized void unregisterDataChangeListener() {
+ private synchronized void unregisterDataChangeListener() {
if (this.listenerRegistration != null) {
LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
this.listenerRegistration.close();
return this.topology;
}
- public final synchronized ListenableFuture<Void> close() {
- if (this.closed) {
+ public final synchronized FluentFuture<? extends CommitInfo> close() {
+ if (this.closed.getAndSet(true)) {
LOG.trace("Transaction chain was already closed.");
- Futures.immediateFuture(null);
+ return CommitInfo.emptyFluentFuture();
}
- this.closed = true;
LOG.info("Shutting down builder for {}", getInstanceIdentifier());
unregisterDataChangeListener();
- final ListenableFuture<Void> future = destroyOperationalTopology();
+ final FluentFuture<? extends CommitInfo> future = destroyOperationalTopology();
destroyTransactionChain();
return future;
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
- if (this.closed) {
+ if (this.closed.get()) {
LOG.trace("Transaction chain was already closed, skipping update.");
return;
}
return;
}
final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
- LOG.debug("Received data change {} event with transaction {}", changes, trans.getIdentifier());
+ LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
final AtomicBoolean transactionInError = new AtomicBoolean(false);
for (final DataTreeModification<T> change : changes) {
try {
routeChanged(change, trans);
- } catch (final RuntimeException e) {
- LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change, trans.getIdentifier(), this, e);
+ } catch (final RuntimeException exc) {
+ LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
+ trans.getIdentifier(), this, exc);
// trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
- // trans.submit() must be called first to unlock the current transaction chain, to make the chain closable
- // so we cannot exit the #onDataTreeChanged() yet
+ // trans.commit() must be called first to unlock the current transaction chain, to make the chain
+ // closable so we cannot exit the #onDataTreeChanged() yet
transactionInError.set(true);
break;
}
}
- Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+ trans.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
- // as we are enforcing trans.submit(), in some cases the transaction execution actually could be successfully even when an
- // exception is captured, thus #onTransactionChainFailed() never get invoked. Though the transaction chain remains usable,
+ public void onSuccess(final CommitInfo result) {
+ // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
+ // successfully even when an exception is captured, thus #onTransactionChainFailed() never get invoked.
+ // Though the transaction chain remains usable,
// the data loss will not be able to be recovered. Thus we schedule a listener restart here
if (transactionInError.get()) {
- LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart of listener {}", trans
+ LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart"
+ + " of listener {}", trans
.getIdentifier(), AbstractTopologyBuilder.this);
scheduleListenerRestart();
} else {
}
@Override
- public void onFailure(final Throwable t) {
- // we do nothing but print out the log. Transaction chain restart will be done in #onTransactionChainFailed()
- LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(), AbstractTopologyBuilder.this, t);
+ public void onFailure(final Throwable throwable) {
+ // we do nothing but print out the log. Transaction chain restart will be done in
+ // #onTransactionChainFailed()
+ LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
+ AbstractTopologyBuilder.this, throwable);
}
}, MoreExecutors.directExecutor());
}
protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
final DataObjectModification<T> root = change.getRootNode();
switch (root.getModificationType()) {
- case DELETE:
- removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
- break;
- case SUBTREE_MODIFIED:
- case WRITE:
- if (root.getDataBefore() != null) {
+ case DELETE:
removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
- }
- createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
- break;
- default:
- throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
+ break;
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ if (root.getDataBefore() != null) {
+ removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
+ }
+ createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
}
}
requireNonNull(this.chain, "A valid transaction chain must be provided.");
final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
- new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE).setTopologyTypes(this.topologyTypes)
- .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
- Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+ new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
+ .setTopologyTypes(this.topologyTypes)
+ .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
+ trans.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
}
@Override
- public void onFailure(final Throwable t) {
- LOG.error("Failed to initialize topology {} (transaction {}) by listener {}", AbstractTopologyBuilder.this.topology,
- trans.getIdentifier(), AbstractTopologyBuilder.this, t);
+ public void onFailure(final Throwable throwable) {
+ LOG.error("Failed to initialize topology {} (transaction {}) by listener {}",
+ AbstractTopologyBuilder.this.topology,
+ trans.getIdentifier(), AbstractTopologyBuilder.this, throwable);
}
}, MoreExecutors.directExecutor());
}
/**
- * Destroy the current operational topology data. Note a valid transaction must be provided
- * @throws TransactionCommitFailedException
+ * Destroy the current operational topology data. Note a valid transaction must be provided.
*/
- private synchronized ListenableFuture<Void> destroyOperationalTopology() {
+ private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
requireNonNull(this.chain, "A valid transaction chain must be provided.");
final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
- final ListenableFuture<Void> future = trans.submit();
- Futures.addCallback(future, new FutureCallback<Void>() {
+ final FluentFuture<? extends CommitInfo> future = trans.commit();
+ future.addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable throwable) {
LOG.error("Unable to reset operational topology {} (transaction {})",
- AbstractTopologyBuilder.this.topology, trans.getIdentifier(), t);
+ AbstractTopologyBuilder.this.topology, trans.getIdentifier(), throwable);
}
}, MoreExecutors.directExecutor());
clearTopology();
}
/**
- * Reset a transaction chain by closing the current chain and starting a new one
+ * Reset a transaction chain by closing the current chain and starting a new one.
*/
private synchronized void initTransactionChain() {
LOG.debug("Initializing transaction chain for topology {}", this);
- Preconditions.checkState(this.chain == null, "Transaction chain has to be closed before being initialized");
- this.chain = this.dataProvider.createTransactionChain(this);
+ Preconditions.checkState(this.chain == null,
+ "Transaction chain has to be closed before being initialized");
+ this.chain = this.dataProvider.createMergingTransactionChain(this);
}
/**
- * Destroy the current transaction chain
+ * Destroy the current transaction chain.
*/
private synchronized void destroyTransactionChain() {
if (this.chain != null) {
// this.chain.close();
// } catch (Exception e) {
// // the close() may not succeed when the transaction chain is locked
-// LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain, getInstanceIdentifier());
+// LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
+// getInstanceIdentifier());
// }
this.chain = null;
}
}
/**
- * Reset the data change listener to its initial status
+ * Reset the data change listener to its initial status.
* By resetting the listener we will be able to recover all the data lost before
*/
@VisibleForTesting
/**
* There are a few reasons we want to schedule a listener restart in a delayed manner:
* 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
- * rebuilding the whole linkstate topology again and again
+ * rebuilding the whole linkstate topology again and again
* 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
- * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
- * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
- * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
- * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
- * little overhead and it's okay to be restarted within a small time window
- *
+ * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
+ * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
+ * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
+ * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
+ * little overhead and it's okay to be restarted within a small time window.
* Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
* done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
* complete set of existing changes
@VisibleForTesting
protected synchronized boolean restartTransactionChainOnDemand() {
if (this.listenerScheduledRestartTime > 0) {
- // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener, otherwise we should only reset the transaction chain
+ // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
+ // otherwise we should only reset the transaction chain
if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
// reset the the restart timer
this.listenerScheduledRestartTime = 0;
this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
} else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
&& ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
- // if the transaction failure happens again, we will delay the listener restart up to #LISTENER_RESET_LIMIT_IN_MILLSEC times
+ // if the transaction failure happens again, we will delay the listener restart up to
+ // #LISTENER_RESET_LIMIT_IN_MILLSEC times
this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
}
- LOG.debug("A listener restart was scheduled at {} (current system time is {})", this.listenerScheduledRestartTime, System.currentTimeMillis());
+ LOG.debug("A listener restart was scheduled at {} (current system time is {})",
+ this.listenerScheduledRestartTime, System.currentTimeMillis());
}
@Override
- public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
- LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
+ public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
+ final Transaction transaction, final Throwable cause) {
+ LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
+ transaction != null ? transaction.getIdentifier() : null, cause);
scheduleListenerRestart();
restartTransactionChainOnDemand();
}
@Override
- public final void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
}
}