80c8fa3cf8ff755cb8edb6ea5887442659151da3
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
33 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
35 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
36 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
38 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
39 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
40 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
56 import org.opendaylight.yangtools.yang.binding.DataContainer;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Utils for gathering statistics
64  */
65 public final class StatisticsGatheringUtils {
66
67     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
68     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
69     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
70
71     private StatisticsGatheringUtils() {
72         throw new IllegalStateException("This class should not be instantiated.");
73     }
74
75     static <T extends OfHeader>ListenableFuture<Boolean> gatherStatistics(final StatisticsGatherer<T> statisticsGatheringService,
76                                                                           final DeviceInfo deviceInfo,
77                                                                           final MultipartType type,
78                                                                           final TxFacade txFacade,
79                                                                           final DeviceRegistry registry,
80                                                                           final ConvertorExecutor convertorExecutor,
81                                                                           final MultipartWriterProvider statisticsWriterProvider) {
82         return Futures.transformAsync(
83                 statisticsGatheringService.getStatisticsOfType(
84                         new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
85                         type),
86                 new AsyncFunction<RpcResult<List<T>>, Boolean>() {
87                     @Nullable
88                     @Override
89                     public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
90                         boolean isMultipartProcessed = Boolean.TRUE;
91
92                         if (rpcResult.isSuccessful()) {
93                             LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
94
95                             // TODO: in case the result value is null then multipart data probably got processed on the fly -
96                             // TODO: this contract should by clearly stated and enforced - now simple true value is returned
97                             if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
98                                 final List<DataContainer> allMultipartData;
99
100                                 try {
101                                     allMultipartData = rpcResult
102                                             .getResult()
103                                             .stream()
104                                             .map(reply ->  MultipartReplyTranslatorUtil
105                                                     .translate(reply, deviceInfo, convertorExecutor, null))
106                                             .filter(java.util.Optional::isPresent)
107                                             .map(java.util.Optional::get)
108                                             .collect(Collectors.toList());
109                                 } catch (final Exception e) {
110                                     LOG.warn("Stats processing of type {} for node {} failed during transformation step",
111                                             type, deviceInfo.getLOGValue(), e);
112                                     return Futures.immediateFailedFuture(e);
113                                 }
114
115                                 try {
116                                     return Futures.immediateFuture(processStatistics(
117                                             type,
118                                             allMultipartData,
119                                             txFacade,
120                                             registry,
121                                             deviceInfo,
122                                             statisticsWriterProvider));
123                                 } catch (final Exception e) {
124                                     LOG.warn("Stats processing of type {} for node {} failed during processing step",
125                                             type, deviceInfo.getNodeId(), e);
126                                     return Futures.immediateFailedFuture(e);
127                                 }
128                             } else {
129                                 LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
130                             }
131                         } else {
132                             LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
133                                     rpcResult.getErrors());
134                             isMultipartProcessed = Boolean.FALSE;
135                         }
136
137                         return Futures.immediateFuture(isMultipartProcessed);
138                     }
139                 });
140     }
141
142     private static boolean processStatistics(final MultipartType type,
143                                              final List<? extends DataContainer> statistics,
144                                              final TxFacade txFacade,
145                                              final DeviceRegistry deviceRegistry,
146                                              final DeviceInfo deviceInfo,
147                                              final MultipartWriterProvider statisticsWriterProvider) {
148         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
149                 .getNodeInstanceIdentifier()
150                 .augmentation(FlowCapableNode.class);
151
152         switch (type) {
153             case OFPMPFLOW:
154                 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
155                 break;
156             case OFPMPMETERCONFIG:
157                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
158                 break;
159             case OFPMPGROUPDESC:
160                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
161                 break;
162         }
163
164         if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
165             txFacade.submitTransaction();
166
167             switch (type) {
168                 case OFPMPFLOW:
169                     deviceRegistry.getDeviceFlowRegistry().processMarks();
170                     break;
171                 case OFPMPMETERCONFIG:
172                     deviceRegistry.getDeviceMeterRegistry().processMarks();
173                     break;
174                 case OFPMPGROUPDESC:
175                     deviceRegistry.getDeviceGroupRegistry().processMarks();
176                     break;
177             }
178
179             LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
180             return true;
181         }
182
183         LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
184         return false;
185     }
186
187     private static boolean writeStatistics(final MultipartType type,
188                                            final List<? extends DataContainer> statistics,
189                                            final DeviceInfo deviceInfo,
190                                            final MultipartWriterProvider statisticsWriterProvider) {
191         final AtomicBoolean result = new AtomicBoolean(false);
192
193         try {
194             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
195                 final boolean write = p.write(stat, false);
196
197                 if (!result.get()) {
198                     result.set(write);
199                 }
200             }));
201         } catch (final Exception ex) {
202             LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue(), ex);
203         }
204
205         return result.get();
206     }
207
208     public static void deleteAllKnownFlows(final TxFacade txFacade,
209                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
210                                            final DeviceFlowRegistry deviceFlowRegistry) {
211         if (!txFacade.isTransactionsEnabled()) {
212             return;
213         }
214
215         final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
216
217         try {
218             Futures.transform(Futures
219                     .catchingAsync(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier),
220                             Throwable.class,
221                             t -> {
222                         // we wish to close readTx for fallBack
223                         readTx.close();
224                         return Futures.immediateFailedFuture(t);
225                     }), (Function<Optional<FlowCapableNode>, Void>)
226                     flowCapNodeOpt -> {
227                         // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
228                         // not applicable for lists
229                         if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
230                             for (final Table tableData : flowCapNodeOpt.get().getTable()) {
231                                 final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
232                                 final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
233                                 txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
234                             }
235                         }
236
237                         readTx.close();
238                         return null;
239                     }).get();
240         } catch (InterruptedException | ExecutionException ex) {
241             LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
242         }
243     }
244
245     public static void deleteAllKnownMeters(final TxFacade txFacade,
246                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
247                                             final DeviceMeterRegistry meterRegistry) {
248         meterRegistry.forEach(meterId -> txFacade
249                 .addDeleteToTxChain(
250                         LogicalDatastoreType.OPERATIONAL,
251                         instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
252     }
253
254     public static void deleteAllKnownGroups(final TxFacade txFacade,
255                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
256                                             final DeviceGroupRegistry groupRegistry) {
257         groupRegistry.forEach(groupId -> txFacade
258                 .addDeleteToTxChain(
259                         LogicalDatastoreType.OPERATIONAL,
260                         instanceIdentifier.child(Group.class, new GroupKey(groupId))));
261     }
262
263     /**
264      * Writes snapshot gathering start timestamp + cleans end mark
265      *
266      * @param deviceContext txManager + node path keeper
267      */
268     static void markDeviceStateSnapshotStart(final DeviceContext deviceContext) {
269         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceContext.getDeviceInfo()
270                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
271
272         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
273         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
274                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
275                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
276                         .build())
277                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
278                 .build();
279         try {
280             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
281         } catch (final TransactionChainClosedException e) {
282             LOG.warn("Can't write to transaction, transaction chain probably closed.");
283             LOG.trace("Write to transaction exception: ", e);
284         }
285
286         deviceContext.submitTransaction();
287     }
288
289     /**
290      * Writes snapshot gathering end timestamp + outcome
291      *
292      * @param deviceContext txManager + node path keeper
293      * @param succeeded     outcome of currently finished gathering
294      */
295     static void markDeviceStateSnapshotEnd(final DeviceContext deviceContext, final boolean succeeded) {
296         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceContext.getDeviceInfo()
297                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
298                 .child(SnapshotGatheringStatusEnd.class);
299
300         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
301         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
302                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
303                 .setSucceeded(succeeded)
304                 .build();
305         try {
306             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
307         } catch (TransactionChainClosedException e) {
308             LOG.warn("Can't write to transaction, transaction chain probably closed.");
309             LOG.trace("Write to transaction exception: ", e);
310         }
311
312         deviceContext.submitTransaction();
313     }
314 }