Register PCEP session to stats handler only after it is fully initialized
[bgpcep.git] / pcep / topology / topology-stats / src / main / java / org / opendaylight / bgpcep / pcep / topology / stats / provider / TopologyStatsProviderImpl.java
index 05791767492c09d47edc98c9e04b47f060cfff4b..6f87680839e61959ed492224b0c8f5e8be047526 100644 (file)
@@ -5,12 +5,14 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.bgpcep.pcep.topology.stats.provider;
 
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimerTask;
@@ -18,20 +20,21 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev171113.PcepTopologyNodeStatsAug;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev171113.PcepTopologyNodeStatsAugBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -47,18 +50,19 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
     private final DataBroker dataBroker;
     private final int timeout;
-    private BindingTransactionChain transactionChain;
+    private TransactionChain transactionChain;
     private ScheduledFuture<?> scheduleTask;
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public TopologyStatsProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout) {
+    public TopologyStatsProviderImpl(final @NonNull DataBroker dataBroker, final int timeout) {
         this.dataBroker = requireNonNull(dataBroker);
         this.timeout = timeout;
     }
 
     public synchronized void init() {
-        LOG.info("Initializing TopologyStatsProvider service.", this);
-        this.transactionChain = this.dataBroker.createTransactionChain(this);
+        LOG.info("Initializing TopologyStatsProvider service.");
+        this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
         final TimerTask task = new TimerTask() {
             @Override
             public void run() {
@@ -69,42 +73,68 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
         this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private synchronized void updatePcepStats() {
         final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
 
-        for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
-                : this.statsMap.entrySet()) {
-            final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
-                    .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
-            final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
-                    entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
-            tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
+        try {
+            for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
+                    : this.statsMap.entrySet()) {
+                final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
+                        .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
+                final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
+                        entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
+                tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
+            }
+            tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+                @Override
+                public void onSuccess(final CommitInfo result) {
+                    LOG.debug("Successfully committed Topology stats update");
+                }
+
+                @Override
+                public void onFailure(final Throwable ex) {
+                    LOG.error("Failed to commit Topology stats update", ex);
+                }
+            }, MoreExecutors.directExecutor());
+        } catch (final Exception e) {
+            LOG.warn("Failed to prepare Tx for PCEP stats update", e);
+            tx.cancel();
         }
-        tx.submit();
     }
 
     @Override
     public synchronized void close() throws Exception {
-        LOG.info("Closing TopologyStatsProvider service.", this);
-        this.scheduleTask.cancel(true);
-        final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-        for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
-            wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+        if (closed.compareAndSet(false, true)) {
+            LOG.info("Closing TopologyStatsProvider service.");
+            this.scheduleTask.cancel(true);
+            final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+            for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
+                wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+            }
+            wTx.commit().get();
+            this.statsMap.clear();
+            this.transactionChain.close();
+            this.scheduler.shutdown();
         }
-        wTx.submit().get();
-        this.statsMap.clear();
-        this.transactionChain.close();
-        this.scheduler.shutdown();
     }
 
     @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
-            final Throwable cause) {
-        LOG.error("Transaction chain failed {}.", transaction != null ? transaction.getIdentifier() : null, cause);
+    public synchronized void onTransactionChainFailed(final TransactionChain chain,
+            final Transaction transaction, final Throwable cause) {
+        LOG.error("Transaction chain {} failed for tx {}",
+                chain, transaction != null ? transaction.getIdentifier() : null, cause);
+
+        if (!closed.get()) {
+            transactionChain.close();
+            transactionChain = dataBroker.createMergingTransactionChain(this);
+        }
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+    public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
         LOG.debug("Transaction chain {} successful.", chain);
     }
 
@@ -120,9 +150,9 @@ public final class TopologyStatsProviderImpl implements TransactionChainListener
         final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
         try {
-            wTx.submit().get();
+            wTx.commit().get();
         } catch (final InterruptedException | ExecutionException e) {
-            LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId());
+            LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), e);
         }
     }
 }