import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.DataTreeModification;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
-import org.opendaylight.mdsal.binding.api.Transaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
- TopologyReference, TransactionChainListener {
+public abstract class AbstractTopologyBuilder<T extends Route>
+ implements ClusteredDataTreeChangeListener<T>, TopologyReference, FutureCallback<Empty> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
// we limit the listener reset interval to be 5 min at most
private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
private final int listenerResetEnforceCounter;
@GuardedBy("this")
- private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
+ private Registration listenerRegistration = null;
@GuardedBy("this")
private TransactionChain chain = null;
private final AtomicBoolean closed = new AtomicBoolean(false);
final int listenerResetEnforceCounter) {
this.dataProvider = dataProvider;
this.locRibReference = requireNonNull(locRibReference);
- this.topologyKey = new TopologyKey(requireNonNull(topologyId));
- this.topologyTypes = types;
+ topologyKey = new TopologyKey(requireNonNull(topologyId));
+ topologyTypes = types;
this.afi = afi;
this.safi = safi;
this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
this.listenerResetEnforceCounter = listenerResetEnforceCounter;
- this.topology = InstanceIdentifier.builder(NetworkTopology.class)
- .child(Topology.class, this.topologyKey).build();
+ topology = InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, topologyKey)
+ .build();
}
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
}
public final synchronized void start() {
- LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology,
- this.afi, this.safi);
+ LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", locRibReference, topology,
+ afi, safi);
initTransactionChain();
initOperationalTopology();
registerDataChangeListener();
* Register to data tree change listener.
*/
private synchronized void registerDataChangeListener() {
- Preconditions.checkState(this.listenerRegistration == null,
+ Preconditions.checkState(listenerRegistration == null,
"Topology Listener on topology %s has been registered before.",
this.getInstanceIdentifier());
- final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
- .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
+ final InstanceIdentifier<Tables> tablesId = locRibReference.getInstanceIdentifier()
+ .child(LocRib.class).child(Tables.class, new TablesKey(afi, safi));
final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
getRouteWildcard(tablesId));
- this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
+ listenerRegistration = dataProvider.registerDataTreeChangeListener(id, this);
LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
- this.listenerScheduledRestartTime);
+ listenerScheduledRestartTime);
}
/**
* Unregister to data tree change listener.
*/
private synchronized void unregisterDataChangeListener() {
- if (this.listenerRegistration != null) {
+ if (listenerRegistration != null) {
LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
- this.listenerRegistration.close();
- this.listenerRegistration = null;
+ listenerRegistration.close();
+ listenerRegistration = null;
}
}
@Override
public final InstanceIdentifier<Topology> getInstanceIdentifier() {
- return this.topology;
+ return topology;
}
public final synchronized FluentFuture<? extends CommitInfo> close() {
- if (this.closed.getAndSet(true)) {
+ if (closed.getAndSet(true)) {
LOG.trace("Transaction chain was already closed.");
return CommitInfo.emptyFluentFuture();
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
+ public synchronized void onDataTreeChanged(final List<DataTreeModification<T>> changes) {
if (networkTopologyTransaction) {
- if (this.closed.get()) {
+ if (closed.get()) {
LOG.trace("Transaction chain was already closed, skipping update.");
return;
}
LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
return;
}
- final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
+ final ReadWriteTransaction trans = chain.newReadWriteTransaction();
LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
final AtomicBoolean transactionInError = new AtomicBoolean(false);
for (final DataTreeModification<T> change : changes) {
}
private synchronized void initOperationalTopology() {
- requireNonNull(this.chain, "A valid transaction chain must be provided.");
- final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
- trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, this.topology,
- new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
- .setTopologyTypes(this.topologyTypes)
+ requireNonNull(chain, "A valid transaction chain must be provided.");
+ final WriteTransaction trans = chain.newWriteOnlyTransaction();
+ trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, topology,
+ new TopologyBuilder().withKey(topologyKey).setServerProvided(Boolean.TRUE)
+ .setTopologyTypes(topologyTypes)
.setLink(Map.of()).setNode(Map.of()).build());
trans.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
* Destroy the current operational topology data. Note a valid transaction must be provided.
*/
private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
- requireNonNull(this.chain, "A valid transaction chain must be provided.");
- final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+ requireNonNull(chain, "A valid transaction chain must be provided.");
+ final WriteTransaction trans = chain.newWriteOnlyTransaction();
trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
final FluentFuture<? extends CommitInfo> future = trans.commit();
future.addCallback(new FutureCallback<CommitInfo>() {
*/
private synchronized void initTransactionChain() {
LOG.debug("Initializing transaction chain for topology {}", this);
- Preconditions.checkState(this.chain == null,
+ Preconditions.checkState(chain == null,
"Transaction chain has to be closed before being initialized");
- this.chain = this.dataProvider.createMergingTransactionChain(this);
+ chain = dataProvider.createMergingTransactionChain();
+ chain.addCallback(this);
}
/**
* Destroy the current transaction chain.
*/
private synchronized void destroyTransactionChain() {
- if (this.chain != null) {
+ if (chain != null) {
LOG.debug("Destroy transaction chain for topology {}", this);
// we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
// and the transaction factory cannot be reopen even if we recreate the transaction chain
// LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
// getInstanceIdentifier());
// }
- this.chain = null;
+ chain = null;
}
}
*/
@VisibleForTesting
protected synchronized void resetListener() {
- requireNonNull(this.listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
+ requireNonNull(listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
// unregister current listener to prevent incoming data tree change first
unregisterDataChangeListener();
*/
@VisibleForTesting
protected synchronized boolean restartTransactionChainOnDemand() {
- if (this.listenerScheduledRestartTime > 0) {
+ if (listenerScheduledRestartTime > 0) {
// when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
// otherwise we should only reset the transaction chain
- if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
+ if (System.currentTimeMillis() > listenerScheduledRestartTime) {
// reset the the restart timer
- this.listenerScheduledRestartTime = 0;
- this.listenerScheduledRestartEnforceCounter = 0;
+ listenerScheduledRestartTime = 0;
+ listenerScheduledRestartEnforceCounter = 0;
resetListener();
return true;
}
@VisibleForTesting
protected synchronized void scheduleListenerRestart() {
- if (0 == this.listenerScheduledRestartTime) {
- this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
- } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
- && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
+ if (0 == listenerScheduledRestartTime) {
+ listenerScheduledRestartTime = System.currentTimeMillis() + listenerResetLimitInMillsec;
+ } else if (System.currentTimeMillis() > listenerScheduledRestartTime
+ && ++listenerScheduledRestartEnforceCounter < listenerResetEnforceCounter) {
// if the transaction failure happens again, we will delay the listener restart up to
// #LISTENER_RESET_LIMIT_IN_MILLSEC times
- this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
+ listenerScheduledRestartTime += listenerResetLimitInMillsec;
}
LOG.debug("A listener restart was scheduled at {} (current system time is {})",
- this.listenerScheduledRestartTime, System.currentTimeMillis());
+ listenerScheduledRestartTime, System.currentTimeMillis());
}
@Override
- public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
- final Transaction transaction, final Throwable cause) {
- LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
- transaction != null ? transaction.getIdentifier() : null, cause);
+ public final synchronized void onFailure(final Throwable cause) {
+ LOG.error("Topology builder for {} failed", getInstanceIdentifier(), cause);
scheduleListenerRestart();
restartTransactionChainOnDemand();
}
@Override
- public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
+ public final void onSuccess(final Empty value) {
LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
}
}