/* TODO move it to ConfigSubsystem */
private static final long DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL = 3000L;
- private static final int MAX_NODES_FOR_COLLECTOR = 8;
+ private static final int MAX_NODES_FOR_COLLECTOR = 16;
private StatisticsManager statsProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
.setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
- final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
- .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
+ final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
+ final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
+ .augmentation(AggregateFlowStatisticsData.class);
Optional<FlowCapableNode> fNode = Optional.absent();
try {
fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
return;
}
if (fNode.isPresent()) {
- tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
+ ensureTable(tx, table.getId(), tableRef);
+ tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
}
}
}
});
}
+ public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
+ final Table tableNew = new TableBuilder().setId(tableId).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
+ }
+
@Override
public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
final TransactionId transId = notification.getTransactionId();
}
}
- private void ensureTable(final ReadWriteTransaction tx) {
+ private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
if( ! tableEnsured) {
+ ensureTable(tx, tableKey.getId(), tableRef);
final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
.setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
- tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
+ tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
tableEnsured = true;
}
}
}
void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
- ensureTable(trans);
+ ensureTableFowHashIdMapping(trans);
final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
if (flowKey == null) {
void removeUnreportedFlows(final ReadWriteTransaction tx) {
final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+ final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
} else {
nodeDeleteMap.remove(flowRef);
}
+ } else {
+ if (listMissingConfigFlows.remove(flowRef)) {
+ break; // we probably lost some multipart msg
+ }
}
final InstanceIdentifier<FlowHashIdMap> flHashIdent =
tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
}
}
+
+ List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
+ if (configFlows != null) {
+ final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
+ for (final Flow confFlow : configFlows) {
+ final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
+ returnList.add(confFlowIdent);
+ }
+ return returnList;
+ }
+ return Collections.emptyList();
+ }
}
}
LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
}
if (node.isPresent()) {
- tx.put(LogicalDatastoreType.OPERATIONAL, groupFeatureIdent, stats, true);
+ tx.put(LogicalDatastoreType.OPERATIONAL, groupFeatureIdent, stats);
}
}
});
LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
}
if (fNode.isPresent()) {
- trans.put(LogicalDatastoreType.OPERATIONAL, gsIdent, stats, true);
+ trans.put(LogicalDatastoreType.OPERATIONAL, gsIdent, stats);
}
}
}
LOG.trace("Read Operational/DS for FlowCapableNode fail! Node {} doesn't exist.", fNodeIdent);
return;
}
- final List<Group> existGroups = fNode.get().getGroup().isEmpty()
- ? Collections.<Group> emptyList() : fNode.get().getGroup();
+ final List<Group> existGroups = fNode.get().getGroup() != null
+ ? fNode.get().getGroup() : Collections.<Group> emptyList();
/* Add all existed groups paths - no updated paths has to be removed */
for (final Group group : existGroups) {
if (deviceGroupKeys.remove(group.getKey())) {
LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
}
if (node.isPresent()) {
- tx.put(LogicalDatastoreType.OPERATIONAL, meterFeatureIdent, stats, true);
+ tx.put(LogicalDatastoreType.OPERATIONAL, meterFeatureIdent, stats);
}
}
});
LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
}
if (fNode.isPresent()) {
- trans.put(LogicalDatastoreType.OPERATIONAL, msIdent, stats, true);
+ trans.put(LogicalDatastoreType.OPERATIONAL, msIdent, stats);
}
}
}
LOG.trace("Read Operational/DS for FlowCapableNode fail! Node {} doesn't exist.", fNodeIdent);
return;
}
- final List<Meter> existMeters = fNode.get().getMeter().isEmpty()
- ? Collections.<Meter> emptyList() : fNode.get().getMeter();
+ final List<Meter> existMeters = fNode.get().getMeter() != null
+ ? fNode.get().getMeter() : Collections.<Meter> emptyList();
/* Add all existed groups paths - no updated paths has to be removed */
for (final Meter meter : existMeters) {
if (deviceMeterKeys.remove(meter.getKey())) {
.child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
.augmentation(FlowCapableNodeConnector.class)
.child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
- trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build(), true);
+ trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build());
}
}
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
return;
}
for (final NodeConnectorStatisticsAndPortNumberMap nConnectPort : portStats) {
- final FlowCapableNodeConnectorStatisticsData stats = new FlowCapableNodeConnectorStatisticsDataBuilder()
- .setFlowCapableNodeConnectorStatistics(new FlowCapableNodeConnectorStatisticsBuilder(nConnectPort).build()).build();
+ final FlowCapableNodeConnectorStatistics stats = new FlowCapableNodeConnectorStatisticsBuilder(nConnectPort).build();
final NodeConnectorKey key = new NodeConnectorKey(nConnectPort.getNodeConnectorId());
final InstanceIdentifier<NodeConnector> nodeConnectorIdent = nodeIdent.child(NodeConnector.class, key);
final InstanceIdentifier<FlowCapableNodeConnectorStatisticsData> nodeConnStatIdent = nodeConnectorIdent
.augmentation(FlowCapableNodeConnectorStatisticsData.class);
+ final InstanceIdentifier<FlowCapableNodeConnectorStatistics> flowCapNodeConnStatIdent =
+ nodeConnStatIdent.child(FlowCapableNodeConnectorStatistics.class);
Optional<NodeConnector> fNodeConector;
try {
fNodeConector = tx.read(LogicalDatastoreType.OPERATIONAL, nodeConnectorIdent).checkedGet();
fNodeConector = Optional.absent();
}
if (fNodeConector.isPresent()) {
- tx.put(LogicalDatastoreType.OPERATIONAL, nodeConnStatIdent, stats);
+ tx.merge(LogicalDatastoreType.OPERATIONAL, nodeConnectorIdent, new NodeConnectorBuilder().setId(key.getId()).build());
+ tx.merge(LogicalDatastoreType.OPERATIONAL, nodeConnStatIdent, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
+ tx.put(LogicalDatastoreType.OPERATIONAL, flowCapNodeConnStatIdent, stats);
}
}
}
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
return;
}
for (final FlowTableAndStatisticsMap tableStat : tableStats) {
- final FlowTableStatisticsData stats = new FlowTableStatisticsDataBuilder()
- .setFlowTableStatistics(new FlowTableStatisticsBuilder(tableStat).build()).build();
-
- final InstanceIdentifier<FlowTableStatisticsData> tStatIdent = fNodeIdent
- .child(Table.class, new TableKey(tableStat.getTableId().getValue()))
+ final InstanceIdentifier<Table> tableIdent = fNodeIdent
+ .child(Table.class, new TableKey(tableStat.getTableId().getValue()));
+ final Table table = new TableBuilder().setId(tableStat.getTableId().getValue()).build();
+ trans.merge(LogicalDatastoreType.OPERATIONAL, tableIdent, table);
+ final InstanceIdentifier<FlowTableStatisticsData> tableStatIdent = tableIdent
.augmentation(FlowTableStatisticsData.class);
- trans.put(LogicalDatastoreType.OPERATIONAL, tStatIdent, stats, true);
+ trans.merge(LogicalDatastoreType.OPERATIONAL, tableStatIdent, new FlowTableStatisticsDataBuilder().build());
+
+ final FlowTableStatistics stats = new FlowTableStatisticsBuilder(tableStat).build();
+ final InstanceIdentifier<FlowTableStatistics> tStatIdent = tableStatIdent.child(FlowTableStatistics.class);
+ trans.put(LogicalDatastoreType.OPERATIONAL, tStatIdent, stats);
}
}
}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
private final long maxLifeForRequest = 50; /* 50 second */
private final int queueCapacity = 5000;
- private final StatisticsManager manager;
private final OpendaylightGroupStatisticsService groupStatsService;
private final OpendaylightMeterStatisticsService meterStatsService;
public StatRpcMsgManagerImpl (final StatisticsManager manager,
final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
- this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
+ Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
groupStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
@Override
public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
-
- manager.enqueue(new StatDataStoreOperation() {
+ Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ Preconditions.checkArgument(tableId != null, "TableId can not be null!");
+ final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
@Override
- public void applyOperation(final ReadWriteTransaction tx) {
- final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
- @Override
- public Void call() throws Exception {
- final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
- new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
- builder.setNode(nodeRef);
- builder.setTableId(tableId);
-
- final TableBuilder tbuilder = new TableBuilder();
- tbuilder.setId(tableId.getValue());
- tbuilder.setKey(new TableKey(tableId.getValue()));
- registrationRpcFutureCallBack(flowStatsService
- .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
- return null;
- }
- };
- addGetAllStatJob(getAggregateFlowStat);
+ public Void call() throws Exception {
+ final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+ new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+ builder.setNode(nodeRef);
+ builder.setTableId(tableId);
+
+ final TableBuilder tbuilder = new TableBuilder();
+ tbuilder.setId(tableId.getValue());
+ tbuilder.setKey(new TableKey(tableId.getValue()));
+ registrationRpcFutureCallBack(flowStatsService
+ .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+ return null;
}
- });
+ };
+ addGetAllStatJob(getAggregateFlowStat);
}
@Override
private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private static final int QUEUE_DEPTH = 1000;
- private static final int MAX_BATCH = 1;
+ private static final int QUEUE_DEPTH = 5000;
+ private static final int MAX_BATCH = 100;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
@Override
public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
for (final StatPermCollector collector : statCollectors) {
if (collector.disconnectedNodeUnregistration(nodeIdent)) {
if ( ! collector.hasActiveNodes()) {