Re-create transaction chain upon failure in TopologyStatsProvider
[bgpcep.git] / pcep / topology / topology-stats / src / main / java / org / opendaylight / bgpcep / pcep / topology / stats / provider / TopologyStatsProviderImpl.java
index d3ff3eef50c8d5738b765bb256bf1b144e6c7bb5..d0289d3743ca9b4b5179cb5dd3ade35732ded915 100644 (file)
@@ -8,20 +8,22 @@
 package org.opendaylight.bgpcep.pcep.topology.stats.provider;
 
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.eclipse.jdt.annotation.NonNull;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.Transaction;
@@ -36,86 +38,127 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class TopologyStatsProviderImpl implements TransactionChainListener,
-        TopologySessionStatsRegistry, AutoCloseable {
-
+public final class TopologyStatsProviderImpl extends TimerTask
+        implements TransactionChainListener, TopologySessionStatsRegistry, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
+
+    // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
+    // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
+    // deletion. The other weird thing is that this is a concurrent set because of removals only -- additions are
+    // always protected by the lock.
+    //
+    // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
+    //        also tracks the intent alongside the PcepSessionState -- that way we can mark 'we want to remove this'
+    //        and retry in the face of failing transactions.
+    private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
     @GuardedBy("this")
     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
-    private final DataBroker dataBroker;
-    private final int timeout;
+    // Note: null indicates we have been shut down
+    @GuardedBy("this")
+    private DataBroker dataBroker;
+    @GuardedBy("this")
     private TransactionChain transactionChain;
-    private ScheduledFuture<?> scheduleTask;
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public TopologyStatsProviderImpl(final @NonNull DataBroker dataBroker, final int timeout) {
+    private TopologyStatsProviderImpl(final DataBroker dataBroker) {
         this.dataBroker = requireNonNull(dataBroker);
-        this.timeout = timeout;
     }
 
-    public synchronized void init() {
+    public static AutoCloseable createStarted(final DataBroker dataBroker, final Timer timer,
+            final int updateIntervalSeconds) {
         LOG.info("Initializing TopologyStatsProvider service.");
-        this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
-        final TimerTask task = new TimerTask() {
-            @Override
-            public void run() {
-                updatePcepStats();
-            }
-        };
+        final TopologyStatsProviderImpl ret = new TopologyStatsProviderImpl(dataBroker);
+        timer.scheduleAtFixedRate(ret, 0, TimeUnit.SECONDS.toMillis(updateIntervalSeconds));
+        return ret;
+    }
 
-        this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
+    @Override
+    public void close() throws InterruptedException, ExecutionException {
+        if (cancel()) {
+            LOG.info("Closing TopologyStatsProvider service.");
+            shutdown();
+        } else {
+            LOG.debug("TopologyStatsProvider already shut down");
+        }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private synchronized void updatePcepStats() {
-        final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+    private synchronized void shutdown() throws InterruptedException, ExecutionException {
+        // Try to get a transaction chain and indicate we are done
+        final TransactionChain chain = accessChain();
+        transactionChain = null;
+        dataBroker = null;
 
-        try {
-            for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
-                    : this.statsMap.entrySet()) {
-                final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
-                        .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
-                final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
-                        entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
-                tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
-            }
-            tx.commit().addCallback(new FutureCallback<CommitInfo>() {
-                @Override
-                public void onSuccess(final CommitInfo result) {
-                    LOG.debug("Successfully committed Topology stats update");
-                }
+        if (chain == null) {
+            // Belt & suspenders so we do not error out elsewhere
+            LOG.warn("Cannot acquire transaction chain, skipping cleanup");
+            return;
+        }
 
-                @Override
-                public void onFailure(final Throwable ex) {
-                    LOG.error("Failed to commit Topology stats update", ex);
-                }
-            }, MoreExecutors.directExecutor());
-        } catch (final Exception e) {
-            LOG.warn("Failed to prepare Tx for BGP stats update", e);
-            tx.cancel();
+        // Issue deletes for all registered stats
+        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
+        for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
+            wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+        }
+        statsMap.clear();
+
+        // Fire the transaction commit ...
+        final FluentFuture<?> future = wTx.commit();
+        // ... close the transaction chain ...
+        chain.close();
+        // ... and wait for transaction commit to complete
+        LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
+        future.get();
+    }
+
+    @Holding("this")
+    private @Nullable TransactionChain accessChain() {
+        if (transactionChain == null && dataBroker != null) {
+            // Re-create the chain if we have not been shut down
+            transactionChain = dataBroker.createMergingTransactionChain(this);
         }
+        return transactionChain;
     }
 
     @Override
-    public synchronized void close() throws Exception {
-        if (closed.compareAndSet(false, true)) {
-            LOG.info("Closing TopologyStatsProvider service.");
-            this.scheduleTask.cancel(true);
-            final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-            for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
-                wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public synchronized void run() {
+        final TransactionChain chain = accessChain();
+        if (chain == null) {
+            // Already closed, do not bother
+            return;
+        }
+
+        final WriteTransaction tx = chain.newWriteOnlyTransaction();
+        try {
+            for (Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry : statsMap.entrySet()) {
+                if (!statsPendingDelete.contains(entry.getKey())) {
+                    tx.put(LogicalDatastoreType.OPERATIONAL,
+                        entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
+                        new PcepTopologyNodeStatsAugBuilder()
+                            .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build())
+                            .build());
+                }
             }
-            wTx.commit().get();
-            this.statsMap.clear();
-            this.transactionChain.close();
-            this.scheduler.shutdown();
+        } catch (Exception e) {
+            LOG.warn("Failed to prepare Tx for PCEP stats update", e);
+            tx.cancel();
+            return;
         }
+
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.debug("Successfully committed Topology stats update");
+            }
+
+            @Override
+            public void onFailure(final Throwable ex) {
+                LOG.error("Failed to commit Topology stats update", ex);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     @Override
@@ -123,10 +166,11 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
             final Transaction transaction, final Throwable cause) {
         LOG.error("Transaction chain {} failed for tx {}",
                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
+        chain.close();
 
-        if (!closed.get()) {
-            transactionChain.close();
-            transactionChain = dataBroker.createMergingTransactionChain(this);
+        // Do not access the failed transaction chain again; accessChain() re-creates it on next use
+        if (chain == transactionChain) {
+            transactionChain = null;
         }
     }
 
@@ -138,18 +182,43 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
     @Override
     public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
             final PcepSessionState sessionState) {
-        this.statsMap.put(nodeId, sessionState);
+        if (dataBroker != null) {
+            statsMap.put(nodeId, sessionState);
+        } else {
+            LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
+        }
     }
 
     @Override
     public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
-        this.statsMap.remove(nodeId);
-        final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
-        try {
-            wTx.commit().get();
-        } catch (final InterruptedException | ExecutionException e) {
-            LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), e);
+        final TransactionChain chain = accessChain();
+        if (chain == null) {
+            // Already closed, do not bother
+            LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
+            return;
+        }
+
+        final PcepSessionState node = statsMap.remove(nodeId);
+        if (node == null) {
+            LOG.debug("Ignoring duplicate unbind of Pcep Node {}", nodeId);
+            return;
         }
+
+        statsPendingDelete.add(nodeId);
+        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
+        wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
+        wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
+                statsPendingDelete.remove(nodeId);
+            }
+
+            @Override
+            public void onFailure(final Throwable ex) {
+                LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
+                statsPendingDelete.remove(nodeId);
+            }
+        }, MoreExecutors.directExecutor());
     }
 }