2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
32 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
35 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
36 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
37 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
38 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
39 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
55 import org.opendaylight.yangtools.yang.binding.DataContainer;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
/**
 * Utilities for gathering statistics.
 */
64 public final class StatisticsGatheringUtils {
66 private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
67 private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
68 private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
/**
 * Hidden constructor; this class only exposes static helpers.
 *
 * @throws IllegalStateException always, to reject any attempt to create an instance
 */
private StatisticsGatheringUtils() {
    throw new IllegalStateException("This class should not be instantiated.");
}
74 static <T extends OfHeader>ListenableFuture<Boolean> gatherStatistics(final StatisticsGatherer<T> statisticsGatheringService,
75 final DeviceInfo deviceInfo,
76 final MultipartType type,
77 final TxFacade txFacade,
78 final DeviceRegistry registry,
79 final Boolean initial,
80 final ConvertorExecutor convertorExecutor,
81 final MultipartWriterProvider statisticsWriterProvider) {
83 final EventIdentifier eventIdentifier;
84 if (MultipartType.OFPMPFLOW.equals(type)) {
85 eventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue());
86 EventsTimeCounter.markStart(eventIdentifier);
88 eventIdentifier = null;
91 return Futures.transform(
92 statisticsGatheringService.getStatisticsOfType(
93 new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
95 new AsyncFunction<RpcResult<List<T>>, Boolean>() {
98 public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
99 boolean isMultipartProcessed = Boolean.TRUE;
101 if (rpcResult.isSuccessful()) {
102 LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
104 // TODO: in case the result value is null then multipart data probably got processed on the fly -
105 // TODO: this contract should by clearly stated and enforced - now simple true value is returned
106 if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
107 final List<DataContainer> allMultipartData;
110 allMultipartData = rpcResult
113 .map(reply -> MultipartReplyTranslatorUtil
114 .translate(reply, deviceInfo, convertorExecutor, null))
115 .filter(java.util.Optional::isPresent)
116 .map(java.util.Optional::get)
117 .collect(Collectors.toList());
118 } catch (final Exception e) {
119 LOG.warn("Stats processing of type {} for node {} failed during transformation step",
120 type, deviceInfo.getLOGValue(), e);
121 return Futures.immediateFailedFuture(e);
125 return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
126 statisticsWriterProvider,
127 eventIdentifier, initial);
128 } catch (final Exception e) {
129 LOG.warn("Stats processing of type {} for node {} failed during processing step",
130 type, deviceInfo.getNodeId(), e);
131 return Futures.immediateFailedFuture(e);
134 LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
137 LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
138 rpcResult.getErrors());
139 isMultipartProcessed = Boolean.FALSE;
142 return Futures.immediateFuture(isMultipartProcessed);
147 private static ListenableFuture<Boolean> processStatistics(final MultipartType type,
148 final List<? extends DataContainer> statistics,
149 final TxFacade txFacade,
150 final DeviceRegistry deviceRegistry,
151 final DeviceInfo deviceInfo,
152 final MultipartWriterProvider statisticsWriterProvider,
153 final EventIdentifier eventIdentifier,
154 final boolean initial) {
156 ListenableFuture<Void> future = Futures.immediateFuture(null);
158 final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
159 .getNodeInstanceIdentifier()
160 .augmentation(FlowCapableNode.class);
164 future = deleteAllKnownFlows(txFacade, instanceIdentifier, initial);
166 case OFPMPMETERCONFIG:
167 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
170 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
174 return Futures.transform(future, (Function<Void, Boolean>) input -> {
175 if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
176 txFacade.submitTransaction();
178 if (MultipartType.OFPMPFLOW.equals(type)) {
179 EventsTimeCounter.markEnd(eventIdentifier);
180 deviceRegistry.getDeviceFlowRegistry().processMarks();
183 LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
187 LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
188 return Boolean.FALSE;
192 private static boolean writeStatistics(final MultipartType type,
193 final List<? extends DataContainer> statistics,
194 final DeviceInfo deviceInfo,
195 final MultipartWriterProvider statisticsWriterProvider) {
196 final AtomicBoolean result = new AtomicBoolean(false);
199 statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
200 final boolean write = p.write(stat, false);
206 } catch (final Exception ex) {
207 LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue(), ex);
213 public static ListenableFuture<Void> deleteAllKnownFlows(final TxFacade txFacade,
214 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
215 final boolean initial) {
217 return Futures.immediateFuture(null);
220 final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
221 return Futures.transform(Futures
222 .withFallback(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), t -> {
223 // we wish to close readTx for fallBack
225 return Futures.immediateFailedFuture(t);
226 }), (Function<Optional<FlowCapableNode>, Void>)
228 // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
229 // not applicable for lists
230 if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
231 for (final Table tableData : flowCapNodeOpt.get().getTable()) {
232 final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
233 final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
234 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
243 private static void deleteAllKnownMeters(final TxFacade txFacade,
244 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
245 final DeviceMeterRegistry meterRegistry) {
246 meterRegistry.forEach(meterId -> txFacade
248 LogicalDatastoreType.OPERATIONAL,
249 instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
251 meterRegistry.processMarks();
254 private static void deleteAllKnownGroups(final TxFacade txFacade,
255 final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
256 final DeviceGroupRegistry groupRegistry) {
257 groupRegistry.forEach(groupId -> txFacade
259 LogicalDatastoreType.OPERATIONAL,
260 instanceIdentifier.child(Group.class, new GroupKey(groupId))));
262 groupRegistry.processMarks();
266 * Writes snapshot gathering start timestamp + cleans end mark
268 * @param deviceContext txManager + node path keeper
270 static void markDeviceStateSnapshotStart(final DeviceContext deviceContext) {
271 final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceContext.getDeviceInfo()
272 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
274 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
275 final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
276 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
277 .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
279 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
282 deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
283 } catch (final TransactionChainClosedException e) {
284 LOG.warn("Can't write to transaction, transaction chain probably closed.");
285 LOG.trace("Write to transaction exception: ", e);
288 deviceContext.submitTransaction();
292 * Writes snapshot gathering end timestamp + outcome
294 * @param deviceContext txManager + node path keeper
295 * @param succeeded outcome of currently finished gathering
297 static void markDeviceStateSnapshotEnd(final DeviceContext deviceContext, final boolean succeeded) {
298 final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceContext.getDeviceInfo()
299 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
300 .child(SnapshotGatheringStatusEnd.class);
302 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
303 final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
304 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
305 .setSucceeded(succeeded)
308 deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
309 } catch (TransactionChainClosedException e) {
310 LOG.warn("Can't write to transaction, transaction chain probably closed.");
311 LOG.trace("Write to transaction exception: ", e);
314 deviceContext.submitTransaction();