Integrate TopologyStatsProviderImpl
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / TopologyStatsProvider.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.TimerTask;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.Transaction;
31 import org.opendaylight.mdsal.binding.api.TransactionChain;
32 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
42 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
43 import org.opendaylight.yangtools.concepts.NoOpObjectRegistration;
44 import org.opendaylight.yangtools.concepts.ObjectRegistration;
45 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
50     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
51
52     // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
53     // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
54     // deletion. The other weird thing is that this is concurrent set because of removals only -- additions are always
55     // protected by the lock.
56     //
57     // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
58     //        tracks also the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
59     //        retry in face of failing transactions.
60     private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
61     @GuardedBy("this")
62     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, Reg<?>> statsMap = new HashMap<>();
63     // Note: null indicates we have been shut down
64     @GuardedBy("this")
65     private DataBroker dataBroker;
66     @GuardedBy("this")
67     private TransactionChain transactionChain;
68     @GuardedBy("this")
69     private final ScheduledFuture<?> scheduleTask;
70
71     TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds) {
72         this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
73     }
74
75     TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds,
76             final ScheduledExecutorService scheduler) {
77         this.dataBroker = requireNonNull(dataBroker);
78         LOG.info("Initializing TopologyStatsProvider service.");
79         scheduleTask = scheduler.scheduleAtFixedRate(new TimerTask() {
80             @Override
81             public void run() {
82                 updateStats();
83             }
84         }, 0, updateIntervalSeconds, TimeUnit.SECONDS);
85     }
86
87     // FIXME: there should be no further tasks, hence this should not be needed
88     // FIXME: if it ends up being needed, it needs to be asynchronous
89     void shutdown() throws InterruptedException, ExecutionException {
90         if (scheduleTask.cancel(true)) {
91             LOG.info("Closing TopologyStatsProvider service.");
92             lockedShutdown();
93         } else {
94             LOG.debug("TopologyStatsProvider already shut down");
95         }
96     }
97
98     private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
99         // Try to get a transaction chain and indicate we are done
100         final TransactionChain chain = accessChain();
101         transactionChain = null;
102         dataBroker = null;
103
104         if (chain == null) {
105             // Belt & suspenders so we do not error out elsewhere
106             LOG.warn("Cannot acquire transaction chain, skipping cleanup");
107             return;
108         }
109
110         // Issue deletes for all registered stats
111         final WriteTransaction wTx = chain.newWriteOnlyTransaction();
112         for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
113             wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
114         }
115         statsMap.clear();
116
117         // Fire the transaction commit ...
118         final FluentFuture<?> future = wTx.commit();
119         // ... close the transaction chain ...
120         chain.close();
121         // ... and wait for transaction commit to complete
122         LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
123         future.get();
124     }
125
126     @Holding("this")
127     private @Nullable TransactionChain accessChain() {
128         if (transactionChain == null && dataBroker != null) {
129             // Re-create the chain if we have not been shut down
130             transactionChain = dataBroker.createMergingTransactionChain(this);
131         }
132         return transactionChain;
133     }
134
135     @SuppressWarnings("checkstyle:IllegalCatch")
136     private synchronized void updateStats() {
137         final TransactionChain chain = accessChain();
138         if (chain == null) {
139             // Already closed, do not bother
140             return;
141         }
142
143         final WriteTransaction tx = chain.newWriteOnlyTransaction();
144         try {
145             for (var entry : statsMap.entrySet()) {
146                 if (!statsPendingDelete.contains(entry.getKey())) {
147                     final var reg = entry.getValue();
148                     if (reg.notClosed()) {
149                         tx.put(LogicalDatastoreType.OPERATIONAL,
150                             entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
151                             new PcepTopologyNodeStatsAugBuilder()
152                                 .setPcepSessionState(new PcepSessionStateBuilder(reg.getInstance()).build())
153                                 .build());
154                     }
155                 }
156             }
157         } catch (Exception e) {
158             LOG.warn("Failed to prepare Tx for PCEP stats update", e);
159             tx.cancel();
160             return;
161         }
162
163         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
164             @Override
165             public void onSuccess(final CommitInfo result) {
166                 LOG.debug("Successfully committed Topology stats update");
167             }
168
169             @Override
170             public void onFailure(final Throwable ex) {
171                 LOG.error("Failed to commit Topology stats update", ex);
172             }
173         }, MoreExecutors.directExecutor());
174     }
175
176     @Override
177     public synchronized void onTransactionChainFailed(final TransactionChain chain,
178             final Transaction transaction, final Throwable cause) {
179         LOG.error("Transaction chain {} failed for tx {}",
180                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
181         chain.close();
182
183         // Do not access the transaction chain again, re-recreated it instead
184         if (chain == transactionChain) {
185             transactionChain = null;
186         }
187     }
188
189     @Override
190     public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
191         LOG.debug("Transaction chain {} successful.", chain);
192     }
193
194     @Override
195     public synchronized <T extends PcepSessionState> ObjectRegistration<T> bind(
196             final KeyedInstanceIdentifier<Node, NodeKey> nodeId, final T sessionState) {
197         if (dataBroker == null) {
198             LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
199             return NoOpObjectRegistration.of(sessionState);
200         }
201
202         final var ret = new Reg<>(sessionState, nodeId);
203         // FIXME: a replace should never happen, and hence regs are just a Set (which can be concurrent and this method
204         //        does not need synchronization
205         statsMap.put(nodeId, ret);
206         return ret;
207     }
208
209     private synchronized void removeRegistration(final @NonNull Reg<?> reg) {
210         final var nodeId = reg.nodeId;
211
212         if (!statsMap.remove(nodeId, reg)) {
213             // Already replaced by a subsequent bind()
214             LOG.debug("Ignoring overridden unbind of Pcep Node {}", nodeId);
215             return;
216         }
217
218         final TransactionChain chain = accessChain();
219         if (chain == null) {
220             // Already closed, do not bother
221             LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
222             return;
223         }
224
225         statsPendingDelete.add(nodeId);
226         final WriteTransaction wTx = chain.newWriteOnlyTransaction();
227         wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
228         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
229             @Override
230             public void onSuccess(final CommitInfo result) {
231                 LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
232                 statsPendingDelete.remove(nodeId);
233             }
234
235             @Override
236             public void onFailure(final Throwable ex) {
237                 LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
238                 statsPendingDelete.remove(nodeId);
239             }
240         }, MoreExecutors.directExecutor());
241     }
242
243     private final class Reg<T extends PcepSessionState> extends AbstractObjectRegistration<T> {
244         private final @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodeId;
245
246         Reg(final @NonNull T instance, final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
247             super(instance);
248             this.nodeId = requireNonNull(nodeId);
249         }
250
251         @Override
252         protected void removeRegistration() {
253             TopologyStatsProvider.this.removeRegistration(this);
254         }
255     }
256 }