Merge changes from topic 'ofj-models-to-ofp-models'
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.text.SimpleDateFormat;
17 import java.util.Collections;
18 import java.util.Date;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
31 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
32 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
35 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
36 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
37 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
38 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
39 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
55 import org.opendaylight.yangtools.yang.binding.DataContainer;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Utils for gathering statistics
63  */
64 public final class StatisticsGatheringUtils {
65
66     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
67     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
68     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
69
70     private StatisticsGatheringUtils() {
71         throw new IllegalStateException("This class should not be instantiated.");
72     }
73
74     static <T extends OfHeader>ListenableFuture<Boolean> gatherStatistics(final StatisticsGatherer<T> statisticsGatheringService,
75                                                                           final DeviceInfo deviceInfo,
76                                                                           final MultipartType type,
77                                                                           final TxFacade txFacade,
78                                                                           final DeviceRegistry registry,
79                                                                           final Boolean initial,
80                                                                           final ConvertorExecutor convertorExecutor,
81                                                                           final MultipartWriterProvider statisticsWriterProvider) {
82
83         final EventIdentifier eventIdentifier;
84         if (MultipartType.OFPMPFLOW.equals(type)) {
85             eventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue());
86             EventsTimeCounter.markStart(eventIdentifier);
87         } else {
88             eventIdentifier = null;
89         }
90
91         return Futures.transform(
92             statisticsGatheringService.getStatisticsOfType(
93                 new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
94                 type),
95             new AsyncFunction<RpcResult<List<T>>, Boolean>() {
96                 @Nullable
97                 @Override
98                 public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
99                     boolean isMultipartProcessed = Boolean.TRUE;
100
101                     if (rpcResult.isSuccessful()) {
102                         LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
103
104                         // TODO: in case the result value is null then multipart data probably got processed on the fly -
105                         // TODO: this contract should by clearly stated and enforced - now simple true value is returned
106                         if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
107                             final List<DataContainer> allMultipartData;
108
109                             try {
110                                 allMultipartData = rpcResult
111                                     .getResult()
112                                     .stream()
113                                     .map(reply ->  MultipartReplyTranslatorUtil
114                                         .translate(reply, deviceInfo, convertorExecutor, null))
115                                     .filter(java.util.Optional::isPresent)
116                                     .map(java.util.Optional::get)
117                                     .collect(Collectors.toList());
118                             } catch (final Exception e) {
119                                 LOG.warn("Stats processing of type {} for node {} failed during transformation step",
120                                     type, deviceInfo.getLOGValue(), e);
121                                 return Futures.immediateFailedFuture(e);
122                             }
123
124                             try {
125                                 return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
126                                     statisticsWriterProvider,
127                                     eventIdentifier, initial);
128                             } catch (final Exception e) {
129                                 LOG.warn("Stats processing of type {} for node {} failed during processing step",
130                                     type, deviceInfo.getNodeId(), e);
131                                 return Futures.immediateFailedFuture(e);
132                             }
133                         } else {
134                             LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
135                         }
136                     } else {
137                         LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
138                             rpcResult.getErrors());
139                         isMultipartProcessed = Boolean.FALSE;
140                     }
141
142                     return Futures.immediateFuture(isMultipartProcessed);
143                 }
144             });
145     }
146
147     private static ListenableFuture<Boolean> processStatistics(final MultipartType type,
148                                                                final List<? extends DataContainer> statistics,
149                                                                final TxFacade txFacade,
150                                                                final DeviceRegistry deviceRegistry,
151                                                                final DeviceInfo deviceInfo,
152                                                                final MultipartWriterProvider statisticsWriterProvider,
153                                                                final EventIdentifier eventIdentifier,
154                                                                final boolean initial) {
155
156         ListenableFuture<Void> future = Futures.immediateFuture(null);
157
158         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
159             .getNodeInstanceIdentifier()
160             .augmentation(FlowCapableNode.class);
161
162         switch (type) {
163             case OFPMPFLOW:
164                 future = deleteAllKnownFlows(txFacade, instanceIdentifier, initial);
165                 break;
166             case OFPMPMETERCONFIG:
167                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
168                 break;
169             case OFPMPGROUPDESC:
170                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
171                 break;
172         }
173
174         return Futures.transform(future, (Function<Void, Boolean>) input -> {
175             if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
176                 txFacade.submitTransaction();
177
178                 if (MultipartType.OFPMPFLOW.equals(type)) {
179                     EventsTimeCounter.markEnd(eventIdentifier);
180                 }
181
182                 LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
183                 return Boolean.TRUE;
184             }
185
186             LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
187             return Boolean.FALSE;
188         });
189     }
190
191     private static boolean writeStatistics(final MultipartType type,
192                                           final List<? extends DataContainer> statistics,
193                                           final DeviceInfo deviceInfo,
194                                           final MultipartWriterProvider statisticsWriterProvider) {
195         final AtomicBoolean result = new AtomicBoolean(false);
196
197         try {
198             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
199                 final boolean write = p.write(stat, false);
200
201                 if (!result.get()) {
202                     result.set(write);
203                 }
204             }));
205         } catch (final Exception ex) {
206             LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue(), ex);
207         }
208
209         return result.get();
210     }
211
212     public static ListenableFuture<Void> deleteAllKnownFlows(final TxFacade txFacade,
213                                                              final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
214                                                              final boolean initial) {
215         if (initial) {
216             return Futures.immediateFuture(null);
217         }
218
219         final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
220         return Futures.transform(Futures
221             .withFallback(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), t -> {
222                 // we wish to close readTx for fallBack
223                 readTx.close();
224                 return Futures.immediateFailedFuture(t);
225             }), (Function<Optional<FlowCapableNode>, Void>)
226             flowCapNodeOpt -> {
227                 // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
228                 // not applicable for lists
229                 if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
230                     for (final Table tableData : flowCapNodeOpt.get().getTable()) {
231                         final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
232                         final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
233                         txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
234                     }
235                 }
236
237                 readTx.close();
238                 return null;
239             });
240     }
241
242     private static void deleteAllKnownMeters(final TxFacade txFacade,
243                                              final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
244                                              final DeviceMeterRegistry meterRegistry) {
245         meterRegistry.getAllMeterIds().forEach(meterId -> txFacade
246             .addDeleteToTxChain(
247                 LogicalDatastoreType.OPERATIONAL,
248                 instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
249
250         meterRegistry.removeMarked();
251     }
252
253     private static void deleteAllKnownGroups(final TxFacade txFacade,
254                                              final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
255                                              final DeviceGroupRegistry groupRegistry) {
256         groupRegistry.getAllGroupIds().forEach(groupId -> txFacade
257             .addDeleteToTxChain(
258                 LogicalDatastoreType.OPERATIONAL,
259                 instanceIdentifier.child(Group.class, new GroupKey(groupId))));
260
261         groupRegistry.removeMarked();
262     }
263
264     /**
265      * Writes snapshot gathering start timestamp + cleans end mark
266      *
267      * @param deviceContext txManager + node path keeper
268      */
269     static void markDeviceStateSnapshotStart(final DeviceContext deviceContext) {
270         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceContext.getDeviceInfo()
271             .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
272
273         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
274         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
275             .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
276                 .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
277                 .build())
278             .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
279             .build();
280         try {
281             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
282         } catch (final TransactionChainClosedException e) {
283             LOG.warn("Can't write to transaction, transaction chain probably closed.");
284             LOG.trace("Write to transaction exception: ", e);
285         }
286
287         deviceContext.submitTransaction();
288     }
289
290     /**
291      * Writes snapshot gathering end timestamp + outcome
292      *
293      * @param deviceContext txManager + node path keeper
294      * @param succeeded     outcome of currently finished gathering
295      */
296     static void markDeviceStateSnapshotEnd(final DeviceContext deviceContext, final boolean succeeded) {
297         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceContext.getDeviceInfo()
298             .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
299             .child(SnapshotGatheringStatusEnd.class);
300
301         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
302         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
303             .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
304             .setSucceeded(succeeded)
305             .build();
306         try {
307             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
308         } catch (TransactionChainClosedException e) {
309             LOG.warn("Can't write to transaction, transaction chain probably closed.");
310             LOG.trace("Write to transaction exception: ", e);
311         }
312
313         deviceContext.submitTransaction();
314     }
315 }