Fixup Augmentable and Identifiable methods changing
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.text.SimpleDateFormat;
18 import java.util.Collections;
19 import java.util.Date;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
30 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
31 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
33 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
35 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
36 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
37 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
38 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatusBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEndBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusStartBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
54 import org.opendaylight.yangtools.yang.binding.DataContainer;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Utils for gathering statistics.
61  */
62 public final class StatisticsGatheringUtils {
63
64     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
65     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
66     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
67
68     private StatisticsGatheringUtils() {
69         throw new IllegalStateException("This class should not be instantiated.");
70     }
71
72     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
73             final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
74             final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
75             final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
76             final ListeningExecutorService executorService) {
77         return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType(
78            new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
79             rpcResult -> executorService.submit(() -> {
80                 final boolean rpcResultIsNull = rpcResult == null;
81
82                 if (!rpcResultIsNull && rpcResult.isSuccessful()) {
83                     LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
84                         // TODO: in case the result value is null then multipart data probably got processed
85                         // TODO: on the fly. This contract should by clearly stated and enforced.
86                         // TODO: Now simple true value is returned
87                     if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
88                         final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
89                                 .map(reply -> MultipartReplyTranslatorUtil
90                                                     .translate(reply, deviceInfo, convertorExecutor, null))
91                                 .filter(java.util.Optional::isPresent).map(java.util.Optional::get)
92                                             .collect(Collectors.toList());
93
94                         return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
95                                         statisticsWriterProvider);
96                     } else {
97                         LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
98                     }
99                 } else {
100                     LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
101                                 rpcResultIsNull ? "" : rpcResult.getErrors());
102                 }
103                 return false;
104             }), MoreExecutors.directExecutor());
105     }
106
107     private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
108                                              final TxFacade txFacade, final DeviceRegistry deviceRegistry,
109                                              final DeviceInfo deviceInfo,
110                                              final MultipartWriterProvider statisticsWriterProvider) {
111         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
112                 .augmentation(FlowCapableNode.class);
113
114         switch (type) {
115             case OFPMPFLOW:
116                 deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
117                 deviceRegistry.getDeviceFlowRegistry().processMarks();
118                 break;
119             case OFPMPMETERCONFIG:
120                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
121                 deviceRegistry.getDeviceMeterRegistry().processMarks();
122                 break;
123             case OFPMPGROUPDESC:
124                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
125                 deviceRegistry.getDeviceGroupRegistry().processMarks();
126                 break;
127             default:
128                 // no operation
129         }
130
131         if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
132             txFacade.submitTransaction();
133
134             LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
135             return true;
136         }
137
138         LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
139         return false;
140     }
141
142     @SuppressWarnings("checkstyle:IllegalCatch")
143     private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
144                                            final DeviceInfo deviceInfo,
145                                            final MultipartWriterProvider statisticsWriterProvider) {
146         final AtomicBoolean result = new AtomicBoolean(false);
147
148         try {
149             statistics.forEach(stat -> statisticsWriterProvider.lookup(type).ifPresent(p -> {
150                 final boolean write = p.write(stat, false);
151
152                 if (!result.get()) {
153                     result.set(write);
154                 }
155             }));
156         } catch (final Exception ex) {
157             LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
158                      ex);
159         }
160
161         return result.get();
162     }
163
164     public static void deleteAllKnownFlows(final TxFacade txFacade,
165                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
166                                            final DeviceFlowRegistry deviceFlowRegistry) {
167         if (!txFacade.isTransactionsEnabled()) {
168             return;
169         }
170
171         final ListenableFuture<Optional<FlowCapableNode>> future;
172         try (ReadOnlyTransaction readTx = txFacade.getReadTransaction()) {
173             future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
174         }
175
176         try {
177             Futures.transform(Futures.catchingAsync(future, Throwable.class, throwable -> {
178                 return Futures.immediateFailedFuture(throwable);
179             }, MoreExecutors.directExecutor()), (Function<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
180                     // we have to read actual tables with all information before we set empty Flow list,
181                     // merge is expensive and not applicable for lists
182                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
183                         for (final Table tableData : flowCapNodeOpt.get().getTable()) {
184                             final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
185                             final InstanceIdentifier<Table> iiToTable = instanceIdentifier
186                                     .child(Table.class, tableData.key());
187                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
188                         }
189                     }
190                     return null;
191                 }, MoreExecutors.directExecutor()).get();
192         } catch (InterruptedException | ExecutionException ex) {
193             LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
194         }
195     }
196
197     public static void deleteAllKnownMeters(final TxFacade txFacade,
198                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
199                                             final DeviceMeterRegistry meterRegistry) {
200         meterRegistry.forEach(meterId -> {
201             txFacade
202                     .addDeleteToTxChain(
203                             LogicalDatastoreType.OPERATIONAL,
204                             instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
205             meterRegistry.addMark(meterId);
206         });
207     }
208
209     public static void deleteAllKnownGroups(final TxFacade txFacade,
210                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
211                                             final DeviceGroupRegistry groupRegistry) {
212         LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
213         groupRegistry.forEach(groupId -> {
214             txFacade
215                     .addDeleteToTxChain(
216                             LogicalDatastoreType.OPERATIONAL,
217                             instanceIdentifier.child(Group.class, new GroupKey(groupId)));
218             groupRegistry.addMark(groupId);
219         });
220     }
221
222     /**
223      * Writes snapshot gathering start timestamp + cleans end mark.
224      *
225      * @param deviceInfo device info
226      * @param txFacade tx manager
227      */
228     static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
229         final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
230                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
231
232         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
233         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
234                 .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
235                         .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
236                         .build())
237                 .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
238                 .build();
239         try {
240             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
241         } catch (final TransactionChainClosedException e) {
242             LOG.warn("Can't write to transaction, transaction chain probably closed.");
243             LOG.trace("Write to transaction exception: ", e);
244         }
245
246         txFacade.submitTransaction();
247     }
248
249     /**
250      * Writes snapshot gathering end timestamp + outcome.
251      *
252      * @param deviceInfo device info
253      * @param txFacade tx manager
254      * @param succeeded     outcome of currently finished gathering
255      */
256     static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
257                                            final TxFacade txFacade, final boolean succeeded) {
258         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
259                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
260                 .child(SnapshotGatheringStatusEnd.class);
261
262         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
263         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
264                 .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
265                 .setSucceeded(succeeded)
266                 .build();
267         try {
268             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
269         } catch (TransactionChainClosedException e) {
270             LOG.warn("Can't write to transaction, transaction chain probably closed.");
271             LOG.trace("Write to transaction exception: ", e);
272         }
273
274         txFacade.submitTransaction();
275     }
276 }