X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsGatheringUtils.java;h=7c1128b93f64e7c616f94b5f719f19657f8dcf67;hb=refs%2Fchanges%2F24%2F72524%2F3;hp=22ea5b560fa0f80f0d93ba2dd7b09a677c80c6ba;hpb=888fadc5d77ea2b4d020cd1bcaf62e7aa39f0a2c;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java old mode 100644 new mode 100755 index 22ea5b560f..7c1128b93f --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java @@ -10,115 +10,58 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureFallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; import java.util.List; -import javax.annotation.Nullable; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry; import org.opendaylight.openflowplugin.api.openflow.device.TxFacade; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; -import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey; import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer; -import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory; -import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter; +import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap; -import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Utils for gatherig statistics + * Utils for gathering statistics. */ public final class StatisticsGatheringUtils { - private static String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; - + private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class); private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-"; @@ -126,371 +69,164 @@ public final class StatisticsGatheringUtils { throw new IllegalStateException("This class should not be instantiated."); } - //TODO: Flow-,Group- and Meter- registry should be not in device context, consider move it in separate class - static ListenableFuture gatherStatistics(final StatisticsGatherer statisticsGatheringService, - final DeviceInfo deviceInfo, - final MultipartType type, - final TxFacade txFacade, - final DeviceRegistry registry, - final Boolean initial, - final SinglePurposeMultipartReplyTranslator multipartReplyTranslator) { - EventIdentifier wholeProcessEventIdentifier = null; - if (MultipartType.OFPMPFLOW.equals(type)) { - wholeProcessEventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue()); - EventsTimeCounter.markStart(wholeProcessEventIdentifier); - } - final EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()); - final ListenableFuture>> statisticsDataInFuture = - JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType( - ofpQueuToRequestContextEventIdentifier, type)); - return transformAndStoreStatisticsData(statisticsDataInFuture, deviceInfo, wholeProcessEventIdentifier, type, txFacade, registry, initial, multipartReplyTranslator); - } - - private static ListenableFuture transformAndStoreStatisticsData(final ListenableFuture>> statisticsDataInFuture, - final DeviceInfo deviceInfo, - final EventIdentifier eventIdentifier, - final MultipartType type, - final TxFacade txFacade, - final DeviceRegistry registry, - final boolean initial, - final SinglePurposeMultipartReplyTranslator multipartReplyTranslator) { - return Futures.transform(statisticsDataInFuture, new AsyncFunction>, Boolean>() { - @Nullable - @Override - public ListenableFuture apply(final RpcResult> rpcResult) { - boolean isMultipartProcessed = Boolean.TRUE; - if (rpcResult.isSuccessful()) { + static ListenableFuture gatherStatistics( + final StatisticsGatherer statisticsGatheringService, final DeviceInfo deviceInfo, + final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry, + final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider, + final ListeningExecutorService executorService) { + return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType( + new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type), + rpcResult -> executorService.submit(() -> { + final boolean rpcResultIsNull = rpcResult == null; + + if (!rpcResultIsNull && rpcResult.isSuccessful()) { LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type); - - // TODO: in case the result value is null then multipart data probably got processed on the fly - - // TODO: this contract should by clearly stated and enforced - now simple true value is returned - if (null != rpcResult.getResult()) { - Iterable allMultipartData = Collections.emptyList(); - DataObject multipartData = null; - - - try { - for (final MultipartReply singleReply : rpcResult.getResult()) { - final List multipartDataList = multipartReplyTranslator.translate( - deviceInfo.getDatapathId(), - deviceInfo.getVersion(), singleReply); - multipartData = multipartDataList.get(0); - allMultipartData = Iterables.concat(allMultipartData, multipartDataList); - } - } catch (final Exception e) { - LOG.warn("stats processing of type {} for node {} failed during transfomation step", - type, deviceInfo.getNodeId(), e); - return Futures.immediateFailedFuture(e); - } - - - try { - if (multipartData instanceof GroupStatisticsUpdated) { - processGroupStatistics((Iterable) allMultipartData, deviceInfo, txFacade); - } else if (multipartData instanceof MeterStatisticsUpdated) { - processMetersStatistics((Iterable) allMultipartData, deviceInfo, txFacade); - } else if (multipartData instanceof NodeConnectorStatisticsUpdate) { - processNodeConnectorStatistics((Iterable) allMultipartData, deviceInfo, txFacade); - } else if (multipartData instanceof FlowTableStatisticsUpdate) { - processFlowTableStatistics((Iterable) allMultipartData, deviceInfo, txFacade); - } else if (multipartData instanceof QueueStatisticsUpdate) { - processQueueStatistics((Iterable) allMultipartData, deviceInfo, txFacade); - } else if (multipartData instanceof FlowsStatisticsUpdate) { - /* FlowStat Processing is realized by NettyThread only by initPhase, otherwise it is realized - * by MD-SAL thread */ - return processFlowStatistics((Iterable) allMultipartData, deviceInfo, txFacade, registry.getDeviceFlowRegistry(), initial, eventIdentifier); - - } else if (multipartData instanceof GroupDescStatsUpdated) { - processGroupDescStats((Iterable) allMultipartData, deviceInfo, txFacade, registry.getDeviceGroupRegistry()); - } else if (multipartData instanceof MeterConfigStatsUpdated) { - processMeterConfigStatsUpdated((Iterable) allMultipartData, deviceInfo, txFacade, registry.getDeviceMeterRegistry()); - } else { - isMultipartProcessed = Boolean.FALSE; - } - } catch (final Exception e) { - LOG.warn("stats processing of type {} for node {} failed during write-to-tx step", - type, deviceInfo.getNodeId(), e); - return Futures.immediateFailedFuture(e); - } - - LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type); - - //TODO : implement experimenter + // TODO: in case the result value is null then multipart data probably got processed + // TODO: on the fly. This contract should by clearly stated and enforced. + // TODO: Now simple true value is returned + if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) { + final List allMultipartData = rpcResult.getResult().stream() + .map(reply -> MultipartReplyTranslatorUtil + .translate(reply, deviceInfo, convertorExecutor, null)) + .filter(java.util.Optional::isPresent).map(java.util.Optional::get) + .collect(Collectors.toList()); + + return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo, + statisticsWriterProvider); } else { LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type); } - } else { - LOG.debug("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type, rpcResult.getErrors()); - isMultipartProcessed = Boolean.FALSE; + LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type, + rpcResultIsNull ? "" : rpcResult.getErrors()); } - return Futures.immediateFuture(isMultipartProcessed); - } - }); + return false; + }), MoreExecutors.directExecutor()); } - private static void processMeterConfigStatsUpdated(final Iterable data, - final DeviceInfo deviceInfo, - final TxFacade txFacade, - final DeviceMeterRegistry meterRegistry) throws Exception { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - deleteAllKnownMeters(meterRegistry, fNodeIdent, txFacade); - for (final MeterConfigStatsUpdated meterConfigStatsUpdated : data) { - for (final MeterConfigStats meterConfigStats : meterConfigStatsUpdated.getMeterConfigStats()) { - final MeterId meterId = meterConfigStats.getMeterId(); - final InstanceIdentifier meterInstanceIdentifier = fNodeIdent.child(Meter.class, new MeterKey(meterId)); - - final MeterBuilder meterBuilder = new MeterBuilder(meterConfigStats); - meterBuilder.setKey(new MeterKey(meterId)); - meterBuilder.addAugmentation(NodeMeterStatistics.class, new NodeMeterStatisticsBuilder().build()); - meterRegistry.store(meterId); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, meterInstanceIdentifier, meterBuilder.build()); - } + private static boolean processStatistics(final MultipartType type, final List statistics, + final TxFacade txFacade, final DeviceRegistry deviceRegistry, + final DeviceInfo deviceInfo, + final MultipartWriterProvider statisticsWriterProvider) { + final InstanceIdentifier instanceIdentifier = deviceInfo.getNodeInstanceIdentifier() + .augmentation(FlowCapableNode.class); + + switch (type) { + case OFPMPFLOW: + deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry()); + deviceRegistry.getDeviceFlowRegistry().processMarks(); + break; + case OFPMPMETERCONFIG: + deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry()); + deviceRegistry.getDeviceMeterRegistry().processMarks(); + break; + case OFPMPGROUPDESC: + deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry()); + deviceRegistry.getDeviceGroupRegistry().processMarks(); + break; + default: + // no operation } - txFacade.submitTransaction(); - } - private static ListenableFuture processFlowStatistics(final Iterable data, - final DeviceInfo deviceInfo, - final TxFacade txFacade, - final DeviceFlowRegistry flowRegistry, - final boolean initial, - final EventIdentifier eventIdentifier) { - final ListenableFuture deleteFuture = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo, - flowRegistry, txFacade); - return Futures.transform(deleteFuture, new Function() { + if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) { + txFacade.submitTransaction(); - @Override - public Boolean apply(final Void input) { - writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade); - txFacade.submitTransaction(); - EventsTimeCounter.markEnd(eventIdentifier); - return Boolean.TRUE; - } - }); - } - - public static void writeFlowStatistics(final Iterable data, - final DeviceInfo deviceInfo, - final DeviceFlowRegistry registry, - final TxFacade txFacade) { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - try { - for (final FlowsStatisticsUpdate flowsStatistics : data) { - for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) { - final FlowBuilder flowBuilder = new FlowBuilder(flowStat); - flowBuilder.addAugmentation(FlowStatisticsData.class, refineFlowStatisticsAugmentation(flowStat).build()); - - final short tableId = flowStat.getTableId(); - final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(flowBuilder.build()); - final FlowId flowId = registry.storeIfNecessary(flowRegistryKey); - - final FlowKey flowKey = new FlowKey(flowId); - flowBuilder.setKey(flowKey); - final TableKey tableKey = new TableKey(tableId); - final InstanceIdentifier flowIdent = fNodeIdent.child(Table.class, tableKey).child(Flow.class, flowKey); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build()); - } - } - } catch (Exception e) { - LOG.warn("Not able to write to transaction: {}", e.getMessage()); + LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type); + return true; } - } - /** - * Method extracts flow statistics out of flowAndStatistics model - * - * @param flowAndStats - */ - private static FlowStatisticsDataBuilder refineFlowStatisticsAugmentation(final FlowAndStatisticsMapList flowAndStats) { - final FlowStatisticsBuilder flowStatisticsBuilder = new FlowStatisticsBuilder(flowAndStats); - final FlowStatisticsDataBuilder flowStatisticsDataBld = new FlowStatisticsDataBuilder(); - flowStatisticsDataBld.setFlowStatistics(flowStatisticsBuilder.build()); - return flowStatisticsDataBld; + LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo); + return false; } - public static ListenableFuture deleteAllKnownFlows(final DeviceInfo deviceInfo, - final DeviceFlowRegistry registry, - final TxFacade txFacade) { - final InstanceIdentifier flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - final ReadOnlyTransaction readTx = txFacade.getReadTransaction(); - final CheckedFuture, ReadFailedException> flowCapableNodeFuture = readTx.read( - LogicalDatastoreType.OPERATIONAL, flowCapableNodePath); - - /* we wish to close readTx for fallBack */ - Futures.withFallback(flowCapableNodeFuture, new FutureFallback>() { - - @Override - public ListenableFuture> create(final Throwable t) throws Exception { - readTx.close(); - return Futures.immediateFailedFuture(t); - } - }); - /* - * we have to read actual tables with all information before we set empty Flow list, merge is expensive and - * not applicable for lists - */ - return Futures.transform(flowCapableNodeFuture, new AsyncFunction, Void>() { - - @Override - public ListenableFuture apply(final Optional flowCapNodeOpt) throws Exception { - if (flowCapNodeOpt.isPresent()) { - for (final Table tableData : flowCapNodeOpt.get().getTable()) { - final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build(); - final InstanceIdentifier iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey()); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table); - } - } - registry.removeMarked(); - readTx.close(); - return Futures.immediateFuture(null); - } + @SuppressWarnings("checkstyle:IllegalCatch") + private static boolean writeStatistics(final MultipartType type, final List statistics, + final DeviceInfo deviceInfo, + final MultipartWriterProvider statisticsWriterProvider) { + final AtomicBoolean result = new AtomicBoolean(false); - }); - } + try { + statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> { + final boolean write = p.write(stat, false); - private static void processQueueStatistics(final Iterable data, final DeviceInfo deviceInfo, final TxFacade txFacade) throws Exception { - // TODO: clean all queues of all node-connectors before writing up-to-date stats - final InstanceIdentifier nodeIdent = deviceInfo.getNodeInstanceIdentifier(); - for (final QueueStatisticsUpdate queueStatisticsUpdate : data) { - for (final QueueIdAndStatisticsMap queueStat : queueStatisticsUpdate.getQueueIdAndStatisticsMap()) { - if (queueStat.getQueueId() != null) { - final FlowCapableNodeConnectorQueueStatistics statChild = - new FlowCapableNodeConnectorQueueStatisticsBuilder(queueStat).build(); - final FlowCapableNodeConnectorQueueStatisticsDataBuilder statBuild = - new FlowCapableNodeConnectorQueueStatisticsDataBuilder(); - statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild); - final QueueKey qKey = new QueueKey(queueStat.getQueueId()); - final InstanceIdentifier queueIdent = nodeIdent - .child(NodeConnector.class, new NodeConnectorKey(queueStat.getNodeConnectorId())) - .augmentation(FlowCapableNodeConnector.class) - .child(Queue.class, qKey); - final QueueBuilder queueBuilder = new QueueBuilder() - .setKey(qKey) - .setQueueId(queueStat.getQueueId()) - // node-connector-id is already contained in parent node and the port-id here is of incompatible format - .addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, statBuild.build()); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, queueIdent, queueBuilder.build()); + if (!result.get()) { + result.set(write); } - } - } - txFacade.submitTransaction(); - } - - private static void processFlowTableStatistics(final Iterable data, final DeviceInfo deviceInfo, final TxFacade txFacade) throws Exception { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - for (final FlowTableStatisticsUpdate flowTableStatisticsUpdate : data) { - - for (final FlowTableAndStatisticsMap tableStat : flowTableStatisticsUpdate.getFlowTableAndStatisticsMap()) { - final InstanceIdentifier tStatIdent = fNodeIdent.child(Table.class, new TableKey(tableStat.getTableId().getValue())) - .augmentation(FlowTableStatisticsData.class).child(FlowTableStatistics.class); - final FlowTableStatistics stats = new FlowTableStatisticsBuilder(tableStat).build(); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tStatIdent, stats); - } + })); + } catch (final Exception ex) { + LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo, + ex); } - txFacade.submitTransaction(); - } - private static void processNodeConnectorStatistics(final Iterable data, final DeviceInfo deviceInfo, final TxFacade txFacade) throws Exception { - final InstanceIdentifier nodeIdent = deviceInfo.getNodeInstanceIdentifier(); - for (final NodeConnectorStatisticsUpdate nodeConnectorStatisticsUpdate : data) { - for (final NodeConnectorStatisticsAndPortNumberMap nConnectPort : nodeConnectorStatisticsUpdate.getNodeConnectorStatisticsAndPortNumberMap()) { - final FlowCapableNodeConnectorStatistics stats = new FlowCapableNodeConnectorStatisticsBuilder(nConnectPort).build(); - final NodeConnectorKey key = new NodeConnectorKey(nConnectPort.getNodeConnectorId()); - final InstanceIdentifier nodeConnectorIdent = nodeIdent.child(NodeConnector.class, key); - final InstanceIdentifier nodeConnStatIdent = nodeConnectorIdent - .augmentation(FlowCapableNodeConnectorStatisticsData.class); - final InstanceIdentifier flowCapNodeConnStatIdent = - nodeConnStatIdent.child(FlowCapableNodeConnectorStatistics.class); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, flowCapNodeConnStatIdent, stats); - } - } - txFacade.submitTransaction(); - } - - private static void processMetersStatistics(final Iterable data, - final DeviceInfo deviceInfo, - final TxFacade txFacade) throws Exception { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - for (final MeterStatisticsUpdated meterStatisticsUpdated : data) { - for (final MeterStats mStat : meterStatisticsUpdated.getMeterStats()) { - final MeterStatistics stats = new MeterStatisticsBuilder(mStat).build(); - final MeterId meterId = mStat.getMeterId(); - final InstanceIdentifier meterIdent = fNodeIdent.child(Meter.class, new MeterKey(meterId)); - final InstanceIdentifier nodeMeterStatIdent = meterIdent - .augmentation(NodeMeterStatistics.class); - final InstanceIdentifier msIdent = nodeMeterStatIdent.child(MeterStatistics.class); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, msIdent, stats); - } - } - txFacade.submitTransaction(); + return result.get(); } - private static void deleteAllKnownMeters(final DeviceMeterRegistry meterRegistry, final InstanceIdentifier fNodeIdent, final TxFacade txFacade) throws Exception { - for (final MeterId meterId : meterRegistry.getAllMeterIds()) { - final InstanceIdentifier meterIdent = fNodeIdent.child(Meter.class, new MeterKey(meterId)); - txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, meterIdent); + public static void deleteAllKnownFlows(final TxFacade txFacade, + final InstanceIdentifier instanceIdentifier, + final DeviceFlowRegistry deviceFlowRegistry) { + if (!txFacade.isTransactionsEnabled()) { + return; } - meterRegistry.removeMarked(); - } - - private static void processGroupDescStats(final Iterable data, final DeviceInfo deviceInfo, final TxFacade txFacade, final DeviceGroupRegistry groupRegistry) throws Exception { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - deleteAllKnownGroups(txFacade, fNodeIdent, groupRegistry); - - for (final GroupDescStatsUpdated groupDescStatsUpdated : data) { - for (final GroupDescStats groupDescStats : groupDescStatsUpdated.getGroupDescStats()) { - final GroupId groupId = groupDescStats.getGroupId(); - - final GroupBuilder groupBuilder = new GroupBuilder(groupDescStats); - groupBuilder.setKey(new GroupKey(groupId)); - groupBuilder.addAugmentation(NodeGroupStatistics.class, new NodeGroupStatisticsBuilder().build()); - - final InstanceIdentifier groupIdent = fNodeIdent.child(Group.class, new GroupKey(groupId)); - groupRegistry.store(groupId); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, groupIdent, groupBuilder.build()); - } + final ListenableFuture> future; + try (ReadOnlyTransaction readTx = txFacade.getReadTransaction()) { + future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier); } - txFacade.submitTransaction(); - } - private static void deleteAllKnownGroups(final TxFacade txFacade, final InstanceIdentifier fNodeIdent, final DeviceGroupRegistry groupRegistry) throws Exception { - for (final GroupId groupId : groupRegistry.getAllGroupIds()) { - final InstanceIdentifier groupIdent = fNodeIdent.child(Group.class, new GroupKey(groupId)); - txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, groupIdent); + try { + Futures.transform(Futures.catchingAsync(future, Throwable.class, throwable -> { + return Futures.immediateFailedFuture(throwable); + }, MoreExecutors.directExecutor()), (Function, Void>) flowCapNodeOpt -> { + // we have to read actual tables with all information before we set empty Flow list, + // merge is expensive and not applicable for lists + if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) { + for (final Table tableData : flowCapNodeOpt.get().getTable()) { + final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build(); + final InstanceIdentifier
iiToTable = instanceIdentifier + .child(Table.class, tableData.key()); + txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table); + } + } + return null; + }, MoreExecutors.directExecutor()).get(); + } catch (InterruptedException | ExecutionException ex) { + LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex); } - groupRegistry.removeMarked(); } - private static void processGroupStatistics(final Iterable data, final DeviceInfo deviceInfo, final TxFacade txFacade) throws Exception { - final InstanceIdentifier fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo); - for (final GroupStatisticsUpdated groupStatistics : data) { - for (final GroupStats groupStats : groupStatistics.getGroupStats()) { - - final InstanceIdentifier groupIdent = fNodeIdent.child(Group.class, new GroupKey(groupStats.getGroupId())); - final InstanceIdentifier nGroupStatIdent = groupIdent - .augmentation(NodeGroupStatistics.class); - - final InstanceIdentifier gsIdent = nGroupStatIdent.child(GroupStatistics.class); - final GroupStatistics stats = new GroupStatisticsBuilder(groupStats).build(); - txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, gsIdent, stats); - } - } - txFacade.submitTransaction(); + public static void deleteAllKnownMeters(final TxFacade txFacade, + final InstanceIdentifier instanceIdentifier, + final DeviceMeterRegistry meterRegistry) { + meterRegistry.forEach(meterId -> { + txFacade + .addDeleteToTxChain( + LogicalDatastoreType.OPERATIONAL, + instanceIdentifier.child(Meter.class, new MeterKey(meterId))); + meterRegistry.addMark(meterId); + }); } - private static InstanceIdentifier assembleFlowCapableNodeInstanceIdentifier(final DeviceInfo deviceInfo) { - return deviceInfo.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class); + public static void deleteAllKnownGroups(final TxFacade txFacade, + final InstanceIdentifier instanceIdentifier, + final DeviceGroupRegistry groupRegistry) { + LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType()); + groupRegistry.forEach(groupId -> { + txFacade + .addDeleteToTxChain( + LogicalDatastoreType.OPERATIONAL, + instanceIdentifier.child(Group.class, new GroupKey(groupId))); + groupRegistry.addMark(groupId); + }); } /** - * Writes snapshot gathering start timestamp + cleans end mark + * Writes snapshot gathering start timestamp + cleans end mark. * - * @param deviceContext txManager + node path keeper + * @param deviceInfo device info + * @param txFacade tx manager */ - static void markDeviceStateSnapshotStart(final DeviceContext deviceContext) { - final InstanceIdentifier statusPath = deviceContext.getDeviceInfo() + static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) { + final InstanceIdentifier statusPath = deviceInfo .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class); final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT); @@ -501,23 +237,25 @@ public final class StatisticsGatheringUtils { .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here .build(); try { - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus); + txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus); } catch (final TransactionChainClosedException e) { LOG.warn("Can't write to transaction, transaction chain probably closed."); LOG.trace("Write to transaction exception: ", e); } - deviceContext.submitTransaction(); + txFacade.submitTransaction(); } /** - * Writes snapshot gathering end timestamp + outcome + * Writes snapshot gathering end timestamp + outcome. * - * @param deviceContext txManager + node path keeper + * @param deviceInfo device info + * @param txFacade tx manager * @param succeeded outcome of currently finished gathering */ - static void markDeviceStateSnapshotEnd(final DeviceContext deviceContext, final boolean succeeded) { - final InstanceIdentifier statusEndPath = deviceContext.getDeviceInfo() + static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo, + final TxFacade txFacade, final boolean succeeded) { + final InstanceIdentifier statusEndPath = deviceInfo .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class) .child(SnapshotGatheringStatusEnd.class); @@ -527,12 +265,12 @@ public final class StatisticsGatheringUtils { .setSucceeded(succeeded) .build(); try { - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus); + txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus); } catch (TransactionChainClosedException e) { LOG.warn("Can't write to transaction, transaction chain probably closed."); LOG.trace("Write to transaction exception: ", e); } - deviceContext.submitTransaction(); + txFacade.submitTransaction(); } }