Prevent deadlock when updating PCEP stats when Tx chain fails
[bgpcep.git] / pcep / topology / topology-stats / src / main / java / org / opendaylight / bgpcep / pcep / topology / stats / provider / TopologyStatsProviderImpl.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.stats.provider;
9
10 import static java.util.Objects.requireNonNull;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.TimerTask;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.Transaction;
30 import org.opendaylight.mdsal.binding.api.TransactionChain;
31 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
32 import org.opendaylight.mdsal.binding.api.WriteTransaction;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public final class TopologyStatsProviderImpl implements TransactionChainListener,
47         TopologySessionStatsRegistry, AutoCloseable {
48
49     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
50     @GuardedBy("this")
51     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
52     private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
53     private final DataBroker dataBroker;
54     private final int timeout;
55     private TransactionChain transactionChain;
56     private ScheduledFuture<?> scheduleTask;
57     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
58     private final AtomicBoolean closed = new AtomicBoolean(false);
59
60     public TopologyStatsProviderImpl(final @NonNull DataBroker dataBroker, final int timeout) {
61         this.dataBroker = requireNonNull(dataBroker);
62         this.timeout = timeout;
63     }
64
65     public synchronized void init() {
66         LOG.info("Initializing TopologyStatsProvider service.");
67         this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
68         final TimerTask task = new TimerTask() {
69             @Override
70             public void run() {
71                 updatePcepStats();
72             }
73         };
74
75         this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
76     }
77
78     @SuppressWarnings("checkstyle:IllegalCatch")
79     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
80             justification = "https://github.com/spotbugs/spotbugs/issues/811")
81     private synchronized void updatePcepStats() {
82         final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
83
84         try {
85             for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
86                     : this.statsMap.entrySet()) {
87                 if (this.statsPendingDelete.contains(entry.getKey())) {
88                     continue;
89                 }
90                 final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
91                         .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
92                 final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
93                         entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
94                 tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
95             }
96             tx.commit().addCallback(new FutureCallback<CommitInfo>() {
97                 @Override
98                 public void onSuccess(final CommitInfo result) {
99                     LOG.debug("Successfully committed Topology stats update");
100                 }
101
102                 @Override
103                 public void onFailure(final Throwable ex) {
104                     LOG.error("Failed to commit Topology stats update", ex);
105                 }
106             }, MoreExecutors.directExecutor());
107         } catch (final Exception e) {
108             LOG.warn("Failed to prepare Tx for PCEP stats update", e);
109             tx.cancel();
110         }
111     }
112
113     @Override
114     public synchronized void close() throws Exception {
115         if (closed.compareAndSet(false, true)) {
116             LOG.info("Closing TopologyStatsProvider service.");
117             this.scheduleTask.cancel(true);
118             final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
119             for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
120                 wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
121             }
122             wTx.commit().get();
123             this.statsMap.clear();
124             this.transactionChain.close();
125             this.scheduler.shutdown();
126         }
127     }
128
129     @Override
130     public synchronized void onTransactionChainFailed(final TransactionChain chain,
131             final Transaction transaction, final Throwable cause) {
132         LOG.error("Transaction chain {} failed for tx {}",
133                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
134
135         if (!closed.get()) {
136             transactionChain.close();
137             transactionChain = dataBroker.createMergingTransactionChain(this);
138         }
139     }
140
141     @Override
142     public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
143         LOG.debug("Transaction chain {} successful.", chain);
144     }
145
146     @Override
147     public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
148             final PcepSessionState sessionState) {
149         this.statsMap.put(nodeId, sessionState);
150     }
151
152     @Override
153     public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
154         this.statsMap.remove(nodeId);
155         this.statsPendingDelete.add(nodeId);
156         final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
157         wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
158         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
159             @Override
160             public void onSuccess(final CommitInfo result) {
161                 LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
162                 TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
163             }
164
165             @Override
166             public void onFailure(final Throwable ex) {
167                 LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
168                 TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
169             }
170         }, MoreExecutors.directExecutor());
171     }
172 }