2 * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.pcep.topology.provider;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.HashMap;
18 import java.util.TimerTask;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.Transaction;
31 import org.opendaylight.mdsal.binding.api.TransactionChain;
32 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
42 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
43 import org.opendaylight.yangtools.concepts.NoOpObjectRegistration;
44 import org.opendaylight.yangtools.concepts.ObjectRegistration;
45 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
/**
 * {@link SessionStateRegistry} which keeps track of bound PCEP session states and publishes them into the
 * OPERATIONAL datastore through a {@link TransactionChain}. A periodic task is scheduled at construction time and
 * this class listens on its own transaction chain so that a failed chain can be dropped and re-created.
 *
 * <p>Mutable state is accessed from {@code synchronized} methods; {@code statsPendingDelete} is additionally touched
 * from commit callbacks and is therefore a concurrent set.
 */
49 final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
50 private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
52 // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
53 // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
54 // deletion. The other weird thing is that this is a concurrent set because of removals only -- additions are always
55 // protected by the lock.
57 // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
58 // also tracks the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
59 // retry in the face of failing transactions.
60 private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
// Live registrations keyed by node identifier: filled by bind(), pruned by removeRegistration(Reg), iterated by
// updateStats() and lockedShutdown(). Plain HashMap -- all of those methods are synchronized on this instance.
62 private final Map<KeyedInstanceIdentifier<Node, NodeKey>, Reg<?>> statsMap = new HashMap<>();
63 // Note: null indicates we have been shut down
65 private DataBroker dataBroker;
// Current chain, or null when none exists. accessChain() creates it on demand; it is reset to null by
// lockedShutdown() and by onTransactionChainFailed() when the failed chain is the current one.
67 private TransactionChain transactionChain;
// Handle of the periodic task scheduled by the constructor; shutdown() cancels it.
69 private final ScheduledFuture<?> scheduleTask;
/**
 * Convenience constructor which delegates to the three-argument constructor with a newly created scheduled thread
 * pool of core size 1.
 *
 * <p>NOTE(review): no reference to the executor created here is retained in any field visible in this class, and
 * shutdown() only cancels the scheduled task -- confirm whether the pool's worker thread is ever terminated.
 *
 * @param dataBroker data broker used to create transaction chains
 * @param updateIntervalSeconds period of the scheduled task, in seconds
 */
71 TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds) {
72 this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
/**
 * Stores the data broker and schedules a fixed-rate task on the supplied scheduler.
 *
 * @param dataBroker data broker used to create transaction chains, must not be null
 * @param updateIntervalSeconds period of the scheduled task, in seconds
 * @param scheduler executor on which the periodic task is scheduled
 * @throws NullPointerException if {@code dataBroker} is null
 */
75 TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds,
76 final ScheduledExecutorService scheduler) {
77 this.dataBroker = requireNonNull(dataBroker);
78 LOG.info("Initializing TopologyStatsProvider service.");
// First run is immediate (initial delay 0), subsequent runs every updateIntervalSeconds seconds.
// NOTE(review): the body of the TimerTask is not visible here; presumably it invokes updateStats() -- confirm.
79 scheduleTask = scheduler.scheduleAtFixedRate(new TimerTask() {
84 }, 0, updateIntervalSeconds, TimeUnit.SECONDS);
/**
 * Stops the periodic task. The outcome of {@code cancel(true)} decides between the "closing" and the
 * "already shut down" log message.
 *
 * <p>NOTE(review): the statements that follow the "closing" message are not visible here; given the declared
 * exceptions this presumably continues into lockedShutdown() -- confirm.
 *
 * @throws InterruptedException declared by this method; the throwing statement is not visible here
 * @throws ExecutionException declared by this method; the throwing statement is not visible here
 */
87 // FIXME: there should be no further tasks, hence this should not be needed
88 // FIXME: if it ends up being needed, it needs to be asynchronous
89 void shutdown() throws InterruptedException, ExecutionException {
// cancel(true) requests interruption of a run that is currently in progress
90 if (scheduleTask.cancel(true)) {
91 LOG.info("Closing TopologyStatsProvider service.");
94 LOG.debug("TopologyStatsProvider already shut down");
/**
 * Cleanup performed while holding this object's monitor: obtains a transaction chain, forgets it, and issues a
 * single write-only transaction deleting every path currently registered in {@code statsMap}.
 *
 * @throws InterruptedException declared by this method; presumably from waiting on the commit future
 * @throws ExecutionException declared by this method; presumably from waiting on the commit future
 */
98 private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
99 // Try to get a transaction chain and indicate we are done
100 final TransactionChain chain = accessChain();
// The field is cleared right away; the local 'chain' is what the rest of this method operates on.
101 transactionChain = null;
105 // Belt & suspenders so we do not error out elsewhere
106 LOG.warn("Cannot acquire transaction chain, skipping cleanup");
110 // Issue deletes for all registered stats
111 final WriteTransaction wTx = chain.newWriteOnlyTransaction();
// NOTE(review): the delete targets the registered node path itself, whereas updateStats() writes only the
// PcepTopologyNodeStatsAug augmentation underneath it -- confirm that removing the whole node is intended.
112 for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
113 wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
117 // Fire the transaction commit ...
118 final FluentFuture<?> future = wTx.commit();
119 // ... close the transaction chain ...
121 // ... and wait for transaction commit to complete
122 LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
/**
 * Returns the transaction chain to use, creating one on demand.
 *
 * <p>A new merging chain, with this object as its listener, is created only when there is no current chain and
 * {@code dataBroker} is still set. This method is not synchronized itself; every caller visible in this class is
 * a {@code synchronized} method.
 *
 * @return the current (possibly just created) chain, or {@code null} if there is no chain and no data broker
 */
127 private @Nullable TransactionChain accessChain() {
128 if (transactionChain == null && dataBroker != null) {
129 // Re-create the chain if we have not been shut down
130 transactionChain = dataBroker.createMergingTransactionChain(this);
132 return transactionChain;
/**
 * Writes the current session state of every eligible registration into the OPERATIONAL datastore using one
 * write-only transaction, then commits it and logs the outcome.
 *
 * <p>A registration is written only if its key is not in {@code statsPendingDelete} and the registration reports
 * {@code notClosed()}.
 */
135 @SuppressWarnings("checkstyle:IllegalCatch")
136 private synchronized void updateStats() {
137 final TransactionChain chain = accessChain();
139 // Already closed, do not bother
143 final WriteTransaction tx = chain.newWriteOnlyTransaction();
145 for (var entry : statsMap.entrySet()) {
// Skip nodes whose delete transaction is still in flight
146 if (!statsPendingDelete.contains(entry.getKey())) {
147 final var reg = entry.getValue();
148 if (reg.notClosed()) {
// The target path is the PcepTopologyNodeStatsAug augmentation of the registered node; the value is a
// PcepSessionState built via PcepSessionStateBuilder from the registered instance at this moment.
149 tx.put(LogicalDatastoreType.OPERATIONAL,
150 entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
151 new PcepTopologyNodeStatsAugBuilder()
152 .setPcepSessionState(new PcepSessionStateBuilder(reg.getInstance()).build())
// NOTE(review): the matching 'try' is not visible here, so the exact statements guarded by this broad catch
// cannot be determined from this view; what is caught is logged as a warning.
157 } catch (Exception e) {
158 LOG.warn("Failed to prepare Tx for PCEP stats update", e);
// The callbacks only log; directExecutor() runs them on whichever thread completes the commit future.
163 tx.commit().addCallback(new FutureCallback<CommitInfo>() {
165 public void onSuccess(final CommitInfo result) {
166 LOG.debug("Successfully committed Topology stats update");
170 public void onFailure(final Throwable ex) {
171 LOG.error("Failed to commit Topology stats update", ex);
173 }, MoreExecutors.directExecutor());
/**
 * Transaction chain failure callback: logs the failure and, if the failed chain is the one currently held,
 * forgets it so that the next accessChain() call creates a fresh chain.
 *
 * @param chain the chain which failed
 * @param transaction the transaction which caused the failure; may be null, in which case null is logged
 * @param cause the failure cause, logged with the message
 */
177 public synchronized void onTransactionChainFailed(final TransactionChain chain,
178 final Transaction transaction, final Throwable cause) {
179 LOG.error("Transaction chain {} failed for tx {}",
180 chain, transaction != null ? transaction.getIdentifier() : null, cause);
183 // Do not access the transaction chain again, re-create it instead
// Identity comparison: a failure reported for a chain other than the current one leaves the field untouched.
184 if (chain == transactionChain) {
185 transactionChain = null;
/**
 * Transaction chain success callback; the visible body only logs the chain at debug level.
 *
 * @param chain the chain reported as successful
 */
190 public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
191 LOG.debug("Transaction chain {} successful.", chain);
/**
 * Registers a session state object under the given node identifier.
 *
 * <p>When {@code dataBroker} is null (the shut-down marker) the request is ignored and a no-op registration
 * wrapping {@code sessionState} is returned. Otherwise a new {@code Reg} is stored in {@code statsMap}, replacing
 * any registration previously stored under the same identifier.
 *
 * @param nodeId identifier of the node the statistics belong to
 * @param sessionState state object to register
 * @return a registration for {@code sessionState}
 */
195 public synchronized <T extends PcepSessionState> ObjectRegistration<T> bind(
196 final KeyedInstanceIdentifier<Node, NodeKey> nodeId, final T sessionState) {
197 if (dataBroker == null) {
198 LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
199 return NoOpObjectRegistration.of(sessionState);
202 final var ret = new Reg<>(sessionState, nodeId);
203 // FIXME: a replace should never happen, and hence regs are just a Set (which can be concurrent and this method
204 // does not need synchronization)
205 statsMap.put(nodeId, ret);
/**
 * Removes a registration from {@code statsMap} and submits a transaction deleting its node path from the
 * OPERATIONAL datastore.
 *
 * <p>The node identifier is placed into {@code statsPendingDelete} before the delete is submitted and is taken
 * out again when the commit completes, whether it succeeded or failed; a failed delete is logged but not retried.
 *
 * @param reg the registration being closed
 */
209 private synchronized void removeRegistration(final @NonNull Reg<?> reg) {
210 final var nodeId = reg.nodeId;
// Two-argument remove: the entry is dropped only if nodeId is still mapped to this very registration
212 if (!statsMap.remove(nodeId, reg)) {
213 // Already replaced by a subsequent bind()
214 LOG.debug("Ignoring overridden unbind of Pcep Node {}", nodeId);
218 final TransactionChain chain = accessChain();
220 // Already closed, do not bother
221 LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
// Mark the node first so that updateStats() skips it while the delete is in flight
225 statsPendingDelete.add(nodeId);
226 final WriteTransaction wTx = chain.newWriteOnlyTransaction();
// NOTE(review): this deletes the node path itself, while updateStats() writes only the PcepTopologyNodeStatsAug
// augmentation below it -- confirm that removing the whole node is intended.
227 wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
// Callbacks run via directExecutor(), i.e. on the thread completing the commit future, without this object's
// monitor -- which is why statsPendingDelete is a concurrent set.
228 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
230 public void onSuccess(final CommitInfo result) {
231 LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
232 statsPendingDelete.remove(nodeId);
236 public void onFailure(final Throwable ex) {
237 LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
238 statsPendingDelete.remove(nodeId);
240 }, MoreExecutors.directExecutor());
/**
 * Registration handed out by bind(): pairs the registered session state with the node identifier it was bound
 * under. It is a non-static inner class because closing it calls back into the enclosing provider.
 *
 * @param <T> type of the registered session state
 */
243 private final class Reg<T extends PcepSessionState> extends AbstractObjectRegistration<T> {
// Node path this registration was bound under; read directly by the enclosing class's removeRegistration(Reg)
244 private final @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodeId;
// NOTE(review): how 'instance' is passed on to the superclass is not visible in this view.
246 Reg(final @NonNull T instance, final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
248 this.nodeId = requireNonNull(nodeId);
// Delegates removal of this registration to the enclosing provider
252 protected void removeRegistration() {
253 TopologyStatsProvider.this.removeRegistration(this);