/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import com.google.common.base.Optional;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* statistics-manager
* org.opendaylight.openflowplugin.applications.statistics.manager.impl
*
* StatPermCollectorImpl
* Thread-based statistics collector. The class holds an internal map of all registered
* (i.e. connected) nodes together with their list of switch capabilities.
* The statistics collecting process walks the whole network device by device
* and statistic by statistic (following the switch capabilities to prevent unnecessary
* requests). The next statistic starts collecting on notification or on timeout.
*
* @author avishnoi@in.ibm.com Vaclav Demcak
*
*/
public class StatPermCollectorImpl implements StatPermCollector {
private static final Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
private static final long STAT_COLLECT_TIME_OUT = 3000L;
/**
sleep 5 second before collecting all statistics cycles is important
for loading all Nodes to Operational/DS
*/
private static final long WAIT_BEFORE_COLLECTING_STATS = 5000;
private final ExecutorService statNetCollectorServ;
private final StatisticsManager manager;
private final int maxNodeForCollector;
private final long minReqNetInterval;
private final String name;
private final Object statCollectorLock = new Object();
private final Object statNodeHolderLock = new Object();
private final Object transNotifyLock = new Object();
private Map, StatNodeInfoHolder> statNodeHolder =
Collections., StatNodeInfoHolder> emptyMap();
private volatile boolean wakeMe = false;
private volatile boolean finishing = false;
private TransactionId actualTransactionId;
public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
final int maxNodeForCollectors) {
this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
name = "odl-stat-collector-" + nr;
minReqNetInterval = minReqNetInterv;
final ThreadFactory threadFact = new ThreadFactoryBuilder()
.setNameFormat(name + "-thread-%d").build();
statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
maxNodeForCollector = maxNodeForCollectors;
LOG.trace("StatCollector {} start successful!", name);
}
/**
* finish collecting statistics
*/
@Override
public void close() {
statNodeHolder = Collections., StatNodeInfoHolder> emptyMap();
finishing = true;
collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
@Override
public boolean hasActiveNodes() {
return ( ! statNodeHolder.isEmpty());
}
@Override
public boolean isProvidedFlowNodeActive(
final InstanceIdentifier flowNode) {
return statNodeHolder.containsKey(flowNode);
}
@Override
public boolean connectedNodeRegistration(final InstanceIdentifier ident,
final List statTypes, final Short nrOfSwitchTables) {
if (isNodeIdentValidForUse(ident) && ! statNodeHolder.containsKey(ident)) {
synchronized (statNodeHolderLock) {
final boolean startStatCollecting = statNodeHolder.size() == 0;
if ( ! statNodeHolder.containsKey(ident)) {
if (statNodeHolder.size() >= maxNodeForCollector) {
return false;
}
final Map, StatNodeInfoHolder> statNode =
new HashMap<>(statNodeHolder);
final NodeRef nodeRef = new NodeRef(ident);
final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
statTypes, nrOfSwitchTables);
statNode.put(ident, nodeInfoHolder);
statNodeHolder = Collections.unmodifiableMap(statNode);
}
if (startStatCollecting) {
finishing = false;
statNetCollectorServ.execute(this);
}
}
}
return true;
}
@Override
public boolean disconnectedNodeUnregistration(final InstanceIdentifier ident) {
if (isNodeIdentValidForUse(ident) && statNodeHolder.containsKey(ident)) {
synchronized (statNodeHolderLock) {
if (statNodeHolder.containsKey(ident)) {
final Map, StatNodeInfoHolder> statNode =
new HashMap<>(statNodeHolder);
statNode.remove(ident);
statNodeHolder = Collections.unmodifiableMap(statNode);
}
if (statNodeHolder.isEmpty()) {
finishing = true;
collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
return true;
}
}
return false;
}
@Override
public boolean registerAdditionalNodeFeature(final InstanceIdentifier ident,
final StatCapabTypes statCapab) {
if (isNodeIdentValidForUse(ident)) {
if ( ! statNodeHolder.containsKey(ident)) {
return false;
}
final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
if ( ! statNode.getStatMarkers().contains(statCapab)) {
synchronized (statNodeHolderLock) {
if ( ! statNode.getStatMarkers().contains(statCapab)) {
final List statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
statCapabForEdit.add(statCapab);
final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
final Map, StatNodeInfoHolder> statNodes =
new HashMap<>(statNodeHolder);
statNodes.put(ident, nodeInfoHolder);
statNodeHolder = Collections.unmodifiableMap(statNodes);
}
}
}
}
return true;
}
@Override
public void collectNextStatistics(final TransactionId xid) {
if (checkTransactionId(xid) && wakeMe) {
synchronized (statCollectorLock) {
if (wakeMe) {
LOG.trace("STAT-COLLECTOR is notified to conntinue");
statCollectorLock.notify();
}
}
}
}
@Override
public void run() {
try {
Thread.sleep(WAIT_BEFORE_COLLECTING_STATS);
}
catch (final InterruptedException e1) {
// NOOP
}
LOG.debug("StatCollector {} Start collecting!", name);
/* Neverending cyle - wait for finishing */
while ( ! finishing) {
boolean collecting = false;
final long startTime = System.currentTimeMillis();
if ( ! statNodeHolder.isEmpty()) {
collecting = true;
collectStatCrossNetwork();
collecting = false;
}
if ( ! collecting) {
final long statFinalTime = System.currentTimeMillis() - startTime;
LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
if (statFinalTime < minReqNetInterval) {
LOG.trace("statCollector is about to make a collecting sleep");
synchronized (statCollectorLock) {
wakeMe = true;
try {
final long waitTime = minReqNetInterval - statFinalTime;
statCollectorLock.wait(waitTime);
LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
} catch (final InterruptedException e) {
LOG.warn("statCollector has been interrupted during collecting sleep", e);
} finally {
wakeMe = false;
}
}
}
}
}
}
private void waitingForNotification() {
synchronized (statCollectorLock) {
wakeMe = true;
try {
statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
LOG.trace("statCollector is waking up from a wait stat Response sleep");
} catch (final InterruptedException e) {
LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
} finally {
setActualTransactionId(null);
wakeMe = false;
}
}
}
private void collectStatCrossNetwork() {
for (final Entry, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
final NodeKey nodeKey = nodeEntity.getKey().firstKeyOf(Node.class);
if (!this.isThisInstanceNodeOwner(nodeKey.getId())) {
continue;
}
LOG.trace("collectStatCrossNetwork: Controller is owner of the " +
"node {}, so collecting the statistics.",nodeKey);
final List listNeededStat = nodeEntity.getValue().getStatMarkers();
final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
final Short maxTables = nodeEntity.getValue().getMaxTables();
for (final StatCapabTypes statMarker : listNeededStat) {
if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
break;
}
try {
switch (statMarker) {
case PORT_STATS:
LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
waitingForNotification();
break;
case QUEUE_STATS:
LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
waitingForNotification();
break;
case TABLE_STATS:
LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
waitingForNotification();
break;
case GROUP_STATS:
LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
waitingForNotification();
setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
waitingForNotification();
break;
case METER_STATS:
LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
waitingForNotification();
setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
waitingForNotification();
break;
case FLOW_STATS:
LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
waitingForNotification();
/*LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
for (short i = 0; i < maxTables; i++) {
final TableId tableId = new TableId(i);
manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
}*/
break;
default:
/* Exception for programmers in implementation cycle */
throw new IllegalStateException("Not implemented ASK for " + statMarker);
}
} catch (InterruptedException | ExecutionException ex) {
LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
continue;
}
}
}
}
private boolean isThisInstanceNodeOwner(NodeId nodeId) {
return manager.getNodeRegistrator().isFlowCapableNodeOwner(nodeId);
}
private class StatNodeInfoHolder {
private final NodeRef nodeRef;
private final List statMarkers;
private final Short maxTables;
public StatNodeInfoHolder(final NodeRef nodeRef,
final List statMarkers, final Short maxTables) {
this.nodeRef = nodeRef;
this.maxTables = maxTables;
this.statMarkers = statMarkers;
}
public final NodeRef getNodeRef() {
return nodeRef;
}
public final List getStatMarkers() {
return statMarkers;
}
public final Short getMaxTables() {
return maxTables;
}
}
private boolean isNodeIdentValidForUse(final InstanceIdentifier ident) {
if (ident == null) {
LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
return false;
}
if (ident.isWildcarded()) {
LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
return false;
}
return true;
}
private boolean checkTransactionId(final TransactionId xid) {
synchronized (transNotifyLock) {
return actualTransactionId != null && actualTransactionId.equals(xid);
}
}
private void setActualTransactionId(final TransactionId transactionId) {
synchronized (transNotifyLock) {
actualTransactionId = transactionId;
}
}
}