X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Fhoneynode%2F2.2.1%2Fnetconf-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fimpl%2Fosgi%2FNetconfSessionMonitoringService.java;fp=tests%2Fhoneynode%2F2.2.1%2Fnetconf-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fimpl%2Fosgi%2FNetconfSessionMonitoringService.java;h=639daa803d35a2627f18acb120133e868484d01a;hb=b90fdacec82f92e07d14ed1df31e3fe53275c676;hp=0000000000000000000000000000000000000000;hpb=1030996d2cfc0cec3c6c2b5d8da37c6188122372;p=transportpce.git diff --git a/tests/honeynode/2.2.1/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java b/tests/honeynode/2.2.1/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java new file mode 100644 index 000000000..639daa803 --- /dev/null +++ b/tests/honeynode/2.2.1/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.impl.osgi; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; +import org.opendaylight.netconf.api.monitoring.NetconfManagementSession; +import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService; +import org.opendaylight.netconf.api.monitoring.SessionEvent; +import org.opendaylight.netconf.api.monitoring.SessionListener; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements {@link SessionListener} to receive updates about Netconf sessions. Instance notifies its + * listeners + * about session start and end. It also publishes on regular interval list of sessions, + * where events like rpc or notification happened. + */ +class NetconfSessionMonitoringService implements SessionListener, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionMonitoringService.class); + + private final Set sessions = Sets.newHashSet(); + private final Set changedSessions = Sets.newHashSet(); + private final Set listeners = Sets.newHashSet(); + private final ScheduledExecutorService executor; + private final long updateInterval; + private boolean running; + + /** + * Constructor for {@code NetconfSessionMonitoringService}. + * + * @param schedulingThreadPool thread pool for scheduling session stats updates. If not present, updates won't be + * scheduled. + * @param updateInterval update interval. If is less than 0, updates won't be scheduled + */ + NetconfSessionMonitoringService(Optional schedulingThreadPool, long updateInterval) { + this.updateInterval = updateInterval; + if (schedulingThreadPool.isPresent() && updateInterval > 0) { + this.executor = schedulingThreadPool.get().getExecutor(); + LOG.info("/netconf-state/sessions will be updated every {} seconds.", updateInterval); + } else { + LOG.info("Scheduling thread pool is present = {}, " + + "update interval {}: /netconf-state/sessions won't be updated.", + schedulingThreadPool.isPresent(), updateInterval); + this.executor = null; + } + } + + synchronized Sessions getSessions() { + final Collection managementSessions = Collections2.transform(sessions, + NetconfManagementSession::toManagementSession); + return new SessionsBuilder().setSession(ImmutableList.copyOf(managementSessions)).build(); + } + + @Override + public synchronized void onSessionUp(final NetconfManagementSession session) { + LOG.debug("Session {} up", session); + Preconditions.checkState(!sessions.contains(session), "Session %s was already added", session); + sessions.add(session); + notifySessionUp(session); + } + + @Override + public synchronized void onSessionDown(final NetconfManagementSession session) { + LOG.debug("Session {} down", session); + Preconditions.checkState(sessions.contains(session), "Session %s not present", session); + sessions.remove(session); + changedSessions.remove(session); + notifySessionDown(session); + } + + @Override + public synchronized void onSessionEvent(SessionEvent event) { + changedSessions.add(event.getSession()); + } + + synchronized AutoCloseable registerListener(final NetconfMonitoringService.SessionsListener listener) { + listeners.add(listener); + if (!running) { + startUpdateSessionStats(); + } + return () -> listeners.remove(listener); + } + + @Override + public synchronized void close() { + stopUpdateSessionStats(); + listeners.clear(); + sessions.clear(); + } + + private synchronized void updateSessionStats() { + if (changedSessions.isEmpty()) { + return; + } + final List changed = changedSessions.stream() + .map(NetconfManagementSession::toManagementSession) + .collect(Collectors.toList()); + final ImmutableList sessionImmutableList = ImmutableList.copyOf(changed); + for (NetconfMonitoringService.SessionsListener listener : listeners) { + listener.onSessionsUpdated(sessionImmutableList); + } + changedSessions.clear(); + } + + private void notifySessionUp(NetconfManagementSession managementSession) { + Session session = managementSession.toManagementSession(); + for (NetconfMonitoringService.SessionsListener listener : listeners) { + listener.onSessionStarted(session); + } + } + + private void notifySessionDown(NetconfManagementSession managementSession) { + Session session = managementSession.toManagementSession(); + for (NetconfMonitoringService.SessionsListener listener : listeners) { + listener.onSessionEnded(session); + } + } + + private void startUpdateSessionStats() { + if (executor != null) { + executor.scheduleAtFixedRate(this::updateSessionStats, 1, updateInterval, TimeUnit.SECONDS); + running = true; + } + } + + private void stopUpdateSessionStats() { + if (executor != null) { + executor.shutdownNow(); + running = false; + } + } +}