private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
@GuardedBy("this")
private final Map<String, ReportedLsp> lspData = new ConcurrentHashMap<>();
- private final TopologySessionStatsRegistry statsProvider;
private final ServerSessionManager serverSessionManager;
+ private final SessionStateRegistry stateRegistry;
private InstanceIdentifier<PathComputationClient> pccIdentifier;
@GuardedBy("this")
@GuardedBy("this")
private boolean triggeredResyncInProcess;
- AbstractTopologySessionListener(final TopologySessionStatsRegistry statsProvider,
+ AbstractTopologySessionListener(final SessionStateRegistry stateRegistry,
final ServerSessionManager serverSessionManager) {
- this.statsProvider = requireNonNull(statsProvider);
+ this.stateRegistry = requireNonNull(stateRegistry);
this.serverSessionManager = requireNonNull(serverSessionManager);
}
state.storeNode(topologyAugment,
new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), psession);
- listenerState = statsProvider.bind(nodeId, new SessionStateImpl(this, psession));
+ listenerState = stateRegistry.bind(nodeId, new SessionStateImpl(this, psession));
LOG.info("Session with {} attached to topology node {}", peerAddress, nodeId);
}
}
*
* @return TopologySessionStateRegistry
*/
- TopologySessionStatsRegistry getStateRegistry();
+ SessionStateRegistry getStateRegistry();
/**
* PCE Server Provider.
/**
* Creates a new stateful topology session listener for given server session manager.
*/
- PCEPTopologySessionListener(final TopologySessionStatsRegistry statsProvider,
+ PCEPTopologySessionListener(final SessionStateRegistry stateRegistry,
final ServerSessionManager serverSessionManager, final PceServerProvider pceServerProvider) {
- super(statsProvider, serverSessionManager);
+ super(stateRegistry, serverSessionManager);
// FIXME: requireNonNull(), except tests need to be updated
this.pceServerProvider = pceServerProvider;
}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
// Services we are using
final @NonNull InstructionSchedulerFactory instructionSchedulerFactory;
final @NonNull ClusterSingletonServiceProvider singletonService;
- private final @NonNull TopologySessionStatsRegistry stateRegistry;
private final @NonNull RpcProviderService rpcProviderRegistry;
private final @NonNull PceServerProvider pceServerProvider;
private final @NonNull PCEPDispatcher pcepDispatcher;
private final @NonNull DataBroker dataBroker;
+ // Statistics provider
+ private final @NonNull TopologyStatsProvider statsProvider;
+
// Timer used for RPC timeouts and session statistics scheduling
private final @NonNull HashedWheelTimer privateTimer = new HashedWheelTimer();
private final @NonNull Timer timer = new Timer() {
public PCEPTopologyTracker(final DataBroker dataBroker, final ClusterSingletonServiceProvider singletonService,
final RpcProviderService rpcProviderRegistry, final PCEPDispatcher pcepDispatcher,
- final InstructionSchedulerFactory instructionSchedulerFactory,
- final TopologySessionStatsRegistry stateRegistry, final PceServerProvider pceServerProvider) {
+ final InstructionSchedulerFactory instructionSchedulerFactory, final PceServerProvider pceServerProvider,
+ final int updateIntervalSeconds) {
this.dataBroker = requireNonNull(dataBroker);
this.singletonService = requireNonNull(singletonService);
this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
this.pcepDispatcher = requireNonNull(pcepDispatcher);
this.instructionSchedulerFactory = requireNonNull(instructionSchedulerFactory);
- this.stateRegistry = requireNonNull(stateRegistry);
this.pceServerProvider = requireNonNull(pceServerProvider);
+ statsProvider = new TopologyStatsProvider(dataBroker, updateIntervalSeconds);
reg = dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class).child(TopologyTypes.class)
}
@Override
- public TopologySessionStatsRegistry getStateRegistry() {
- return stateRegistry;
+ public SessionStateRegistry getStateRegistry() {
+ return statsProvider;
}
@Override
LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
}
+ try {
+ statsProvider.shutdown();
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to close statistics provider", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for statistics provider shutdown", e);
+ Thread.currentThread().interrupt();
+ }
+
LOG.info("PCEP Topology tracker shut down");
}
/**
* Topology Node Sessions stats handler. Will store Session stats on DS per each Topology Node registered.
*/
-interface TopologySessionStatsRegistry {
+interface SessionStateRegistry {
/**
* Register session to Session stats Registry handler.
*
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class TopologyStatsProviderImpl implements TopologySessionStatsRegistry, TransactionChainListener,
- AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProviderImpl.class);
+final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
// This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
// which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
@GuardedBy("this")
private final ScheduledFuture<?> scheduleTask;
- public TopologyStatsProviderImpl(final DataBroker dataBroker, final int updateIntervalSeconds) {
+ TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds) {
this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
}
- public TopologyStatsProviderImpl(final DataBroker dataBroker, final int updateIntervalSeconds,
+ TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds,
final ScheduledExecutorService scheduler) {
this.dataBroker = requireNonNull(dataBroker);
LOG.info("Initializing TopologyStatsProvider service.");
}, 0, updateIntervalSeconds, TimeUnit.SECONDS);
}
- @Override
- public void close() throws InterruptedException, ExecutionException {
+ // FIXME: there should be no further tasks, hence this should not be needed
+ // FIXME: if it ends up being needed, it needs to be asynchronous
+ void shutdown() throws InterruptedException, ExecutionException {
if (scheduleTask.cancel(true)) {
LOG.info("Closing TopologyStatsProvider service.");
- shutdown();
+ lockedShutdown();
} else {
LOG.debug("TopologyStatsProvider already shut down");
}
}
- private synchronized void shutdown() throws InterruptedException, ExecutionException {
+ private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
// Try to get a transaction chain and indicate we are done
final TransactionChain chain = accessChain();
transactionChain = null;
@Override
protected void removeRegistration() {
- TopologyStatsProviderImpl.this.removeRegistration(this);
+ TopologyStatsProvider.this.removeRegistration(this);
}
}
}
<reference id="intructionFactory" interface="org.opendaylight.bgpcep.programming.spi.InstructionSchedulerFactory"/>
<reference id="pceServerProvider" interface="org.opendaylight.bgpcep.pcep.server.PceServerProvider"/>
+ <odl:clustered-app-config id="pcepStatsConfig"
+ binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.pcep.stats.provider.config.rev171113.PcepProvider"/>
+
<bean id="pcepTopologyTracker"
class="org.opendaylight.bgpcep.pcep.topology.provider.PCEPTopologyTracker"
destroy-method="close">
<argument ref="rpcProviderService"/>
<argument ref="pcepDispatcher"/>
<argument ref="intructionFactory"/>
- <argument ref="topologyStatsRegistry"/>
<argument ref="pceServerProvider"/>
+ <argument>
+ <bean factory-ref="pcepStatsConfig" factory-method="getTimer"/>
+ </argument>
</bean>
<bean id="topologyStatsRpcService"
<argument ref="dataBroker"/>
</bean>
<odl:rpc-implementation ref="topologyStatsRpcService"/>
-
- <odl:clustered-app-config id="pcepStatsConfig"
- binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.pcep.stats.provider.config.rev171113.PcepProvider"/>
-
- <bean id="topologyStatsRegistry"
- class="org.opendaylight.bgpcep.pcep.topology.provider.TopologyStatsProviderImpl"
- destroy-method="close">
- <argument ref="dataBroker"/>
- <argument>
- <bean factory-ref="pcepStatsConfig" factory-method="getTimer"/>
- </argument>
- </bean>
</blueprint>
@Mock
private ChannelFuture channelFuture;
@Mock
- private TopologySessionStatsRegistry statsRegistry;
+ private SessionStateRegistry stateRegistry;
@Mock
private PCEPTopologyProviderDependencies topologyDependencies;
@Mock
any(ChannelHandler.class));
doReturn(eventLoop).when(clientListener).eventLoop();
doAnswer(inv -> NoOpObjectRegistration.of(inv.getArgument(1, PcepSessionState.class)))
- .when(statsRegistry).bind(any(), any());
+ .when(stateRegistry).bind(any(), any());
doReturn(null).when(eventLoop).schedule(any(Runnable.class), any(long.class), any(TimeUnit.class));
doReturn(true).when(clientListener).isActive();
final InetSocketAddress ra = new InetSocketAddress(testAddress, 4189);
doReturn(mock(ChannelFuture.class)).when(clientListener).close();
doReturn(getDataBroker()).when(topologyDependencies).getDataBroker();
- doReturn(statsRegistry).when(topologyDependencies).getStateRegistry();
+ doReturn(stateRegistry).when(topologyDependencies).getStateRegistry();
doReturn(timer).when(topologyDependencies).getTimer();
doReturn(null).when(topologyDependencies).getPceServerProvider();