Re-create transaction chain upon failure in TopologyStatsProvider 35/96035/1
authorOleksii Mozghovyi <oleksii.mozghovyi@pantheon.tech>
Tue, 27 Apr 2021 17:16:18 +0000 (20:16 +0300)
committerRobert Varga <nite@hq.sk>
Mon, 10 May 2021 16:39:09 +0000 (16:39 +0000)
Transaction failures can occur as a side-effect of various datastore
failure scenarios. In case of TopologyStatsProviderImpl, such a failure
requires manual intervention to restart the component (and thus
re-establish exports).

Rework transaction chain lifecycle, so that we close it down on failures
and establish a new chain when we need it for the periodic updates
and/or unbind operations.

Since we are modifying lifecycle, also make sure we work in terms of a
TimerTask with an externally-provided timer -- we really do not care how
we are scheduled.

JIRA: BGPCEP-920
Change-Id: I6f2be6a5c14ac6e191988c3b7bfd56b78b94abb6
Signed-off-by: Oleksii Mozghovyi <oleksii.mozghovyi@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-stats/pom.xml
pcep/topology/topology-stats/src/main/java/org/opendaylight/bgpcep/pcep/topology/stats/provider/TopologyStatsProviderImpl.java
pcep/topology/topology-stats/src/main/resources/OSGI-INF/blueprint/pcep-stats.xml

index 9869c465de70c6bc1f9ea532be64d379f627f5fe..404db4e819b69bbf9583d57f49e71bc413aa96cb 100644 (file)
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.checkerframework</groupId>
+            <artifactId>checker-qual</artifactId>
+        </dependency>
+
         <!--Test dependencies-->
         <dependency>
             <groupId>org.opendaylight.mdsal</groupId>
index 5546c116d339baabedcac4602ad949f32e0de85b..d0289d3743ca9b4b5179cb5dd3ade35732ded915 100644 (file)
@@ -8,22 +8,22 @@
 package org.opendaylight.bgpcep.pcep.topology.stats.provider;
 
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.eclipse.jdt.annotation.NonNull;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.Transaction;
@@ -38,92 +38,127 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class TopologyStatsProviderImpl implements TransactionChainListener,
-        TopologySessionStatsRegistry, AutoCloseable {
-
+public final class TopologyStatsProviderImpl extends TimerTask
+        implements TransactionChainListener, TopologySessionStatsRegistry, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
+
+    // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
+    // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
+    // deletion. The other weird thing is that this is concurrent set because of removals only -- additions are always
+    // protected by the lock.
+    //
+    // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
+    //        tracks also the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
+    //        retry in face of failing transactions.
+    private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
     @GuardedBy("this")
     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
-    private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
-    private final DataBroker dataBroker;
-    private final int timeout;
+    // Note: null indicates we have been shut down
+    @GuardedBy("this")
+    private DataBroker dataBroker;
+    @GuardedBy("this")
     private TransactionChain transactionChain;
-    private ScheduledFuture<?> scheduleTask;
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public TopologyStatsProviderImpl(final @NonNull DataBroker dataBroker, final int timeout) {
+    private TopologyStatsProviderImpl(final DataBroker dataBroker) {
         this.dataBroker = requireNonNull(dataBroker);
-        this.timeout = timeout;
     }
 
-    public synchronized void init() {
+    public static AutoCloseable createStarted(final DataBroker dataBroker, final Timer timer,
+            final int updateIntervalSeconds) {
         LOG.info("Initializing TopologyStatsProvider service.");
-        this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
-        final TimerTask task = new TimerTask() {
-            @Override
-            public void run() {
-                updatePcepStats();
-            }
-        };
+        final TopologyStatsProviderImpl ret = new TopologyStatsProviderImpl(dataBroker);
+        timer.scheduleAtFixedRate(ret, 0, TimeUnit.SECONDS.toMillis(updateIntervalSeconds));
+        return ret;
+    }
+
+    @Override
+    public void close() throws InterruptedException, ExecutionException {
+        if (cancel()) {
+            LOG.info("Closing TopologyStatsProvider service.");
+            shutdown();
+        } else {
+            LOG.debug("TopologyStatsProvider already shut down");
+        }
+    }
+
+    private synchronized void shutdown() throws InterruptedException, ExecutionException {
+        // Try to get a transaction chain and indicate we are done
+        final TransactionChain chain = accessChain();
+        transactionChain = null;
+        dataBroker = null;
+
+        if (chain == null) {
+            // Belt & suspenders so we do not error out elsewhere
+            LOG.warn("Cannot acquire transaction chain, skipping cleanup");
+            return;
+        }
 
-        this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
+        // Issue deletes for all registered stats
+        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
+        for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
+            wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+        }
+        statsMap.clear();
+
+        // Fire the transaction commit ...
+        final FluentFuture<?> future = wTx.commit();
+        // ... close the transaction chain ...
+        chain.close();
+        // ... and wait for transaction commit to complete
+        LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
+        future.get();
+    }
+
+    @Holding("this")
+    private @Nullable TransactionChain accessChain() {
+        if (transactionChain == null && dataBroker != null) {
+            // Re-create the chain if we have not been shut down
+            transactionChain = dataBroker.createMergingTransactionChain(this);
+        }
+        return transactionChain;
     }
 
+    @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
-    private synchronized void updatePcepStats() {
-        final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+    public synchronized void run() {
+        final TransactionChain chain = accessChain();
+        if (chain == null) {
+            // Already closed, do not bother
+            return;
+        }
 
+        final WriteTransaction tx = chain.newWriteOnlyTransaction();
         try {
-            for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
-                    : this.statsMap.entrySet()) {
-                if (this.statsPendingDelete.contains(entry.getKey())) {
-                    continue;
+            for (Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry : statsMap.entrySet()) {
+                if (!statsPendingDelete.contains(entry.getKey())) {
+                    tx.put(LogicalDatastoreType.OPERATIONAL,
+                        entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
+                        new PcepTopologyNodeStatsAugBuilder()
+                            .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build())
+                            .build());
                 }
-                final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
-                        .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
-                final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
-                        entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
-                tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
             }
-            tx.commit().addCallback(new FutureCallback<CommitInfo>() {
-                @Override
-                public void onSuccess(final CommitInfo result) {
-                    LOG.debug("Successfully committed Topology stats update");
-                }
-
-                @Override
-                public void onFailure(final Throwable ex) {
-                    LOG.error("Failed to commit Topology stats update", ex);
-                }
-            }, MoreExecutors.directExecutor());
-        } catch (final Exception e) {
+        } catch (Exception e) {
             LOG.warn("Failed to prepare Tx for PCEP stats update", e);
             tx.cancel();
+            return;
         }
-    }
 
-    @Override
-    public synchronized void close() throws Exception {
-        if (closed.compareAndSet(false, true)) {
-            LOG.info("Closing TopologyStatsProvider service.");
-            this.scheduleTask.cancel(true);
-            final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-            for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
-                wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.debug("Successfully committed Topology stats update");
             }
-            wTx.commit().get();
-            this.statsMap.clear();
-            this.transactionChain.close();
-            this.scheduler.shutdown();
-        }
+
+            @Override
+            public void onFailure(final Throwable ex) {
+                LOG.error("Failed to commit Topology stats update", ex);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     @Override
@@ -131,10 +166,11 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
             final Transaction transaction, final Throwable cause) {
         LOG.error("Transaction chain {} failed for tx {}",
                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
+        chain.close();
 
-        if (!closed.get()) {
-            transactionChain.close();
-            transactionChain = dataBroker.createMergingTransactionChain(this);
+        // Do not access the transaction chain again, re-recreated it instead
+        if (chain == transactionChain) {
+            transactionChain = null;
         }
     }
 
@@ -146,26 +182,42 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
     @Override
     public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
             final PcepSessionState sessionState) {
-        this.statsMap.put(nodeId, sessionState);
+        if (dataBroker != null) {
+            statsMap.put(nodeId, sessionState);
+        } else {
+            LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
+        }
     }
 
     @Override
     public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
-        this.statsMap.remove(nodeId);
-        this.statsPendingDelete.add(nodeId);
-        final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+        final TransactionChain chain = accessChain();
+        if (chain == null) {
+            // Already closed, do not bother
+            LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
+            return;
+        }
+
+        final PcepSessionState node = statsMap.remove(nodeId);
+        if (node == null) {
+            LOG.debug("Ignoring duplicate unbind of Pcep Node {}", nodeId);
+            return;
+        }
+
+        statsPendingDelete.add(nodeId);
+        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
             public void onSuccess(final CommitInfo result) {
                 LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
-                TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+                statsPendingDelete.remove(nodeId);
             }
 
             @Override
             public void onFailure(final Throwable ex) {
                 LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
-                TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+                statsPendingDelete.remove(nodeId);
             }
         }, MoreExecutors.directExecutor());
     }
index f5ba87e966705aaedaff13c7d8746223f7933ec4..50f1b64e5c21acd92026fcc2f9fb1346bf996d08 100644 (file)
     </bean>
     <odl:rpc-implementation ref="topologyStatsRpcService"/>
 
+    <bean id="timer" class="java.util.Timer" destroy-method="cancel">
+        <argument value="pcep-topology-stats-timer"/>
+        <argument value="true"/>
+    </bean>
+
     <bean id="topologyStatsRegistry"
           class="org.opendaylight.bgpcep.pcep.topology.stats.provider.TopologyStatsProviderImpl"
-          init-method="init" destroy-method="close">
+          factory-method="createStarted" destroy-method="close">
         <argument ref="dataBroker"/>
+        <argument ref="timer"/>
         <argument>
             <bean factory-ref="pcepStatsConfig" factory-method="getTimer"/>
         </argument>