*/
package org.opendaylight.openflowplugin.impl.statistics;
-import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.text.SimpleDateFormat;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
* Utils for gathering statistics.
*/
public final class StatisticsGatheringUtils {
-
private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
private StatisticsGatheringUtils() {
- throw new IllegalStateException("This class should not be instantiated.");
+ // Hidden on purpose
}
static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
- final ListeningExecutorService executorService) {
- return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType(
- new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
- rpcResult -> executorService.submit(() -> {
+ final Executor executor) {
+ return Futures.transform(statisticsGatheringService.getStatisticsOfType(
+ new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
+ rpcResult -> {
final boolean rpcResultIsNull = rpcResult == null;
if (!rpcResultIsNull && rpcResult.isSuccessful()) {
// TODO: in case the result value is null then multipart data probably got processed
// TODO: on the fly. This contract should by clearly stated and enforced.
// TODO: Now simple true value is returned
- if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
+ if (rpcResult.getResult() != null && !rpcResult.getResult().isEmpty()) {
final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
.map(reply -> MultipartReplyTranslatorUtil
.translate(reply, deviceInfo, convertorExecutor, null))
- .filter(java.util.Optional::isPresent).map(java.util.Optional::get)
- .collect(Collectors.toList());
+ .filter(Optional::isPresent).map(Optional::orElseThrow)
+ .collect(Collectors.toList());
return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
statisticsWriterProvider);
rpcResultIsNull ? "" : rpcResult.getErrors());
}
return false;
- }), MoreExecutors.directExecutor());
+ }, executor);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
final TxFacade txFacade, final DeviceRegistry deviceRegistry,
final DeviceInfo deviceInfo,
final MultipartWriterProvider statisticsWriterProvider) {
final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
.augmentation(FlowCapableNode.class);
+ try {
+ txFacade.acquireWriteTransactionLock();
+ switch (type) {
+ case OFPMPFLOW:
+ deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
+ deviceRegistry.getDeviceFlowRegistry().processMarks();
+ break;
+ case OFPMPMETERCONFIG:
+ deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
+ default:
+ // no operation
+ }
- switch (type) {
- case OFPMPFLOW:
- deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
- deviceRegistry.getDeviceFlowRegistry().processMarks();
- break;
- case OFPMPMETERCONFIG:
- deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
- deviceRegistry.getDeviceMeterRegistry().processMarks();
- break;
- case OFPMPGROUPDESC:
- deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
- deviceRegistry.getDeviceGroupRegistry().processMarks();
- break;
- default:
- // no operation
- }
-
- if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
- txFacade.submitTransaction();
+ if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+ txFacade.submitTransaction();
- LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
- return true;
+ LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+ return true;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while writing statistics to operational inventory for the device {}",
+ deviceInfo.getLOGValue(), e);
+ } finally {
+ txFacade.releaseWriteTransactionLock();
}
LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
try {
Futures.transform(Futures.catchingAsync(future, Throwable.class, Futures::immediateFailedFuture,
- MoreExecutors.directExecutor()), (Function<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
+ MoreExecutors.directExecutor()), flowCapNodeOpt -> {
// we have to read actual tables with all information before we set empty Flow list,
// merge is expensive and not applicable for lists
if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
- for (final Table tableData : flowCapNodeOpt.get().getTable()) {
- final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
+ for (final Table tableData : flowCapNodeOpt.orElseThrow().nonnullTable().values()) {
+ final Table table = new TableBuilder(tableData).setFlow(Map.of()).build();
final InstanceIdentifier<Table> iiToTable = instanceIdentifier
.child(Table.class, tableData.key());
txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);