2 * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.pcep.topology.provider;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.ThreadFactoryBuilder;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import io.netty.util.TimerTask;
20 import java.lang.invoke.MethodHandles;
21 import java.lang.invoke.VarHandle;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
31 import org.opendaylight.yangtools.concepts.NoOpObjectRegistration;
32 import org.opendaylight.yangtools.concepts.ObjectRegistration;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
// Session state registry that repeatedly triggers statistics updates for bound SessionStateUpdater
// instances. Timeouts are scheduled on a Netty Timer; the update work itself is submitted to an executor.
36 final class TopologyStatsProvider implements SessionStateRegistry {
37 private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
// Concurrent set of Task registrations; the shutdown path closes its members until the set is empty.
39 private final Set<Task> tasks = ConcurrentHashMap.newKeySet();
// Executor used to run Task.updateStatistics(); created as a single-thread executor in the constructor.
40 private final ExecutorService executor;
// Timer on which each Task schedules (and re-schedules) its timeout.
41 private final Timer timer;
// Creates the provider: stores the supplied timer and builds the single-thread executor, whose
// threads are named using the "odl-pcep-stats-%d" format, then logs that the provider has started.
43 TopologyStatsProvider(final Timer timer) {
// requireNonNull rejects a null timer with a NullPointerException
44 this.timer = requireNonNull(timer);
45 executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
47 .setNameFormat("odl-pcep-stats-%d")
50 LOG.info("TopologyStatsProvider started");
// Shutdown path: stops the executor, closes all registered tasks and then runs whatever the executor
// had queued but not started.
// An executor that is already shut down is only logged at debug level
// (NOTE(review): presumably followed by an early return — confirm).
54 if (executor.isShutdown()) {
55 LOG.debug("TopologyStatsProvider already shut down");
59 LOG.info("Closing TopologyStatsProvider service.");
// shutdownNow() returns the tasks that were submitted but never commenced execution
60 final var toRun = executor.shutdownNow();
// Keep closing until the set is observed empty; this relies on closing a Task eventually removing it
// from 'tasks' (NOTE(review): the removal is not visible here — confirm, otherwise this loop spins).
61 while (!tasks.isEmpty()) {
62 tasks.forEach(Task::close);
// Run the never-started executor tasks on the calling thread
64 toRun.forEach(Runnable::run);
// Binds a session state updater to this provider and returns its registration.
// Once the executor has been shut down the bind is ignored and a no-op registration wrapping the
// supplied sessionState is returned instead.
68 public ObjectRegistration<SessionStateUpdater> bind(final SessionStateUpdater sessionState) {
69 if (executor.isShutdown()) {
70 LOG.debug("Ignoring bind of Pcep Node {}", sessionState);
71 return NoOpObjectRegistration.of(sessionState);
// Constructing the Task schedules its first timeout when the updater's interval is positive
74 final var task = new Task(sessionState);
// Registration of a single SessionStateUpdater that doubles as the Netty TimerTask driving its
// statistics updates.
79 private final class Task extends AbstractObjectRegistration<SessionStateUpdater> implements TimerTask {
// VarHandle over the 'state' field, used for the compare-and-exchange / compare-and-set transitions
80 private static final VarHandle STATE;
84 STATE = MethodHandles.lookup().findVarHandle(Task.class, "state", Object.class);
85 } catch (NoSuchFieldException | IllegalAccessException e) {
// A failed lookup makes class initialization fail
86 throw new ExceptionInInitializerError(e);
// Current scheduling state. The visible code stores a Timeout (scheduled on the timer) or the Future
// returned by executor.submit(), and clears it to null via CAS; reschedule() and removeRegistration()
// also expect the statistics-update future here (NOTE(review): that assignment is not visible — confirm).
90 private volatile Object state;
// Creates the task for the given updater and, when the updater reports a positive update interval,
// schedules the first timeout on the provider's timer.
92 Task(final @NonNull SessionStateUpdater instance) {
// The interval is handed to the timer with TimeUnit.NANOSECONDS, i.e. it is treated as nanoseconds
95 final long updateInterval = instance.updateInterval();
96 if (updateInterval > 0) {
97 state = timer.newTimeout(this, updateInterval, TimeUnit.NANOSECONDS);
// Non-positive interval: nothing is scheduled, only a debug message is logged
99 LOG.debug("Task {} has non-positive interval {}, not scheduling it", this, updateInterval);
// TimerTask callback invoked when a scheduled timeout fires. It claims the timeout by clearing
// 'state' and then hands the statistics update over to the executor.
104 public void run(final Timeout timeout) {
// Logged for a task that has already been closed
// (NOTE(review): the guarding condition is not visible here — confirm).
106 LOG.debug("Task {} is closed, ignoring timeout {}", this, timeout);
// Atomically swap 'state' from this timeout to null; compareAndExchange returns the value it found,
// which is the expected timeout only when the swap succeeded.
110 final var witness = STATE.compareAndExchange(this, timeout, null);
111 if (witness != timeout) {
112 LOG.debug("Task {} ignoring unexpected timeout {} in state {}", this, timeout, witness);
// The stopwatch measures from here, covering queueing delay on the executor as well as the update itself
116 final var sw = Stopwatch.createStarted();
// Remember the executor Future so that removeRegistration() can cancel it
117 state = executor.submit(() -> updateStatistics(sw));
// Executor-side half of an update cycle: asks the updater to update its statistics and reschedules
// the task once the resulting future completes.
// sw is the stopwatch started when the timeout fired; it is used for logging and for the elapsed time.
120 private void updateStatistics(final Stopwatch sw) {
121 LOG.debug("Resumed processing task {} after {}", this, sw);
// Proceed only if 'state' still holds an executor Future that is not done; a Future cancelled by
// removeRegistration() reports isDone() == true and therefore ends up in the "unexpected" branch.
127 final var prevState = state;
128 if (prevState instanceof Future<?> execFuture && !execFuture.isDone()) {
129 final var future = getInstance().updateStatistics();
130 LOG.debug("Task {} update submitted in {}", this, sw);
// NOTE(review): inside this anonymous FutureCallback 'this' is the callback instance, not the
// enclosing Task, so the "Task {}" placeholders below print the callback — confirm whether
// Task.this was intended.
132 future.addCallback(new FutureCallback<CommitInfo>() {
134 public void onSuccess(final CommitInfo result) {
135 LOG.debug("Task {} update completed in {}", this, sw);
// Deduct the time already spent from the next delay
136 reschedule(future, sw.elapsed(TimeUnit.NANOSECONDS));
140 public void onFailure(final Throwable cause) {
141 LOG.debug("Task {} update failed in {}", this, sw, cause);
// Elapsed time of 0: after a failure the next timeout is a full interval away
142 reschedule(future, 0);
146 LOG.debug("Task {} ignoring unexpected update in state {}", this, prevState);
// Schedules the next timeout after an update has finished.
// expectedState is the value 'state' must currently hold for the reschedule to take effect;
// elapsedNanos is the time already consumed by the update, deducted from the next delay.
150 private void reschedule(final Object expectedState, final long elapsedNanos) {
// Claim the transition by swapping expectedState for null; any other witness means 'state' changed
// in the meantime and the reschedule is ignored with a debug message.
155 var witness = STATE.compareAndExchange(this, expectedState, null);
156 if (witness != expectedState) {
157 LOG.debug("Task {} ignoring reschedule in unexpected state {}", this, witness);
// The interval is re-read from the updater on every cycle
161 final var updateInterval = getInstance().updateInterval();
162 if (updateInterval > 0) {
163 long remainingNanos = updateInterval - elapsedNanos;
// The update took longer than one interval: wait a full interval rather than a negative delay
164 if (remainingNanos < 0) {
165 remainingNanos = updateInterval;
167 state = timer.newTimeout(this, remainingNanos, TimeUnit.NANOSECONDS);
// Non-positive interval: no new timeout is scheduled
169 LOG.debug("Task {} has non-positive interval {}, skipping reschedule", this, updateInterval);
// Registration-removal hook: clears whatever is currently recorded in 'state' and asks the updater
// to remove its statistics.
174 protected void removeRegistration() {
// Work from a snapshot of 'state'; the compareAndSet calls below clear it only if it is unchanged
177 final var prevState = state;
178 if (prevState instanceof Timeout timeout) {
180 STATE.compareAndSet(this, prevState, null);
181 } else if (prevState instanceof Future<?> future) {
// Only futures that are not FluentFuture are cancelled, without interrupting a running thread
// (NOTE(review): presumably these are the executor futures and a FluentFuture is the
// statistics-update future, which is left to complete — confirm).
182 if (!(future instanceof FluentFuture)) {
183 future.cancel(false);
184 STATE.compareAndSet(this, prevState, null);
187 LOG.warn("Task {} in unexpected state {}", this, prevState);
// Removal outcome is only logged; the callback runs on MoreExecutors.directExecutor().
// NOTE(review): inside this anonymous FutureCallback 'this' is the callback instance, not the
// enclosing Task, so the "Task {}" placeholders below print the callback — confirm whether
// Task.this was intended.
189 getInstance().removeStatistics().addCallback(new FutureCallback<CommitInfo>() {
191 public void onSuccess(final CommitInfo result) {
192 LOG.debug("Task {} removed state", this);
196 public void onFailure(final Throwable cause) {
197 LOG.warn("Task {} failed to remove state", this, cause);
199 }, MoreExecutors.directExecutor());