68c6651e8bf9bb51c7722a1ef46c4401414e60f2
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.statistics;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.text.SimpleDateFormat;
16 import java.util.Collections;
17 import java.util.Date;
18 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.stream.Collectors;
23 import org.opendaylight.mdsal.binding.api.ReadTransaction;
24 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
28 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
29 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
30 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
33 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
34 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
35 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
36 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
52 import org.opendaylight.yangtools.yang.binding.DataContainer;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * Utils for gathering statistics.
59  */
60 public final class StatisticsGatheringUtils {
61
62     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
63     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
64     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
65
66     private StatisticsGatheringUtils() {
67         throw new IllegalStateException("This class should not be instantiated.");
68     }
69
70     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
71             final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
72             final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
73             final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
74             final ListeningExecutorService executorService) {
75         return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType(
76            new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
77             rpcResult -> executorService.submit(() -> {
78                 final boolean rpcResultIsNull = rpcResult == null;
79
80                 if (!rpcResultIsNull && rpcResult.isSuccessful()) {
81                     LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
82                         // TODO: in case the result value is null then multipart data probably got processed
83                         // TODO: on the fly. This contract should by clearly stated and enforced.
84                         // TODO: Now simple true value is returned
85                     if (rpcResult.getResult() != null && !rpcResult.getResult().isEmpty()) {
86                         final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
87                                 .map(reply -> MultipartReplyTranslatorUtil
88                                                     .translate(reply, deviceInfo, convertorExecutor, null))
89                                 .filter(java.util.Optional::isPresent).map(java.util.Optional::get)
90                                             .collect(Collectors.toList());
91
92                         return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
93                                         statisticsWriterProvider);
94                     } else {
95                         LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
96                     }
97                 } else {
98                     LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
99                                 rpcResultIsNull ? "" : rpcResult.getErrors());
100                 }
101                 return false;
102             }), MoreExecutors.directExecutor());
103     }
104
105     private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
106                                              final TxFacade txFacade, final DeviceRegistry deviceRegistry,
107                                              final DeviceInfo deviceInfo,
108                                              final MultipartWriterProvider statisticsWriterProvider) {
109         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
110                 .augmentation(FlowCapableNode.class);
111
112         switch (type) {
113             case OFPMPFLOW:
114                 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
115                 deviceRegistry.getDeviceFlowRegistry().processMarks();
116                 break;
117             case OFPMPMETERCONFIG:
118                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
119                 deviceRegistry.getDeviceMeterRegistry().processMarks();
120                 break;
121             case OFPMPGROUPDESC:
122                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
123                 deviceRegistry.getDeviceGroupRegistry().processMarks();
124                 break;
125             default:
126                 // no operation
127         }
128
129         if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
130             txFacade.submitTransaction();
131
132             LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
133             return true;
134         }
135
136         LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
137         return false;
138     }
139
140     @SuppressWarnings("checkstyle:IllegalCatch")
141     private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
142                                            final DeviceInfo deviceInfo,
143                                            final MultipartWriterProvider statisticsWriterProvider) {
144         final AtomicBoolean result = new AtomicBoolean(false);
145
146         try {
147             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
148                 final boolean write = p.write(stat, false);
149
150                 if (!result.get()) {
151                     result.set(write);
152                 }
153             }));
154         } catch (final Exception ex) {
155             LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
156                      ex);
157         }
158
159         return result.get();
160     }
161
162     public static void deleteAllKnownFlows(final TxFacade txFacade,
163                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
164                                            final DeviceFlowRegistry deviceFlowRegistry) {
165         if (!txFacade.isTransactionsEnabled()) {
166             return;
167         }
168
169         final ListenableFuture<Optional<FlowCapableNode>> future;
170         try (ReadTransaction readTx = txFacade.getReadTransaction()) {
171             future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
172         }
173
174         try {
175             Futures.transform(Futures.catchingAsync(future, Throwable.class, Futures::immediateFailedFuture,
176                 MoreExecutors.directExecutor()), (Function<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
177                     // we have to read actual tables with all information before we set empty Flow list,
178                     // merge is expensive and not applicable for lists
179                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
180                         for (final Table tableData : flowCapNodeOpt.get().getTable()) {
181                             final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
182                             final InstanceIdentifier<Table> iiToTable = instanceIdentifier
183                                 .child(Table.class, tableData.key());
184                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
185                         }
186                     }
187                     return null;
188                 }, MoreExecutors.directExecutor()).get();
189         } catch (InterruptedException | ExecutionException ex) {
190             LOG.debug("Failed to delete {} flows", deviceFlowRegistry.size(), ex);
191         }
192     }
193
194     public static void deleteAllKnownMeters(final TxFacade txFacade,
195                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
196                                             final DeviceMeterRegistry meterRegistry) {
197         meterRegistry.forEach(meterId -> {
198             txFacade
199                     .addDeleteToTxChain(
200                             LogicalDatastoreType.OPERATIONAL,
201                             instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
202             meterRegistry.addMark(meterId);
203         });
204     }
205
206     public static void deleteAllKnownGroups(final TxFacade txFacade,
207                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
208                                             final DeviceGroupRegistry groupRegistry) {
209         LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
210         groupRegistry.forEach(groupId -> {
211             txFacade
212                     .addDeleteToTxChain(
213                             LogicalDatastoreType.OPERATIONAL,
214                             instanceIdentifier.child(Group.class, new GroupKey(groupId)));
215             groupRegistry.addMark(groupId);
216         });
217     }
218
219     /**
220      * Writes snapshot gathering start timestamp + cleans end mark.
221      *
222      * @param deviceInfo device info
223      * @param txFacade tx manager
224      */
225     static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
226         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
227                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
228
229         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
230         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
231                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
232                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
233                         .build())
234                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
235                 .build();
236         try {
237             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
238         } catch (final TransactionChainClosedException e) {
239             LOG.warn("Can't write to transaction, transaction chain probably closed.");
240             LOG.trace("Write to transaction exception: ", e);
241         }
242
243         txFacade.submitTransaction();
244     }
245
246     /**
247      * Writes snapshot gathering end timestamp + outcome.
248      *
249      * @param deviceInfo device info
250      * @param txFacade tx manager
251      * @param succeeded     outcome of currently finished gathering
252      */
253     static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
254                                            final TxFacade txFacade, final boolean succeeded) {
255         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
256                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
257                 .child(SnapshotGatheringStatusEnd.class);
258
259         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
260         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
261                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
262                 .setSucceeded(succeeded)
263                 .build();
264         try {
265             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
266         } catch (TransactionChainClosedException e) {
267             LOG.warn("Can't write to transaction, transaction chain probably closed.");
268             LOG.trace("Write to transaction exception: ", e);
269         }
270
271         txFacade.submitTransaction();
272     }
273 }