/** * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.statistics.manager.impl; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder; import 
org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.SettableFuture;

/**
 * statistics-manager
 * org.opendaylight.controller.md.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * The class registers and provides all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to those services, such as getAllStatisticsFor...
 *
 * In addition, the class implements the process of joining multipart messages.
 * Internally it is described as using two WeakHashMaps and a Guava Cache to hold the values needed
 * for joining multipart messages: one weak map holds all multipart messages and the second holds the
 * optional input Config/DS light-weight DataObject (a DataObject containing only the necessary
 * identification fields, such as TableId, GroupId and MeterId, or for a flow: Match, Priority,
 * FlowCookie, TableId and FlowId ...).
 * NOTE(review): only the Guava Cache is visible in this class; the WeakHashMap part of this
 * description may be outdated - confirm and update.
* * @author avishnoi@in.ibm.com Vaclav Demcak * */ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class); private final Cache> txCache; private final int queueCapacity = 5000; private final OpendaylightGroupStatisticsService groupStatsService; private final OpendaylightMeterStatisticsService meterStatsService; private final OpendaylightFlowStatisticsService flowStatsService; private final OpendaylightPortStatisticsService portStatsService; private final OpendaylightFlowTableStatisticsService flowTableStatsService; private final OpendaylightQueueStatisticsService queueStatsService; private BlockingQueue statsRpcJobQueue; private volatile boolean finishing = false; public StatRpcMsgManagerImpl (final StatisticsManager manager, final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); groupStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class), "OpendaylightGroupStatisticsService can not be null!"); meterStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class), "OpendaylightMeterStatisticsService can not be null!"); flowStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class), "OpendaylightFlowStatisticsService can not be null!"); portStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class), "OpendaylightPortStatisticsService can not be null!"); flowTableStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class), "OpendaylightFlowTableStatisticsService can not be null!"); queueStatsService = Preconditions.checkNotNull( 
rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class), "OpendaylightQueueStatisticsService can not be null!"); statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity); /* nr. 7 is here nr. of possible statistic which are waiting for notification * - check it in StatPermCollectorImpl method collectStatCrossNetwork */ txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS) .maximumSize(10000).build(); } @Override public void close() { finishing = true; statsRpcJobQueue = null; } @Override public void run() { /* Neverending cyle - wait for finishing */ while ( ! finishing) { try { statsRpcJobQueue.take().call(); } catch (final Exception e) { LOG.warn("Stat Element RPC executor fail!", e); } } // Drain all rpcCall, making sure any blocked threads are unblocked while ( ! statsRpcJobQueue.isEmpty()) { statsRpcJobQueue.poll(); } } private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) { final boolean success = statsRpcJobQueue.offer(getAllStatJob); if ( ! success) { LOG.warn("Put RPC request getAllStat fail! Queue is full."); } } private void addStatJob(final RpcJobsQueue getStatJob) { final boolean success = statsRpcJobQueue.offer(getStatJob); if ( ! success) { LOG.debug("Put RPC request for getStat fail! 
Queue is full."); } } @Override public void registrationRpcFutureCallBack( final Future> future, final D inputObj, final NodeRef nodeRef, final SettableFuture resultTransId) { Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback>() { @Override public void onSuccess(final RpcResult result) { final TransactionId id = result.getResult().getTransactionId(); final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); if (id == null) { String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})"); LOG.warn("Node [{}] does not support statistics request type : {}", nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2))); } else { if (resultTransId != null) { resultTransId.set(id); } final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId()); txCache.put(cacheKey, container); } } @Override public void onFailure(final Throwable t) { LOG.warn("Response Registration for Statistics RPC call fail!", t); } }); } private String buildCacheKey(final TransactionId id, final NodeId nodeId) { return String.valueOf(id.getValue()) + "-" + nodeId.getValue(); } @Override public Future>> getTransactionCacheContainer( final TransactionId id, final NodeId nodeId) { Preconditions.checkArgument(id != null, "TransactionId can not be null!"); Preconditions.checkArgument(nodeId != null, "NodeId can not be null!"); final String key = buildCacheKey(id, nodeId); final SettableFuture>> result = SettableFuture.create(); final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() { @Override public Void call() throws Exception { final Optional> resultContainer = Optional.> fromNullable(txCache.getIfPresent(key)); if (resultContainer.isPresent()) { txCache.invalidate(key); } result.set(resultContainer); return null; } }; 
addStatJob(getTransactionCacheContainer); return result; } @Override public Future isExpectedStatistics(final TransactionId id, final NodeId nodeId) { Preconditions.checkArgument(id != null, "TransactionId can not be null!"); Preconditions.checkArgument(nodeId != null, "NodeId can not be null!"); final String key = buildCacheKey(id, nodeId); final SettableFuture checkStatId = SettableFuture.create(); final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() { @Override public Void call() throws Exception { final Optional> result = Optional.> fromNullable(txCache.getIfPresent(key)); checkStatId.set(Boolean.valueOf(result.isPresent())); return null; } }; addStatJob(isExpecedStatistics); return checkStatId; } @Override public void addNotification(final TransactionAware notification, final NodeId nodeId) { Preconditions.checkArgument(notification != null, "TransactionAware can not be null!"); Preconditions.checkArgument(nodeId != null, "NodeId can not be null!"); final RpcJobsQueue addNotification = new RpcJobsQueue() { @Override public Void call() throws Exception { final TransactionId txId = notification.getTransactionId(); final String key = buildCacheKey(txId, nodeId); final TransactionCacheContainer container = (txCache.getIfPresent(key)); if (container != null) { container.addNotif(notification); } return null; } }; addStatJob(addNotification); } @Override public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllGroupStatisticsInputBuilder builder = new GetAllGroupStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService .getAllGroupStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGroupStat); return result; } @Override public Future 
getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllMeterStatisticsInputBuilder builder = new GetAllMeterStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService .getAllMeterStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllMeterStat); return result; } @Override public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowStatsService .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllFlowStat); return result; } @Override public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); Preconditions.checkArgument(tableId != null, "TableId can not be null!"); final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); builder.setNode(nodeRef); builder.setTableId(tableId); final TableBuilder tbuilder = new TableBuilder(); tbuilder.setId(tableId.getValue()); tbuilder.setKey(new TableKey(tableId.getValue())); registrationRpcFutureCallBack(flowStatsService 
.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); return null; } }; addGetAllStatJob(getAggregateFlowStat); } @Override public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllNodeConnectorsStatisticsInputBuilder builder = new GetAllNodeConnectorsStatisticsInputBuilder(); builder.setNode(nodeRef); final Future> rpc = portStatsService.getAllNodeConnectorsStatistics(builder.build()); registrationRpcFutureCallBack(rpc, null, nodeRef, result); return null; } }; addGetAllStatJob(getAllPortsStat); return result; } @Override public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetFlowTablesStatisticsInputBuilder builder = new GetFlowTablesStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowTableStatsService .getFlowTablesStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllTableStat); return result; } @Override public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllQueuesStatisticsFromAllPortsInputBuilder builder = new GetAllQueuesStatisticsFromAllPortsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(queueStatsService .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); 
return null; } }; addGetAllStatJob(getAllQueueStat); return result; } @Override public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { @Override public Void call() throws Exception { final GetAllMeterConfigStatisticsInputBuilder builder = new GetAllMeterConfigStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(qetAllMeterConfStat); return result; } @Override public void getGroupFeaturesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() { @Override public Void call() throws Exception { /* RPC input */ final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); input.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); return null; } }; addStatJob(getGroupFeaturesStat); } @Override public void getMeterFeaturesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() { @Override public Void call() throws Exception { /* RPC input */ final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); input.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); return null; } }; addStatJob(getMeterFeaturesStat); } @Override public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGropConfStat = 
new RpcJobsQueue() { @Override public Void call() throws Exception { final GetGroupDescriptionInputBuilder builder = new GetGroupDescriptionInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService .getGroupDescription(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGropConfStat); return result; } public class TransactionCacheContainerImpl implements TransactionCacheContainer { private final TransactionId id; private final NodeId nId; private final List notifications; private final Optional confInput; public TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) { this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!"); notifications = new CopyOnWriteArrayList(); confInput = Optional.fromNullable(input); nId = nodeId; } @Override public void addNotif(final T notif) { notifications.add(notif); } @Override public TransactionId getId() { return id; } @Override public NodeId getNodeId() { return nId; } @Override public List getNotifications() { return notifications; } @Override public Optional getConfInput() { return confInput; } } }