Fix codestyle
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
32 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
35 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
36 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
37 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
38 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
39 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
55 import org.opendaylight.yangtools.yang.binding.DataContainer;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Utils for gathering statistics.
63  */
64 public final class StatisticsGatheringUtils {
65
66     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
67     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
68     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
69
70     private StatisticsGatheringUtils() {
71         throw new IllegalStateException("This class should not be instantiated.");
72     }
73
74     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
75                                                             final StatisticsGatherer<T> statisticsGatheringService,
76                                                             final DeviceInfo deviceInfo,
77                                                             final MultipartType type,
78                                                             final TxFacade txFacade,
79                                                             final DeviceRegistry registry,
80                                                             final ConvertorExecutor convertorExecutor,
81                                                             final MultipartWriterProvider statisticsWriterProvider) {
82         return Futures.transformAsync(
83                 statisticsGatheringService.getStatisticsOfType(
84                         new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
85                         type),
86                 new AsyncFunction<RpcResult<List<T>>, Boolean>() {
87                     @Nullable
88                     @Override
89                     @SuppressWarnings("checkstyle:IllegalCatch")
90                     public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
91                         boolean isMultipartProcessed = Boolean.TRUE;
92
93                         if (rpcResult.isSuccessful()) {
94                             LOG.debug("Stats reply successfully received for node {} of type {}",
95                                     deviceInfo.getNodeId(), type);
96
97                             // TODO: in case the result value is null then multipart data probably got processed
98                             // TODO: on the fly. This contract should by clearly stated and enforced.
99                             // TODO: Now simple true value is returned
100                             if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
101                                 final List<DataContainer> allMultipartData;
102
103                                 try {
104                                     allMultipartData = rpcResult
105                                             .getResult()
106                                             .stream()
107                                             .map(reply ->  MultipartReplyTranslatorUtil
108                                                     .translate(reply, deviceInfo, convertorExecutor, null))
109                                             .filter(java.util.Optional::isPresent)
110                                             .map(java.util.Optional::get)
111                                             .collect(Collectors.toList());
112                                 } catch (final Exception e) {
113                                     LOG.warn("Stats processing of type {} for node {} "
114                                                     + "failed during transformation step",
115                                             type, deviceInfo, e);
116                                     return Futures.immediateFailedFuture(e);
117                                 }
118
119                                 try {
120                                     return Futures.immediateFuture(processStatistics(
121                                             type,
122                                             allMultipartData,
123                                             txFacade,
124                                             registry,
125                                             deviceInfo,
126                                             statisticsWriterProvider));
127                                 } catch (final Exception e) {
128                                     LOG.warn("Stats processing of type {} for node {} failed during processing step",
129                                             type, deviceInfo.getNodeId(), e);
130                                     return Futures.immediateFailedFuture(e);
131                                 }
132                             } else {
133                                 LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
134                             }
135                         } else {
136                             LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
137                                     rpcResult.getErrors());
138                             isMultipartProcessed = Boolean.FALSE;
139                         }
140
141                         return Futures.immediateFuture(isMultipartProcessed);
142                     }
143                 });
144     }
145
146     private static boolean processStatistics(final MultipartType type,
147                                              final List<? extends DataContainer> statistics,
148                                              final TxFacade txFacade,
149                                              final DeviceRegistry deviceRegistry,
150                                              final DeviceInfo deviceInfo,
151                                              final MultipartWriterProvider statisticsWriterProvider) {
152         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
153                 .getNodeInstanceIdentifier()
154                 .augmentation(FlowCapableNode.class);
155
156         switch (type) {
157             case OFPMPFLOW:
158                 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
159                 break;
160             case OFPMPMETERCONFIG:
161                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
162                 break;
163             case OFPMPGROUPDESC:
164                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
165                 break;
166             default:
167                 // no operation
168         }
169
170         if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
171             txFacade.submitTransaction();
172
173             switch (type) {
174                 case OFPMPFLOW:
175                     deviceRegistry.getDeviceFlowRegistry().processMarks();
176                     break;
177                 case OFPMPMETERCONFIG:
178                     deviceRegistry.getDeviceMeterRegistry().processMarks();
179                     break;
180                 case OFPMPGROUPDESC:
181                     deviceRegistry.getDeviceGroupRegistry().processMarks();
182                     break;
183                 default:
184                     // no operation
185             }
186
187             LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
188             return true;
189         }
190
191         LOG.warn("Stats processing of type {} for node {} "
192                 + "failed during write-to-tx step", type, deviceInfo);
193         return false;
194     }
195
196     @SuppressWarnings("checkstyle:IllegalCatch")
197     private static boolean writeStatistics(final MultipartType type,
198                                            final List<? extends DataContainer> statistics,
199                                            final DeviceInfo deviceInfo,
200                                            final MultipartWriterProvider statisticsWriterProvider) {
201         final AtomicBoolean result = new AtomicBoolean(false);
202
203         try {
204             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
205                 final boolean write = p.write(stat, false);
206
207                 if (!result.get()) {
208                     result.set(write);
209                 }
210             }));
211         } catch (final Exception ex) {
212             LOG.warn("Stats processing of type {} for node {} "
213                     + "failed during write-to-tx step", type, deviceInfo, ex);
214         }
215
216         return result.get();
217     }
218
219     public static void deleteAllKnownFlows(final TxFacade txFacade,
220                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
221                                            final DeviceFlowRegistry deviceFlowRegistry) {
222         if (!txFacade.isTransactionsEnabled()) {
223             return;
224         }
225
226         final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
227
228         try {
229             Futures.transform(Futures
230                 .catchingAsync(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), Throwable.class,
231                     t -> {
232                         // we wish to close readTx for fallBack
233                         readTx.close();
234                         return Futures.immediateFailedFuture(t);
235                     }), (Function<Optional<FlowCapableNode>, Void>)
236                 flowCapNodeOpt -> {
237                     // we have to read actual tables with all information before we set empty Flow list,
238                     // merge is expensive and not applicable for lists
239                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
240                         for (final Table tableData : flowCapNodeOpt.get().getTable()) {
241                             final Table table =
242                                     new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
243                             final InstanceIdentifier<Table> iiToTable =
244                                     instanceIdentifier.child(Table.class, tableData.getKey());
245                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
246                         }
247                     }
248
249                     readTx.close();
250                     return null;
251                 }).get();
252         } catch (InterruptedException | ExecutionException ex) {
253             LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
254         }
255     }
256
257     public static void deleteAllKnownMeters(final TxFacade txFacade,
258                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
259                                             final DeviceMeterRegistry meterRegistry) {
260         meterRegistry.forEach(meterId -> txFacade
261                 .addDeleteToTxChain(
262                         LogicalDatastoreType.OPERATIONAL,
263                         instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
264     }
265
266     public static void deleteAllKnownGroups(final TxFacade txFacade,
267                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
268                                             final DeviceGroupRegistry groupRegistry) {
269         groupRegistry.forEach(groupId -> txFacade
270                 .addDeleteToTxChain(
271                         LogicalDatastoreType.OPERATIONAL,
272                         instanceIdentifier.child(Group.class, new GroupKey(groupId))));
273     }
274
275     /**
276      * Writes snapshot gathering start timestamp + cleans end mark.
277      *
278      * @param deviceInfo device info
279      * @param txFacade tx manager
280      */
281     static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
282         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
283                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
284
285         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
286         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
287                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
288                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
289                         .build())
290                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
291                 .build();
292         try {
293             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
294         } catch (final TransactionChainClosedException e) {
295             LOG.warn("Can't write to transaction, transaction chain probably closed.");
296             LOG.trace("Write to transaction exception: ", e);
297         }
298
299         txFacade.submitTransaction();
300     }
301
302     /**
303      * Writes snapshot gathering end timestamp + outcome.
304      *
305      * @param deviceInfo device info
306      * @param txFacade tx manager
307      * @param succeeded     outcome of currently finished gathering
308      */
309     static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo, final TxFacade txFacade, final boolean succeeded) {
310         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
311                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
312                 .child(SnapshotGatheringStatusEnd.class);
313
314         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
315         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
316                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
317                 .setSucceeded(succeeded)
318                 .build();
319         try {
320             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
321         } catch (TransactionChainClosedException e) {
322             LOG.warn("Can't write to transaction, transaction chain probably closed.");
323             LOG.trace("Write to transaction exception: ", e);
324         }
325
326         txFacade.submitTransaction();
327     }
328 }