package org.opendaylight.bgpcep.pcep.topology.stats.provider;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.eclipse.jdt.annotation.NonNull;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.Transaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class TopologyStatsProviderImpl implements TransactionChainListener,
- TopologySessionStatsRegistry, AutoCloseable {
-
+public final class TopologyStatsProviderImpl extends TimerTask
+ implements TransactionChainListener, TopologySessionStatsRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
+
+ // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
+ // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
+ // deletion. The other weird thing is that this is concurrent set because of removals only -- additions are always
+ // protected by the lock.
+ //
+ // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
+ // tracks also the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
+ // retry in face of failing transactions.
+ private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
@GuardedBy("this")
private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
- private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
- private final DataBroker dataBroker;
- private final int timeout;
+ // Note: null indicates we have been shut down
+ @GuardedBy("this")
+ private DataBroker dataBroker;
+ @GuardedBy("this")
private TransactionChain transactionChain;
- private ScheduledFuture<?> scheduleTask;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final AtomicBoolean closed = new AtomicBoolean(false);
- public TopologyStatsProviderImpl(final @NonNull DataBroker dataBroker, final int timeout) {
+ private TopologyStatsProviderImpl(final DataBroker dataBroker) {
this.dataBroker = requireNonNull(dataBroker);
- this.timeout = timeout;
}
- public synchronized void init() {
+ public static AutoCloseable createStarted(final DataBroker dataBroker, final Timer timer,
+ final int updateIntervalSeconds) {
LOG.info("Initializing TopologyStatsProvider service.");
- this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
- final TimerTask task = new TimerTask() {
- @Override
- public void run() {
- updatePcepStats();
- }
- };
+ final TopologyStatsProviderImpl ret = new TopologyStatsProviderImpl(dataBroker);
+ timer.scheduleAtFixedRate(ret, 0, TimeUnit.SECONDS.toMillis(updateIntervalSeconds));
+ return ret;
+ }
+
+ @Override
+ public void close() throws InterruptedException, ExecutionException {
+ if (cancel()) {
+ LOG.info("Closing TopologyStatsProvider service.");
+ shutdown();
+ } else {
+ LOG.debug("TopologyStatsProvider already shut down");
+ }
+ }
+
+ private synchronized void shutdown() throws InterruptedException, ExecutionException {
+ // Try to get a transaction chain and indicate we are done
+ final TransactionChain chain = accessChain();
+ transactionChain = null;
+ dataBroker = null;
+
+ if (chain == null) {
+ // Belt & suspenders so we do not error out elsewhere
+ LOG.warn("Cannot acquire transaction chain, skipping cleanup");
+ return;
+ }
- this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
+ // Issue deletes for all registered stats
+ final WriteTransaction wTx = chain.newWriteOnlyTransaction();
+ for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
+ wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+ }
+ statsMap.clear();
+
+ // Fire the transaction commit ...
+ final FluentFuture<?> future = wTx.commit();
+ // ... close the transaction chain ...
+ chain.close();
+ // ... and wait for transaction commit to complete
+ LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
+ future.get();
+ }
+
+ @Holding("this")
+ private @Nullable TransactionChain accessChain() {
+ if (transactionChain == null && dataBroker != null) {
+ // Re-create the chain if we have not been shut down
+ transactionChain = dataBroker.createMergingTransactionChain(this);
+ }
+ return transactionChain;
}
+ @Override
@SuppressWarnings("checkstyle:IllegalCatch")
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private synchronized void updatePcepStats() {
- final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+ public synchronized void run() {
+ final TransactionChain chain = accessChain();
+ if (chain == null) {
+ // Already closed, do not bother
+ return;
+ }
+ final WriteTransaction tx = chain.newWriteOnlyTransaction();
try {
- for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
- : this.statsMap.entrySet()) {
- if (this.statsPendingDelete.contains(entry.getKey())) {
- continue;
+ for (Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry : statsMap.entrySet()) {
+ if (!statsPendingDelete.contains(entry.getKey())) {
+ tx.put(LogicalDatastoreType.OPERATIONAL,
+ entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
+ new PcepTopologyNodeStatsAugBuilder()
+ .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build())
+ .build());
}
- final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
- .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
- final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
- entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
- tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
}
- tx.commit().addCallback(new FutureCallback<CommitInfo>() {
- @Override
- public void onSuccess(final CommitInfo result) {
- LOG.debug("Successfully committed Topology stats update");
- }
-
- @Override
- public void onFailure(final Throwable ex) {
- LOG.error("Failed to commit Topology stats update", ex);
- }
- }, MoreExecutors.directExecutor());
- } catch (final Exception e) {
+ } catch (Exception e) {
LOG.warn("Failed to prepare Tx for PCEP stats update", e);
tx.cancel();
+ return;
}
- }
- @Override
- public synchronized void close() throws Exception {
- if (closed.compareAndSet(false, true)) {
- LOG.info("Closing TopologyStatsProvider service.");
- this.scheduleTask.cancel(true);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
- for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
- wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Successfully committed Topology stats update");
}
- wTx.commit().get();
- this.statsMap.clear();
- this.transactionChain.close();
- this.scheduler.shutdown();
- }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Failed to commit Topology stats update", ex);
+ }
+ }, MoreExecutors.directExecutor());
}
@Override
final Transaction transaction, final Throwable cause) {
LOG.error("Transaction chain {} failed for tx {}",
chain, transaction != null ? transaction.getIdentifier() : null, cause);
+ chain.close();
- if (!closed.get()) {
- transactionChain.close();
- transactionChain = dataBroker.createMergingTransactionChain(this);
+ // Do not access the transaction chain again, re-recreated it instead
+ if (chain == transactionChain) {
+ transactionChain = null;
}
}
@Override
public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
final PcepSessionState sessionState) {
- this.statsMap.put(nodeId, sessionState);
+ if (dataBroker != null) {
+ statsMap.put(nodeId, sessionState);
+ } else {
+ LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
+ }
}
@Override
public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
- this.statsMap.remove(nodeId);
- this.statsPendingDelete.add(nodeId);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+ final TransactionChain chain = accessChain();
+ if (chain == null) {
+ // Already closed, do not bother
+ LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
+ return;
+ }
+
+ final PcepSessionState node = statsMap.remove(nodeId);
+ if (node == null) {
+ LOG.debug("Ignoring duplicate unbind of Pcep Node {}", nodeId);
+ return;
+ }
+
+ statsPendingDelete.add(nodeId);
+ final WriteTransaction wTx = chain.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
- TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+ statsPendingDelete.remove(nodeId);
}
@Override
public void onFailure(final Throwable ex) {
LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
- TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+ statsPendingDelete.remove(nodeId);
}
}, MoreExecutors.directExecutor());
}