2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
29 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
30 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
34 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
35 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
36 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
37 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
53 import org.opendaylight.yangtools.yang.binding.DataContainer;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * Utils for gathering statistics.
61 public final class StatisticsGatheringUtils {
63 private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
64 private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
65 private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
67 private StatisticsGatheringUtils() {
68 throw new IllegalStateException("This class should not be instantiated.");
71 static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
72 final StatisticsGatherer<T> statisticsGatheringService,
73 final DeviceInfo deviceInfo,
74 final MultipartType type,
75 final TxFacade txFacade,
76 final DeviceRegistry registry,
77 final ConvertorExecutor convertorExecutor,
78 final MultipartWriterProvider statisticsWriterProvider,
79 final ListeningExecutorService executorService) {
80 return Futures.transformAsync(
81 statisticsGatheringService.getStatisticsOfType(
82 new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
84 rpcResult -> executorService.submit(() -> {
85 final boolean rpcResultIsNull = rpcResult == null;
87 if (!rpcResultIsNull && rpcResult.isSuccessful()) {
88 LOG.debug("Stats reply successfully received for node {} of type {}",
89 deviceInfo.getNodeId(), type);
91 // TODO: in case the result value is null then multipart data probably got processed
92 // TODO: on the fly. This contract should by clearly stated and enforced.
93 // TODO: Now simple true value is returned
94 if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
96 final List<DataContainer> allMultipartData = rpcResult
99 .map(reply -> MultipartReplyTranslatorUtil
100 .translate(reply, deviceInfo, convertorExecutor, null))
101 .filter(java.util.Optional::isPresent)
102 .map(java.util.Optional::get)
103 .collect(Collectors.toList());
105 return processStatistics(
111 statisticsWriterProvider);
112 } catch (final Exception e) {
113 LOG.warn("Stats processing of type {} for node {} failed with error: {}",
114 type, deviceInfo, e);
117 LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
120 LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
121 rpcResultIsNull ? "" : rpcResult.getErrors());
128 private static boolean processStatistics(final MultipartType type,
129 final List<? extends DataContainer> statistics,
130 final TxFacade txFacade,
131 final DeviceRegistry deviceRegistry,
132 final DeviceInfo deviceInfo,
133 final MultipartWriterProvider statisticsWriterProvider) {
134 final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
135 .getNodeInstanceIdentifier()
136 .augmentation(FlowCapableNode.class);
140 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
142 case OFPMPMETERCONFIG:
143 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
146 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
152 if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
153 txFacade.submitTransaction();
157 deviceRegistry.getDeviceFlowRegistry().processMarks();
159 case OFPMPMETERCONFIG:
160 deviceRegistry.getDeviceMeterRegistry().processMarks();
163 deviceRegistry.getDeviceGroupRegistry().processMarks();
169 LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
173 LOG.warn("Stats processing of type {} for node {} "
174 + "failed during write-to-tx step", type, deviceInfo);
178 @SuppressWarnings("checkstyle:IllegalCatch")
179 private static boolean writeStatistics(final MultipartType type,
180 final List<? extends DataContainer> statistics,
181 final DeviceInfo deviceInfo,
182 final MultipartWriterProvider statisticsWriterProvider) {
183 final AtomicBoolean result = new AtomicBoolean(false);
186 statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
187 final boolean write = p.write(stat, false);
193 } catch (final Exception ex) {
194 LOG.warn("Stats processing of type {} for node {} "
195 + "failed during write-to-tx step", type, deviceInfo, ex);
201 public static void deleteAllKnownFlows(final TxFacade txFacade,
202 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
203 final DeviceFlowRegistry deviceFlowRegistry) {
204 if (!txFacade.isTransactionsEnabled()) {
208 final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
211 Futures.transform(Futures
212 .catchingAsync(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), Throwable.class,
214 // we wish to close readTx for fallBack
216 return Futures.immediateFailedFuture(t);
217 }), (Function<Optional<FlowCapableNode>, Void>)
219 // we have to read actual tables with all information before we set empty Flow list,
220 // merge is expensive and not applicable for lists
221 if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
222 for (final Table tableData : flowCapNodeOpt.get().getTable()) {
224 new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
225 final InstanceIdentifier<Table> iiToTable =
226 instanceIdentifier.child(Table.class, tableData.getKey());
227 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
234 } catch (InterruptedException | ExecutionException ex) {
235 LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
239 public static void deleteAllKnownMeters(final TxFacade txFacade,
240 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
241 final DeviceMeterRegistry meterRegistry) {
242 meterRegistry.forEach(meterId -> txFacade
244 LogicalDatastoreType.OPERATIONAL,
245 instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
248 public static void deleteAllKnownGroups(final TxFacade txFacade,
249 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
250 final DeviceGroupRegistry groupRegistry) {
251 groupRegistry.forEach(groupId -> txFacade
253 LogicalDatastoreType.OPERATIONAL,
254 instanceIdentifier.child(Group.class, new GroupKey(groupId))));
258 * Writes snapshot gathering start timestamp + cleans end mark.
260 * @param deviceInfo device info
261 * @param txFacade tx manager
263 static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
264 final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
265 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
267 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
268 final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
269 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
270 .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
272 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
275 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
276 } catch (final TransactionChainClosedException e) {
277 LOG.warn("Can't write to transaction, transaction chain probably closed.");
278 LOG.trace("Write to transaction exception: ", e);
281 txFacade.submitTransaction();
285 * Writes snapshot gathering end timestamp + outcome.
287 * @param deviceInfo device info
288 * @param txFacade tx manager
289 * @param succeeded outcome of currently finished gathering
291 static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo, final TxFacade txFacade, final boolean succeeded) {
292 final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
293 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
294 .child(SnapshotGatheringStatusEnd.class);
296 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
297 final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
298 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
299 .setSucceeded(succeeded)
302 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
303 } catch (TransactionChainClosedException e) {
304 LOG.warn("Can't write to transaction, transaction chain probably closed.");
305 LOG.trace("Write to transaction exception: ", e);
308 txFacade.submitTransaction();