Eliminate network-pcep-topology-config
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / TopologyStatsProviderImpl.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.Set;
19 import java.util.TimerTask;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.checkerframework.checker.lock.qual.Holding;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.Transaction;
31 import org.opendaylight.mdsal.binding.api.TransactionChain;
32 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
42 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public final class TopologyStatsProviderImpl implements TopologySessionStatsRegistry, TransactionChainListener,
47         AutoCloseable {
48     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
49
50     // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
51     // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
52     // deletion. The other weird thing is that this is concurrent set because of removals only -- additions are always
53     // protected by the lock.
54     //
55     // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
56     //        tracks also the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
57     //        retry in face of failing transactions.
58     private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
59     @GuardedBy("this")
60     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> statsMap = new HashMap<>();
61     // Note: null indicates we have been shut down
62     @GuardedBy("this")
63     private DataBroker dataBroker;
64     @GuardedBy("this")
65     private TransactionChain transactionChain;
66     @GuardedBy("this")
67     private final ScheduledFuture<?> scheduleTask;
68
69     public TopologyStatsProviderImpl(final DataBroker dataBroker, final int updateIntervalSeconds) {
70         this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
71     }
72
73     public TopologyStatsProviderImpl(final DataBroker dataBroker, final int updateIntervalSeconds,
74             final ScheduledExecutorService scheduler) {
75         this.dataBroker = requireNonNull(dataBroker);
76         LOG.info("Initializing TopologyStatsProvider service.");
77         final TimerTask task = new TimerTask() {
78             @Override
79             public void run() {
80                 synchronized (TopologyStatsProviderImpl.this) {
81                     updateStats();
82                 }
83             }
84         };
85         scheduleTask = scheduler.scheduleAtFixedRate(task, 0, updateIntervalSeconds, TimeUnit.SECONDS);
86     }
87
88     @Override
89     public void close() throws InterruptedException, ExecutionException {
90         if (scheduleTask.cancel(true)) {
91             LOG.info("Closing TopologyStatsProvider service.");
92             shutdown();
93         } else {
94             LOG.debug("TopologyStatsProvider already shut down");
95         }
96     }
97
98     private synchronized void shutdown() throws InterruptedException, ExecutionException {
99         // Try to get a transaction chain and indicate we are done
100         final TransactionChain chain = accessChain();
101         transactionChain = null;
102         dataBroker = null;
103
104         if (chain == null) {
105             // Belt & suspenders so we do not error out elsewhere
106             LOG.warn("Cannot acquire transaction chain, skipping cleanup");
107             return;
108         }
109
110         // Issue deletes for all registered stats
111         final WriteTransaction wTx = chain.newWriteOnlyTransaction();
112         for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
113             wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
114         }
115         statsMap.clear();
116
117         // Fire the transaction commit ...
118         final FluentFuture<?> future = wTx.commit();
119         // ... close the transaction chain ...
120         chain.close();
121         // ... and wait for transaction commit to complete
122         LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
123         future.get();
124     }
125
126     @Holding("this")
127     private @Nullable TransactionChain accessChain() {
128         if (transactionChain == null && dataBroker != null) {
129             // Re-create the chain if we have not been shut down
130             transactionChain = dataBroker.createMergingTransactionChain(this);
131         }
132         return transactionChain;
133     }
134
135     @SuppressWarnings("checkstyle:IllegalCatch")
136     public synchronized void updateStats() {
137         final TransactionChain chain = accessChain();
138         if (chain == null) {
139             // Already closed, do not bother
140             return;
141         }
142
143         final WriteTransaction tx = chain.newWriteOnlyTransaction();
144         try {
145             for (Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry : statsMap.entrySet()) {
146                 if (!statsPendingDelete.contains(entry.getKey())) {
147                     tx.put(LogicalDatastoreType.OPERATIONAL,
148                             entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
149                             new PcepTopologyNodeStatsAugBuilder()
150                                     .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build())
151                                     .build());
152                 }
153             }
154         } catch (Exception e) {
155             LOG.warn("Failed to prepare Tx for PCEP stats update", e);
156             tx.cancel();
157             return;
158         }
159
160         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
161             @Override
162             public void onSuccess(final CommitInfo result) {
163                 LOG.debug("Successfully committed Topology stats update");
164             }
165
166             @Override
167             public void onFailure(final Throwable ex) {
168                 LOG.error("Failed to commit Topology stats update", ex);
169             }
170         }, MoreExecutors.directExecutor());
171     }
172
173     @Override
174     public synchronized void onTransactionChainFailed(final TransactionChain chain,
175             final Transaction transaction, final Throwable cause) {
176         LOG.error("Transaction chain {} failed for tx {}",
177                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
178         chain.close();
179
180         // Do not access the transaction chain again, re-recreated it instead
181         if (chain == transactionChain) {
182             transactionChain = null;
183         }
184     }
185
186     @Override
187     public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
188         LOG.debug("Transaction chain {} successful.", chain);
189     }
190
191     @Override
192     public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
193             final PcepSessionState sessionState) {
194         if (dataBroker != null) {
195             statsMap.put(nodeId, sessionState);
196         } else {
197             LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
198         }
199     }
200
201     @Override
202     public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
203         final TransactionChain chain = accessChain();
204         if (chain == null) {
205             // Already closed, do not bother
206             LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
207             return;
208         }
209
210         final PcepSessionState node = statsMap.remove(nodeId);
211         if (node == null) {
212             LOG.debug("Ignoring duplicate unbind of Pcep Node {}", nodeId);
213             return;
214         }
215
216         statsPendingDelete.add(nodeId);
217         final WriteTransaction wTx = chain.newWriteOnlyTransaction();
218         wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
219         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
220             @Override
221             public void onSuccess(final CommitInfo result) {
222                 LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
223                 statsPendingDelete.remove(nodeId);
224             }
225
226             @Override
227             public void onFailure(final Throwable ex) {
228                 LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
229                 statsPendingDelete.remove(nodeId);
230             }
231         }, MoreExecutors.directExecutor());
232     }
233 }