/**
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.md.statistics.manager.impl;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.SettableFuture;
/**
* statistics-manager
* org.opendaylight.controller.md.statistics.manager.impl
*
* StatRpcMsgManagerImpl
* Class register and provide all RPC Statistics Device Services and implement pre-defined
* wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
*
* In next Class implement process for joining multipart messages.
* Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
* One Weak map is used for holding all Multipart Messages and second is used for possible input
* Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
* TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
*
* @author avishnoi@in.ibm.com Vaclav Demcak
*
*/
public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
private final Cache> txCache;
private final int queueCapacity = 5000;
private final OpendaylightGroupStatisticsService groupStatsService;
private final OpendaylightMeterStatisticsService meterStatsService;
private final OpendaylightFlowStatisticsService flowStatsService;
private final OpendaylightPortStatisticsService portStatsService;
private final OpendaylightFlowTableStatisticsService flowTableStatsService;
private final OpendaylightQueueStatisticsService queueStatsService;
private BlockingQueue statsRpcJobQueue;
private volatile boolean finishing = false;
public StatRpcMsgManagerImpl (final StatisticsManager manager,
final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
groupStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
"OpendaylightGroupStatisticsService can not be null!");
meterStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
"OpendaylightMeterStatisticsService can not be null!");
flowStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
"OpendaylightFlowStatisticsService can not be null!");
portStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
"OpendaylightPortStatisticsService can not be null!");
flowTableStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
"OpendaylightFlowTableStatisticsService can not be null!");
queueStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
"OpendaylightQueueStatisticsService can not be null!");
statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
/* nr. 7 is here nr. of possible statistic which are waiting for notification
* - check it in StatPermCollectorImpl method collectStatCrossNetwork */
txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
.maximumSize(10000).build();
}
@Override
public void close() {
finishing = true;
statsRpcJobQueue = null;
}
@Override
public void run() {
/* Neverending cyle - wait for finishing */
while ( ! finishing) {
try {
statsRpcJobQueue.take().call();
}
catch (final Exception e) {
LOG.warn("Stat Element RPC executor fail!", e);
}
}
// Drain all rpcCall, making sure any blocked threads are unblocked
while ( ! statsRpcJobQueue.isEmpty()) {
statsRpcJobQueue.poll();
}
}
private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
final boolean success = statsRpcJobQueue.offer(getAllStatJob);
if ( ! success) {
LOG.warn("Put RPC request getAllStat fail! Queue is full.");
}
}
private void addStatJob(final RpcJobsQueue getStatJob) {
final boolean success = statsRpcJobQueue.offer(getStatJob);
if ( ! success) {
LOG.debug("Put RPC request for getStat fail! Queue is full.");
}
}
@Override
public void registrationRpcFutureCallBack(
final Future> future, final D inputObj, final NodeRef nodeRef,
final SettableFuture resultTransId) {
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
new FutureCallback>() {
@Override
public void onSuccess(final RpcResult extends TransactionAware> result) {
final TransactionId id = result.getResult().getTransactionId();
final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
if (id == null) {
String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
LOG.warn("Node [{}] does not support statistics request type : {}",
nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
} else {
if (resultTransId != null) {
resultTransId.set(id);
}
final String cacheKey = buildCacheKey(id, nodeKey.getId());
final TransactionCacheContainer super TransactionAware> container =
new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
txCache.put(cacheKey, container);
}
}
@Override
public void onFailure(final Throwable t) {
LOG.warn("Response Registration for Statistics RPC call fail!", t);
}
});
}
private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
}
@Override
public Future>> getTransactionCacheContainer(
final TransactionId id, final NodeId nodeId) {
Preconditions.checkArgument(id != null, "TransactionId can not be null!");
Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
final String key = buildCacheKey(id, nodeId);
final SettableFuture>> result = SettableFuture.create();
final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final Optional> resultContainer =
Optional.> fromNullable(txCache.getIfPresent(key));
if (resultContainer.isPresent()) {
txCache.invalidate(key);
}
result.set(resultContainer);
return null;
}
};
addStatJob(getTransactionCacheContainer);
return result;
}
@Override
public Future isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
Preconditions.checkArgument(id != null, "TransactionId can not be null!");
Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
final String key = buildCacheKey(id, nodeId);
final SettableFuture checkStatId = SettableFuture.create();
final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final Optional> result =
Optional.> fromNullable(txCache.getIfPresent(key));
checkStatId.set(Boolean.valueOf(result.isPresent()));
return null;
}
};
addStatJob(isExpecedStatistics);
return checkStatId;
}
@Override
public void addNotification(final TransactionAware notification, final NodeId nodeId) {
Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
final RpcJobsQueue addNotification = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final TransactionId txId = notification.getTransactionId();
final String key = buildCacheKey(txId, nodeId);
final TransactionCacheContainer super TransactionAware> container = (txCache.getIfPresent(key));
if (container != null) {
container.addNotif(notification);
}
return null;
}
};
addStatJob(addNotification);
}
@Override
public Future getAllGroupsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllGroupStatisticsInputBuilder builder =
new GetAllGroupStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
.getAllGroupStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGroupStat);
return result;
}
@Override
public Future getAllMetersStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllMeterStatisticsInputBuilder builder =
new GetAllMeterStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
.getAllMeterStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllMeterStat);
return result;
}
@Override
public Future getAllFlowsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowStatsService
.getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllFlowStat);
return result;
}
@Override
public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
Preconditions.checkArgument(tableId != null, "TableId can not be null!");
final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
builder.setNode(nodeRef);
builder.setTableId(tableId);
final TableBuilder tbuilder = new TableBuilder();
tbuilder.setId(tableId.getValue());
tbuilder.setKey(new TableKey(tableId.getValue()));
registrationRpcFutureCallBack(flowStatsService
.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
return null;
}
};
addGetAllStatJob(getAggregateFlowStat);
}
@Override
public Future getAllPortsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllNodeConnectorsStatisticsInputBuilder builder =
new GetAllNodeConnectorsStatisticsInputBuilder();
builder.setNode(nodeRef);
final Future> rpc =
portStatsService.getAllNodeConnectorsStatistics(builder.build());
registrationRpcFutureCallBack(rpc, null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllPortsStat);
return result;
}
@Override
public Future getAllTablesStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetFlowTablesStatisticsInputBuilder builder =
new GetFlowTablesStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowTableStatsService
.getFlowTablesStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllTableStat);
return result;
}
@Override
public Future getAllQueueStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
new GetAllQueuesStatisticsFromAllPortsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(queueStatsService
.getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllQueueStat);
return result;
}
@Override
public Future getAllMeterConfigStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetAllMeterConfigStatisticsInputBuilder builder =
new GetAllMeterConfigStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
.getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(qetAllMeterConfStat);
return result;
}
@Override
public void getGroupFeaturesStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
/* RPC input */
final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
input.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
return null;
}
};
addStatJob(getGroupFeaturesStat);
}
@Override
public void getMeterFeaturesStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
/* RPC input */
final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
input.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
return null;
}
};
addStatJob(getMeterFeaturesStat);
}
@Override
public Future getAllGroupsConfStats(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final SettableFuture result = SettableFuture.create();
final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
final GetGroupDescriptionInputBuilder builder =
new GetGroupDescriptionInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
.getGroupDescription(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGropConfStat);
return result;
}
public class TransactionCacheContainerImpl implements TransactionCacheContainer {
private final TransactionId id;
private final NodeId nId;
private final List notifications;
private final Optional extends DataObject> confInput;
public TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
notifications = new CopyOnWriteArrayList();
confInput = Optional.fromNullable(input);
nId = nodeId;
}
@Override
public void addNotif(final T notif) {
notifications.add(notif);
}
@Override
public TransactionId getId() {
return id;
}
@Override
public NodeId getNodeId() {
return nId;
}
@Override
public List getNotifications() {
return notifications;
}
@Override
public Optional extends DataObject> getConfInput() {
return confInput;
}
}
}