Relocate ted-name
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / TopologyStatsProvider.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Stopwatch;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.ThreadFactoryBuilder;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import io.netty.util.TimerTask;
20 import java.lang.invoke.MethodHandles;
21 import java.lang.invoke.VarHandle;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
31 import org.opendaylight.yangtools.concepts.NoOpObjectRegistration;
32 import org.opendaylight.yangtools.concepts.ObjectRegistration;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 final class TopologyStatsProvider implements SessionStateRegistry {
37     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
38
39     private final Set<Task> tasks = ConcurrentHashMap.newKeySet();
40     private final ExecutorService executor;
41     private final Timer timer;
42
43     TopologyStatsProvider(final Timer timer) {
44         this.timer = requireNonNull(timer);
45         executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
46             .setDaemon(true)
47             .setNameFormat("odl-pcep-stats-%d")
48             .build());
49
50         LOG.info("TopologyStatsProvider started");
51     }
52
53     void shutdown() {
54         if (executor.isShutdown()) {
55             LOG.debug("TopologyStatsProvider already shut down");
56             return;
57         }
58
59         LOG.info("Closing TopologyStatsProvider service.");
60         final var toRun = executor.shutdownNow();
61         while (!tasks.isEmpty()) {
62             tasks.forEach(Task::close);
63         }
64         toRun.forEach(Runnable::run);
65     }
66
67     @Override
68     public ObjectRegistration<SessionStateUpdater> bind(final SessionStateUpdater sessionState) {
69         if (executor.isShutdown()) {
70             LOG.debug("Ignoring bind of Pcep Node {}", sessionState);
71             return NoOpObjectRegistration.of(sessionState);
72         }
73
74         final var task = new Task(sessionState);
75         tasks.add(task);
76         return task;
77     }
78
79     private final class Task extends AbstractObjectRegistration<SessionStateUpdater> implements TimerTask {
80         private static final VarHandle STATE;
81
82         static {
83             try {
84                 STATE = MethodHandles.lookup().findVarHandle(Task.class, "state", Object.class);
85             } catch (NoSuchFieldException | IllegalAccessException e) {
86                 throw new ExceptionInInitializerError(e);
87             }
88         }
89
90         private volatile Object state;
91
92         Task(final @NonNull SessionStateUpdater instance) {
93             super(instance);
94
95             final long updateInterval = instance.updateInterval();
96             if (updateInterval > 0) {
97                 state = timer.newTimeout(this, updateInterval, TimeUnit.NANOSECONDS);
98             } else {
99                 LOG.debug("Task {} has non-positive interval {}, not scheduling it", this, updateInterval);
100             }
101         }
102
103         @Override
104         public void run(final Timeout timeout) {
105             if (notClosed()) {
106                 LOG.debug("Task {} is closed, ignoring timeout {}", this, timeout);
107                 return;
108             }
109
110             final var witness = STATE.compareAndExchange(this, timeout, null);
111             if (witness != timeout) {
112                 LOG.debug("Task {} ignoring unexpected timeout {} in state {}", this, timeout, witness);
113                 return;
114             }
115
116             final var sw = Stopwatch.createStarted();
117             state = executor.submit(() -> updateStatistics(sw));
118         }
119
120         private void updateStatistics(final Stopwatch sw) {
121             LOG.debug("Resumed processing task {} after {}", this, sw);
122             if (isClosed()) {
123                 // Already closed
124                 return;
125             }
126
127             final var prevState = state;
128             if (prevState instanceof Future<?> execFuture && !execFuture.isDone()) {
129                 final var future = getInstance().updateStatistics();
130                 LOG.debug("Task {} update submitted in {}", this, sw);
131                 state = future;
132                 future.addCallback(new FutureCallback<CommitInfo>() {
133                     @Override
134                     public void onSuccess(final CommitInfo result) {
135                         LOG.debug("Task {} update completed in {}", this, sw);
136                         reschedule(future, sw.elapsed(TimeUnit.NANOSECONDS));
137                     }
138
139                     @Override
140                     public void onFailure(final Throwable cause) {
141                         LOG.debug("Task {} update failed in {}", this, sw, cause);
142                         reschedule(future, 0);
143                     }
144                 }, executor);
145             } else {
146                 LOG.debug("Task {} ignoring unexpected update in state {}", this, prevState);
147             }
148         }
149
150         private void reschedule(final Object expectedState, final long elapsedNanos) {
151             if (isClosed()) {
152                 // Already closed
153                 return;
154             }
155             var witness = STATE.compareAndExchange(this, expectedState, null);
156             if (witness != expectedState) {
157                 LOG.debug("Task {} ignoring reschedule in unexpected state {}", this, witness);
158                 return;
159             }
160
161             final var updateInterval = getInstance().updateInterval();
162             if (updateInterval > 0) {
163                 long remainingNanos = updateInterval - elapsedNanos;
164                 if (remainingNanos < 0) {
165                     remainingNanos = updateInterval;
166                 }
167                 state = timer.newTimeout(this, remainingNanos, TimeUnit.NANOSECONDS);
168             } else {
169                 LOG.debug("Task {} has non-positive interval {}, skipping reschedule", this, updateInterval);
170             }
171         }
172
173         @Override
174         protected void removeRegistration() {
175             tasks.remove(this);
176
177             final var prevState = state;
178             if (prevState instanceof Timeout timeout) {
179                 timeout.cancel();
180                 STATE.compareAndSet(this, prevState, null);
181             } else if (prevState instanceof Future<?> future) {
182                 if (!(future instanceof FluentFuture)) {
183                     future.cancel(false);
184                     STATE.compareAndSet(this, prevState, null);
185                 }
186             } else {
187                 LOG.warn("Task {} in unexpected state {}", this, prevState);
188             }
189             getInstance().removeStatistics().addCallback(new FutureCallback<CommitInfo>() {
190                 @Override
191                 public void onSuccess(final CommitInfo result) {
192                     LOG.debug("Task {} removed state", this);
193                 }
194
195                 @Override
196                 public void onFailure(final Throwable cause) {
197                     LOG.warn("Task {} failed to remove state", this, cause);
198                 }
199             }, MoreExecutors.directExecutor());
200         }
201     }
202 }