Gather statistics in separate thread
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
29 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
30 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
34 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
35 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
36 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
37 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
53 import org.opendaylight.yangtools.yang.binding.DataContainer;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Utils for gathering statistics.
60  */
61 public final class StatisticsGatheringUtils {
62
63     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
64     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
65     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
66
67     private StatisticsGatheringUtils() {
68         throw new IllegalStateException("This class should not be instantiated.");
69     }
70
71     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
72             final StatisticsGatherer<T> statisticsGatheringService,
73             final DeviceInfo deviceInfo,
74             final MultipartType type,
75             final TxFacade txFacade,
76             final DeviceRegistry registry,
77             final ConvertorExecutor convertorExecutor,
78             final MultipartWriterProvider statisticsWriterProvider,
79             final ListeningExecutorService executorService) {
80         return Futures.transformAsync(
81                 statisticsGatheringService.getStatisticsOfType(
82                         new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
83                         type),
84                 rpcResult -> executorService.submit(() -> {
85                     final boolean rpcResultIsNull = rpcResult == null;
86
87                     if (!rpcResultIsNull && rpcResult.isSuccessful()) {
88                         LOG.debug("Stats reply successfully received for node {} of type {}",
89                                 deviceInfo.getNodeId(), type);
90
91                         // TODO: in case the result value is null then multipart data probably got processed
92                         // TODO: on the fly. This contract should by clearly stated and enforced.
93                         // TODO: Now simple true value is returned
94                         if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
95                             try {
96                                 final List<DataContainer> allMultipartData = rpcResult
97                                         .getResult()
98                                         .stream()
99                                         .map(reply ->  MultipartReplyTranslatorUtil
100                                                 .translate(reply, deviceInfo, convertorExecutor, null))
101                                         .filter(java.util.Optional::isPresent)
102                                         .map(java.util.Optional::get)
103                                         .collect(Collectors.toList());
104
105                                 return processStatistics(
106                                         type,
107                                         allMultipartData,
108                                         txFacade,
109                                         registry,
110                                         deviceInfo,
111                                         statisticsWriterProvider);
112                             } catch (final Exception e) {
113                                 LOG.warn("Stats processing of type {} for node {} failed with error: {}",
114                                         type, deviceInfo, e);
115                             }
116                         } else {
117                             LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
118                         }
119                     } else {
120                         LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
121                                 rpcResultIsNull ? "" : rpcResult.getErrors());
122                     }
123
124                     return false;
125                 }));
126     }
127
128     private static boolean processStatistics(final MultipartType type,
129                                              final List<? extends DataContainer> statistics,
130                                              final TxFacade txFacade,
131                                              final DeviceRegistry deviceRegistry,
132                                              final DeviceInfo deviceInfo,
133                                              final MultipartWriterProvider statisticsWriterProvider) {
134         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
135                 .getNodeInstanceIdentifier()
136                 .augmentation(FlowCapableNode.class);
137
138         switch (type) {
139             case OFPMPFLOW:
140                 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
141                 break;
142             case OFPMPMETERCONFIG:
143                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
144                 break;
145             case OFPMPGROUPDESC:
146                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
147                 break;
148             default:
149                 // no operation
150         }
151
152         if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
153             txFacade.submitTransaction();
154
155             switch (type) {
156                 case OFPMPFLOW:
157                     deviceRegistry.getDeviceFlowRegistry().processMarks();
158                     break;
159                 case OFPMPMETERCONFIG:
160                     deviceRegistry.getDeviceMeterRegistry().processMarks();
161                     break;
162                 case OFPMPGROUPDESC:
163                     deviceRegistry.getDeviceGroupRegistry().processMarks();
164                     break;
165                 default:
166                     // no operation
167             }
168
169             LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
170             return true;
171         }
172
173         LOG.warn("Stats processing of type {} for node {} "
174                 + "failed during write-to-tx step", type, deviceInfo);
175         return false;
176     }
177
178     @SuppressWarnings("checkstyle:IllegalCatch")
179     private static boolean writeStatistics(final MultipartType type,
180                                            final List<? extends DataContainer> statistics,
181                                            final DeviceInfo deviceInfo,
182                                            final MultipartWriterProvider statisticsWriterProvider) {
183         final AtomicBoolean result = new AtomicBoolean(false);
184
185         try {
186             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
187                 final boolean write = p.write(stat, false);
188
189                 if (!result.get()) {
190                     result.set(write);
191                 }
192             }));
193         } catch (final Exception ex) {
194             LOG.warn("Stats processing of type {} for node {} "
195                     + "failed during write-to-tx step", type, deviceInfo, ex);
196         }
197
198         return result.get();
199     }
200
201     public static void deleteAllKnownFlows(final TxFacade txFacade,
202                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
203                                            final DeviceFlowRegistry deviceFlowRegistry) {
204         if (!txFacade.isTransactionsEnabled()) {
205             return;
206         }
207
208         final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
209
210         try {
211             Futures.transform(Futures
212                 .catchingAsync(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), Throwable.class,
213                     t -> {
214                         // we wish to close readTx for fallBack
215                         readTx.close();
216                         return Futures.immediateFailedFuture(t);
217                     }), (Function<Optional<FlowCapableNode>, Void>)
218                 flowCapNodeOpt -> {
219                     // we have to read actual tables with all information before we set empty Flow list,
220                     // merge is expensive and not applicable for lists
221                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
222                         for (final Table tableData : flowCapNodeOpt.get().getTable()) {
223                             final Table table =
224                                     new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
225                             final InstanceIdentifier<Table> iiToTable =
226                                     instanceIdentifier.child(Table.class, tableData.getKey());
227                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
228                         }
229                     }
230
231                     readTx.close();
232                     return null;
233                 }).get();
234         } catch (InterruptedException | ExecutionException ex) {
235             LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
236         }
237     }
238
239     public static void deleteAllKnownMeters(final TxFacade txFacade,
240                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
241                                             final DeviceMeterRegistry meterRegistry) {
242         meterRegistry.forEach(meterId -> txFacade
243                 .addDeleteToTxChain(
244                         LogicalDatastoreType.OPERATIONAL,
245                         instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
246     }
247
248     public static void deleteAllKnownGroups(final TxFacade txFacade,
249                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
250                                             final DeviceGroupRegistry groupRegistry) {
251         groupRegistry.forEach(groupId -> txFacade
252                 .addDeleteToTxChain(
253                         LogicalDatastoreType.OPERATIONAL,
254                         instanceIdentifier.child(Group.class, new GroupKey(groupId))));
255     }
256
257     /**
258      * Writes snapshot gathering start timestamp + cleans end mark.
259      *
260      * @param deviceInfo device info
261      * @param txFacade tx manager
262      */
263     static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
264         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
265                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
266
267         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
268         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
269                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
270                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
271                         .build())
272                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
273                 .build();
274         try {
275             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
276         } catch (final TransactionChainClosedException e) {
277             LOG.warn("Can't write to transaction, transaction chain probably closed.");
278             LOG.trace("Write to transaction exception: ", e);
279         }
280
281         txFacade.submitTransaction();
282     }
283
284     /**
285      * Writes snapshot gathering end timestamp + outcome.
286      *
287      * @param deviceInfo device info
288      * @param txFacade tx manager
289      * @param succeeded     outcome of currently finished gathering
290      */
291     static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo, final TxFacade txFacade, final boolean succeeded) {
292         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
293                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
294                 .child(SnapshotGatheringStatusEnd.class);
295
296         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
297         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
298                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
299                 .setSucceeded(succeeded)
300                 .build();
301         try {
302             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
303         } catch (TransactionChainClosedException e) {
304             LOG.warn("Can't write to transaction, transaction chain probably closed.");
305             LOG.trace("Write to transaction exception: ", e);
306         }
307
308         txFacade.submitTransaction();
309     }
310 }