2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.statistics;
10 import com.google.common.util.concurrent.Futures;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.text.SimpleDateFormat;
15 import java.util.Collections;
16 import java.util.Date;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.stream.Collectors;
22 import org.opendaylight.mdsal.binding.api.ReadTransaction;
23 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
26 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
27 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
28 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
29 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
30 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
33 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
34 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
35 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yangtools.yang.binding.DataContainer;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Utils for gathering statistics.
59 public final class StatisticsGatheringUtils {
61 private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
62 private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
63 private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
65 private StatisticsGatheringUtils() {
66 throw new IllegalStateException("This class should not be instantiated.");
69 static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
70 final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
71 final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
72 final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
73 final ListeningExecutorService executorService) {
74 return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType(
75 new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
76 rpcResult -> executorService.submit(() -> {
77 final boolean rpcResultIsNull = rpcResult == null;
79 if (!rpcResultIsNull && rpcResult.isSuccessful()) {
80 LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
81 // TODO: in case the result value is null then multipart data probably got processed
82 // TODO: on the fly. This contract should by clearly stated and enforced.
83 // TODO: Now simple true value is returned
84 if (rpcResult.getResult() != null && !rpcResult.getResult().isEmpty()) {
85 final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
86 .map(reply -> MultipartReplyTranslatorUtil
87 .translate(reply, deviceInfo, convertorExecutor, null))
88 .filter(java.util.Optional::isPresent).map(java.util.Optional::get)
89 .collect(Collectors.toList());
91 return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
92 statisticsWriterProvider);
94 LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
97 LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
98 rpcResultIsNull ? "" : rpcResult.getErrors());
101 }), MoreExecutors.directExecutor());
104 @SuppressWarnings("checkstyle:IllegalCatch")
105 private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
106 final TxFacade txFacade, final DeviceRegistry deviceRegistry,
107 final DeviceInfo deviceInfo,
108 final MultipartWriterProvider statisticsWriterProvider) {
109 final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
110 .augmentation(FlowCapableNode.class);
112 txFacade.acquireWriteTransactionLock();
115 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
116 deviceRegistry.getDeviceFlowRegistry().processMarks();
118 case OFPMPMETERCONFIG:
119 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
120 deviceRegistry.getDeviceMeterRegistry().processMarks();
123 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
124 deviceRegistry.getDeviceGroupRegistry().processMarks();
130 if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
131 txFacade.submitTransaction();
133 LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
136 } catch (Exception e) {
137 LOG.error("Exception while writing statistics to operational inventory for the device {}",
138 deviceInfo.getLOGValue(), e);
140 txFacade.releaseWriteTransactionLock();
143 LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
147 @SuppressWarnings("checkstyle:IllegalCatch")
148 private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
149 final DeviceInfo deviceInfo,
150 final MultipartWriterProvider statisticsWriterProvider) {
151 final AtomicBoolean result = new AtomicBoolean(false);
154 statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
155 final boolean write = p.write(stat, false);
161 } catch (final Exception ex) {
162 LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
169 public static void deleteAllKnownFlows(final TxFacade txFacade,
170 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
171 final DeviceFlowRegistry deviceFlowRegistry) {
172 if (!txFacade.isTransactionsEnabled()) {
176 final ListenableFuture<Optional<FlowCapableNode>> future;
177 try (ReadTransaction readTx = txFacade.getReadTransaction()) {
178 future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
182 Futures.transform(Futures.catchingAsync(future, Throwable.class, Futures::immediateFailedFuture,
183 MoreExecutors.directExecutor()), flowCapNodeOpt -> {
184 // we have to read actual tables with all information before we set empty Flow list,
185 // merge is expensive and not applicable for lists
186 if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
187 for (final Table tableData : flowCapNodeOpt.get().getTable()) {
188 final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
189 final InstanceIdentifier<Table> iiToTable = instanceIdentifier
190 .child(Table.class, tableData.key());
191 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
195 }, MoreExecutors.directExecutor()).get();
196 } catch (InterruptedException | ExecutionException ex) {
197 LOG.debug("Failed to delete {} flows", deviceFlowRegistry.size(), ex);
201 public static void deleteAllKnownMeters(final TxFacade txFacade,
202 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
203 final DeviceMeterRegistry meterRegistry) {
204 meterRegistry.forEach(meterId -> {
207 LogicalDatastoreType.OPERATIONAL,
208 instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
209 meterRegistry.addMark(meterId);
213 public static void deleteAllKnownGroups(final TxFacade txFacade,
214 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
215 final DeviceGroupRegistry groupRegistry) {
216 LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
217 groupRegistry.forEach(groupId -> {
220 LogicalDatastoreType.OPERATIONAL,
221 instanceIdentifier.child(Group.class, new GroupKey(groupId)));
222 groupRegistry.addMark(groupId);
227 * Writes snapshot gathering start timestamp + cleans end mark.
229 * @param deviceInfo device info
230 * @param txFacade tx manager
232 static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
233 final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
234 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
236 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
237 final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
238 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
239 .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
241 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
244 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
245 } catch (final TransactionChainClosedException e) {
246 LOG.warn("Can't write to transaction, transaction chain probably closed.");
247 LOG.trace("Write to transaction exception: ", e);
250 txFacade.submitTransaction();
254 * Writes snapshot gathering end timestamp + outcome.
256 * @param deviceInfo device info
257 * @param txFacade tx manager
258 * @param succeeded outcome of currently finished gathering
260 static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
261 final TxFacade txFacade, final boolean succeeded) {
262 final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
263 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
264 .child(SnapshotGatheringStatusEnd.class);
266 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
267 final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
268 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
269 .setSucceeded(succeeded)
272 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
273 } catch (TransactionChainClosedException e) {
274 LOG.warn("Can't write to transaction, transaction chain probably closed.");
275 LOG.trace("Write to transaction exception: ", e);
278 txFacade.submitTransaction();