Merge "Bug 6110: Fixed bugs in statistics manager due to race condition." into stable...
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatListenCommitGroup.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
15
16 import java.util.UUID;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
19 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
20 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
24 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
25 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
26 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
27 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
28 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
29 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatistics;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
58 import org.opendaylight.yangtools.yang.binding.DataObject;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import com.google.common.base.Optional;
64 import com.google.common.base.Preconditions;
65
66 /**
67  * statistics-manager
68  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
69  *
70  * StatListenCommitGroup
71  * Class is a NotifyListener for GroupStatistics and DataTreeChangeListener for Config/DataStore for Group node.
72  * All expected (registered) GroupStatistics will be builded and commit to Operational/DataStore.
73  * DataTreeModification should call create/delete Group in Operational/DS
74  */
75 public class StatListenCommitGroup extends StatAbstractListenCommit<Group, OpendaylightGroupStatisticsListener>
76                                                     implements OpendaylightGroupStatisticsListener {
77
78     private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitGroup.class);
79
80     public StatListenCommitGroup(final StatisticsManager manager,  final DataBroker db,
81             final NotificationProviderService nps,
82                                  final StatNodeRegistration nrm) {
83         super(manager, db, nps, Group.class,nrm);
84     }
85
86     @Override
87     protected OpendaylightGroupStatisticsListener getStatNotificationListener() {
88         return this;
89     }
90
91     @Override
92     protected InstanceIdentifier<Group> getWildCardedRegistrationPath() {
93         return InstanceIdentifier.create(Nodes.class).child(Node.class)
94                 .augmentation(FlowCapableNode.class).child(Group.class);
95     }
96
97     @Override
98     protected void processDataChange(Collection<DataTreeModification<Group>> changes) {
99         //NO-OP
100     }
101
102     @Override
103     public void onGroupDescStatsUpdated(final GroupDescStatsUpdated notification) {
104         final TransactionId transId = notification.getTransactionId();
105         final NodeId nodeId = notification.getId();
106         if ( ! isExpectedStatistics(transId, nodeId)) {
107             LOG.debug("Unregistred notification detect TransactionId {}", transId);
108             return;
109         }
110         manager.getRpcMsgManager().addNotification(notification, nodeId);
111         if (notification.isMoreReplies()) {
112             return;
113         }
114
115         /* Don't block RPC Notification thread */
116         manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
117             @Override
118             public void applyOperation(final ReadWriteTransaction tx) {
119                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
120                         .create(Nodes.class).child(Node.class, new NodeKey(nodeId));
121                 /* Validate exist FlowCapableNode */
122                 final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
123                 Optional<FlowCapableNode> fNode = Optional.absent();
124                 try {
125                     fNode = tx.read(LogicalDatastoreType.OPERATIONAL,fNodeIdent).checkedGet();
126                 }
127                 catch (final ReadFailedException e) {
128                     LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
129                 }
130                 if ( ! fNode.isPresent()) {
131                     return;
132                 }
133                 /* Get and Validate TransactionCacheContainer */
134                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
135                 if ( ! isTransactionCacheContainerValid(txContainer)) {
136                     return;
137                 }
138
139                 if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
140
141                 /* Prepare List actual Groups and not updated Groups will be removed */
142                 final List<Group> existGroups = fNode.get().getGroup() != null
143                         ? fNode.get().getGroup() : Collections.<Group> emptyList();
144                 final List<GroupKey> existGroupKeys = new ArrayList<>();
145                 for (final Group group : existGroups) {
146                     existGroupKeys.add(group.getKey());
147                 }
148                 /* GroupDesc processing */
149                 statGroupDescCommit(txContainer, tx, fNodeIdent, existGroupKeys);
150                 /* Delete all not presented Group Nodes */
151                 deleteAllNotPresentNode(fNodeIdent, tx, Collections.unmodifiableList(existGroupKeys));
152                 /* Notification for continue collecting statistics */
153                 notifyToCollectNextStatistics(nodeIdent, transId);
154             }
155
156             @Override
157             public UUID generatedUUIDForNode() {
158                 return manager.getGeneratedUUIDForNode(getNodeIdentifier());
159             }
160         });
161     }
162
163     @Override
164     public void onGroupFeaturesUpdated(final GroupFeaturesUpdated notification) {
165         Preconditions.checkNotNull(notification);
166         final TransactionId transId = notification.getTransactionId();
167         final NodeId nodeId = notification.getId();
168         if ( ! isExpectedStatistics(transId, nodeId)) {
169             LOG.debug("Unregistred notification detect TransactionId {}", transId);
170             return;
171         }
172         manager.getRpcMsgManager().addNotification(notification, nodeId);
173         if (notification.isMoreReplies()) {
174             return;
175         }
176
177         /* Don't block RPC Notification thread */
178         manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
179             @Override
180             public void applyOperation(final ReadWriteTransaction tx) {
181                 /* Get and Validate TransactionCacheContainer */
182                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
183                 if ( ! isTransactionCacheContainerValid(txContainer)) {
184                     return;
185                 }
186
187                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
188                         .create(Nodes.class).child(Node.class, new NodeKey(nodeId));
189
190                 manager.registerAdditionalNodeFeature(nodeIdent, StatCapabTypes.GROUP_STATS);
191
192                 if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
193
194                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
195                 for (final TransactionAware notif : cacheNotifs) {
196                     if ( ! (notif instanceof GroupFeaturesUpdated)) {
197                         break;
198                     }
199                     final GroupFeatures stats = new GroupFeaturesBuilder((GroupFeaturesUpdated)notif).build();
200                     final InstanceIdentifier<NodeGroupFeatures> nodeGroupFeatureIdent =
201                             nodeIdent.augmentation(NodeGroupFeatures.class);
202                     final InstanceIdentifier<GroupFeatures> groupFeatureIdent = nodeGroupFeatureIdent
203                             .child(GroupFeatures.class);
204                     Optional<Node> node = Optional.absent();
205                     try {
206                         node = tx.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
207                     }
208                     catch (final ReadFailedException e) {
209                         LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
210                     }
211                     if (node.isPresent()) {
212                         tx.merge(LogicalDatastoreType.OPERATIONAL, nodeGroupFeatureIdent, new NodeGroupFeaturesBuilder().build(), true);
213                         tx.put(LogicalDatastoreType.OPERATIONAL, groupFeatureIdent, stats);
214                         manager.unregisterNodeStats(nodeIdent, StatCapabTypes.GROUP_FEATURE_STATS);
215                     } else {
216                         LOG.debug("Node {} is NOT present in the operational data store",nodeId);
217                     }
218                 }
219             }
220
221             @Override
222             public UUID generatedUUIDForNode() {
223                 return manager.getGeneratedUUIDForNode(getNodeIdentifier());
224             }
225         });
226     }
227
228     @Override
229     public void onGroupStatisticsUpdated(final GroupStatisticsUpdated notification) {
230         Preconditions.checkNotNull(notification);
231         final TransactionId transId = notification.getTransactionId();
232         final NodeId nodeId = notification.getId();
233         if ( ! isExpectedStatistics(transId, nodeId)) {
234             LOG.debug("STAT-MANAGER - GroupStatisticsUpdated: unregistred notification detect TransactionId {}", transId);
235             return;
236         }
237         manager.getRpcMsgManager().addNotification(notification, nodeId);
238         if (notification.isMoreReplies()) {
239             return;
240         }
241
242         /* Don't block RPC Notification thread */
243         manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
244             @Override
245             public void applyOperation(final ReadWriteTransaction tx) {
246
247                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
248                         .create(Nodes.class).child(Node.class, new NodeKey(nodeId));
249                 /* Node exist check */
250                 Optional<Node> node = Optional.absent();
251                 try {
252                     node = tx.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
253                 }
254                 catch (final ReadFailedException e) {
255                     LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
256                 }
257                 if ( ! node.isPresent()) {
258                     return;
259                 }
260
261                 /* Get and Validate TransactionCacheContainer */
262                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
263                 if ( ! isTransactionCacheContainerValid(txContainer)) {
264                     return;
265                 }
266
267                 if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
268
269                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
270
271                 Optional<Group> notifGroup = Optional.absent();
272                 final Optional<? extends DataObject> inputObj = txContainer.get().getConfInput();
273                 if (inputObj.isPresent() && inputObj.get() instanceof Group) {
274                     notifGroup = Optional.<Group> of((Group)inputObj.get());
275                 }
276                 for (final TransactionAware notif : cacheNotifs) {
277                     if ( ! (notif instanceof GroupStatisticsUpdated)) {
278                         break;
279                     }
280                     statGroupCommit(((GroupStatisticsUpdated) notif).getGroupStats(), nodeIdent, tx);
281                 }
282                 if ( ! notifGroup.isPresent()) {
283                     notifyToCollectNextStatistics(nodeIdent, transId);
284                 }
285             }
286
287             public UUID generatedUUIDForNode() {
288                 return manager.getGeneratedUUIDForNode(getNodeIdentifier());
289             }
290         });
291     }
292
293     private void statGroupCommit(final List<GroupStats> groupStats, final InstanceIdentifier<Node> nodeIdent,
294             final ReadWriteTransaction tx) {
295
296         Preconditions.checkNotNull(groupStats);
297         Preconditions.checkNotNull(nodeIdent);
298         Preconditions.checkNotNull(tx);
299
300         final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
301
302         for (final GroupStats gStat : groupStats) {
303             final GroupStatistics stats = new GroupStatisticsBuilder(gStat).build();
304
305             final InstanceIdentifier<Group> groupIdent = fNodeIdent.child(Group.class, new GroupKey(gStat.getGroupId()));
306             final InstanceIdentifier<NodeGroupStatistics> nGroupStatIdent =groupIdent
307                     .augmentation(NodeGroupStatistics.class);
308             final InstanceIdentifier<GroupStatistics> gsIdent = nGroupStatIdent.child(GroupStatistics.class);
309             /* Statistics Writing */
310             Optional<Group> group = Optional.absent();
311             try {
312                 group = tx.read(LogicalDatastoreType.OPERATIONAL, groupIdent).checkedGet();
313             }
314             catch (final ReadFailedException e) {
315                 LOG.debug("Read Operational/DS for Group node fail! {}", groupIdent, e);
316             }
317             if (group.isPresent()) {
318                 tx.merge(LogicalDatastoreType.OPERATIONAL, nGroupStatIdent, new NodeGroupStatisticsBuilder().build(), true);
319                 tx.put(LogicalDatastoreType.OPERATIONAL, gsIdent, stats);
320             }
321         }
322     }
323
324     private void statGroupDescCommit(final Optional<TransactionCacheContainer<?>> txContainer, final ReadWriteTransaction tx,
325             final InstanceIdentifier<FlowCapableNode> fNodeIdent, final List<GroupKey> existGroupKeys) {
326
327         Preconditions.checkNotNull(existGroupKeys);
328         Preconditions.checkNotNull(txContainer);
329         Preconditions.checkNotNull(fNodeIdent);
330         Preconditions.checkNotNull(tx);
331
332         final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
333         for (final TransactionAware notif : cacheNotifs) {
334             if ( ! (notif instanceof GroupDescStatsUpdated)) {
335                 break;
336             }
337             final List<GroupDescStats> groupStats = ((GroupDescStatsUpdated) notif).getGroupDescStats();
338             if (groupStats == null) {
339                 break;
340             }
341             for (final GroupDescStats group : groupStats) {
342                 if (group.getGroupId() != null) {
343                     final GroupBuilder groupBuilder = new GroupBuilder(group);
344                     final GroupKey groupKey = new GroupKey(group.getGroupId());
345                     final InstanceIdentifier<Group> groupRef = fNodeIdent.child(Group.class,groupKey);
346
347                     final NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
348                     groupDesc.setGroupDesc(new GroupDescBuilder(group).build());
349                     //Update augmented data
350                     groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
351                     existGroupKeys.remove(groupKey);
352                     tx.put(LogicalDatastoreType.OPERATIONAL, groupRef, groupBuilder.build());
353                 }
354             }
355         }
356     }
357
358     private void deleteAllNotPresentNode(final InstanceIdentifier<FlowCapableNode> fNodeIdent,
359             final ReadWriteTransaction trans, final List<GroupKey> deviceGroupKeys) {
360
361         Preconditions.checkNotNull(fNodeIdent);
362         Preconditions.checkNotNull(trans);
363
364         if (deviceGroupKeys == null) {
365             return;
366         }
367
368         for (final GroupKey key : deviceGroupKeys) {
369             final InstanceIdentifier<Group> delGroupIdent = fNodeIdent.child(Group.class, key);
370             LOG.trace("Group {} has to removed.", key);
371             Optional<Group> delGroup = Optional.absent();
372             try {
373                 delGroup = trans.read(LogicalDatastoreType.OPERATIONAL, delGroupIdent).checkedGet();
374             }
375             catch (final ReadFailedException e) {
376                 // NOOP - probably another transaction delete that node
377                 LOG.debug("Group {} was probably deleted via other transaction. Exception {}", delGroupIdent, e);
378             }
379             if (delGroup.isPresent()) {
380                 trans.delete(LogicalDatastoreType.OPERATIONAL, delGroupIdent);
381             }
382         }
383     }
384 }
385