Modernize codebase
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.statistics;
9
10 import com.google.common.util.concurrent.Futures;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.text.SimpleDateFormat;
14 import java.util.Date;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.stream.Collectors;
22 import org.opendaylight.mdsal.binding.api.ReadTransaction;
23 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
26 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
27 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
28 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
29 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
30 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
33 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
34 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
35 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yangtools.yang.binding.DataContainer;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Utils for gathering statistics.
58  */
59 public final class StatisticsGatheringUtils {
60     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
61     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
62     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
63
64     private StatisticsGatheringUtils() {
65         // Hidden on purpose
66     }
67
68     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
69             final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
70             final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
71             final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
72             final Executor executor) {
73         return Futures.transform(statisticsGatheringService.getStatisticsOfType(
74             new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
75             rpcResult -> {
76                 final boolean rpcResultIsNull = rpcResult == null;
77
78                 if (!rpcResultIsNull && rpcResult.isSuccessful()) {
79                     LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
80                         // TODO: in case the result value is null then multipart data probably got processed
81                         // TODO: on the fly. This contract should by clearly stated and enforced.
82                         // TODO: Now simple true value is returned
83                     if (rpcResult.getResult() != null && !rpcResult.getResult().isEmpty()) {
84                         final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
85                                 .map(reply -> MultipartReplyTranslatorUtil
86                                                     .translate(reply, deviceInfo, convertorExecutor, null))
87                                 .filter(Optional::isPresent).map(Optional::orElseThrow)
88                                 .collect(Collectors.toList());
89
90                         return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
91                                         statisticsWriterProvider);
92                     } else {
93                         LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
94                     }
95                 } else {
96                     LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
97                                 rpcResultIsNull ? "" : rpcResult.getErrors());
98                 }
99                 return false;
100             }, executor);
101     }
102
103     @SuppressWarnings("checkstyle:IllegalCatch")
104     private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
105                                              final TxFacade txFacade, final DeviceRegistry deviceRegistry,
106                                              final DeviceInfo deviceInfo,
107                                              final MultipartWriterProvider statisticsWriterProvider) {
108         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
109                 .augmentation(FlowCapableNode.class);
110         try {
111             txFacade.acquireWriteTransactionLock();
112             switch (type) {
113                 case OFPMPFLOW:
114                     deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
115                     deviceRegistry.getDeviceFlowRegistry().processMarks();
116                     break;
117                 case OFPMPMETERCONFIG:
118                     deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
119                     deviceRegistry.getDeviceMeterRegistry().processMarks();
120                     break;
121                 case OFPMPGROUPDESC:
122                     deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
123                     deviceRegistry.getDeviceGroupRegistry().processMarks();
124                     break;
125                 default:
126                     // no operation
127             }
128
129             if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
130                 txFacade.submitTransaction();
131
132                 LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
133                 return true;
134             }
135         } catch (Exception e) {
136             LOG.error("Exception while writing statistics to operational inventory for the device {}",
137                     deviceInfo.getLOGValue(), e);
138         } finally {
139             txFacade.releaseWriteTransactionLock();
140         }
141
142         LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
143         return false;
144     }
145
146     @SuppressWarnings("checkstyle:IllegalCatch")
147     private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
148                                            final DeviceInfo deviceInfo,
149                                            final MultipartWriterProvider statisticsWriterProvider) {
150         final AtomicBoolean result = new AtomicBoolean(false);
151
152         try {
153             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
154                 final boolean write = p.write(stat, false);
155
156                 if (!result.get()) {
157                     result.set(write);
158                 }
159             }));
160         } catch (final Exception ex) {
161             LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
162                      ex);
163         }
164
165         return result.get();
166     }
167
168     public static void deleteAllKnownFlows(final TxFacade txFacade,
169                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
170                                            final DeviceFlowRegistry deviceFlowRegistry) {
171         if (!txFacade.isTransactionsEnabled()) {
172             return;
173         }
174
175         final ListenableFuture<Optional<FlowCapableNode>> future;
176         try (ReadTransaction readTx = txFacade.getReadTransaction()) {
177             future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
178         }
179
180         try {
181             Futures.transform(Futures.catchingAsync(future, Throwable.class, Futures::immediateFailedFuture,
182                 MoreExecutors.directExecutor()), flowCapNodeOpt -> {
183                     // we have to read actual tables with all information before we set empty Flow list,
184                     // merge is expensive and not applicable for lists
185                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
186                         for (final Table tableData : flowCapNodeOpt.orElseThrow().nonnullTable().values()) {
187                             final Table table = new TableBuilder(tableData).setFlow(Map.of()).build();
188                             final InstanceIdentifier<Table> iiToTable = instanceIdentifier
189                                 .child(Table.class, tableData.key());
190                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
191                         }
192                     }
193                     return null;
194                 }, MoreExecutors.directExecutor()).get();
195         } catch (InterruptedException | ExecutionException ex) {
196             LOG.debug("Failed to delete {} flows", deviceFlowRegistry.size(), ex);
197         }
198     }
199
200     public static void deleteAllKnownMeters(final TxFacade txFacade,
201                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
202                                             final DeviceMeterRegistry meterRegistry) {
203         meterRegistry.forEach(meterId -> {
204             txFacade
205                     .addDeleteToTxChain(
206                             LogicalDatastoreType.OPERATIONAL,
207                             instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
208             meterRegistry.addMark(meterId);
209         });
210     }
211
212     public static void deleteAllKnownGroups(final TxFacade txFacade,
213                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
214                                             final DeviceGroupRegistry groupRegistry) {
215         LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
216         groupRegistry.forEach(groupId -> {
217             txFacade
218                     .addDeleteToTxChain(
219                             LogicalDatastoreType.OPERATIONAL,
220                             instanceIdentifier.child(Group.class, new GroupKey(groupId)));
221             groupRegistry.addMark(groupId);
222         });
223     }
224
225     /**
226      * Writes snapshot gathering start timestamp + cleans end mark.
227      *
228      * @param deviceInfo device info
229      * @param txFacade tx manager
230      */
231     static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
232         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
233                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
234
235         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
236         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
237                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
238                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
239                         .build())
240                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
241                 .build();
242         try {
243             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
244         } catch (final TransactionChainClosedException e) {
245             LOG.warn("Can't write to transaction, transaction chain probably closed.");
246             LOG.trace("Write to transaction exception: ", e);
247         }
248
249         txFacade.submitTransaction();
250     }
251
252     /**
253      * Writes snapshot gathering end timestamp + outcome.
254      *
255      * @param deviceInfo device info
256      * @param txFacade tx manager
257      * @param succeeded     outcome of currently finished gathering
258      */
259     static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
260                                            final TxFacade txFacade, final boolean succeeded) {
261         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
262                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
263                 .child(SnapshotGatheringStatusEnd.class);
264
265         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
266         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
267                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
268                 .setSucceeded(succeeded)
269                 .build();
270         try {
271             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
272         } catch (TransactionChainClosedException e) {
273             LOG.warn("Can't write to transaction, transaction chain probably closed.");
274             LOG.trace("Write to transaction exception: ", e);
275         }
276
277         txFacade.submitTransaction();
278     }
279 }