Flow removed switch event. Group and Meter update RPC
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / GroupConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.UpdatedGroupBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
45 import org.opendaylight.yangtools.concepts.Registration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.binding.NotificationListener;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 @SuppressWarnings("unused")
54 public class GroupConsumerImpl {
55
56     protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
57     private GroupEventListener groupEventListener = new GroupEventListener();
58     private Registration<NotificationListener> groupListener;
59     private SalGroupService groupService;
60     private GroupDataCommitHandler commitHandler;
61
62     private ConcurrentMap<GroupKey, Group> originalSwGroupView;
63     private ConcurrentMap<GroupKey, Group> installedSwGroupView;
64
65     private ConcurrentMap<Node, List<Group>> nodeGroups;
66     private ConcurrentMap<GroupKey, Group> inactiveGroups;
67
68     private IClusterContainerServices clusterGroupContainerService = null;
69     private IContainer container;
70
71     public GroupConsumerImpl() {
72         
73             InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class).node(Group.class).toInstance();
74         groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
75
76         clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
77         container = FRMConsumerImpl.getContainer();
78
79         if (!(cacheStartup())) {
80             logger.error("Unanle to allocate/retrieve group cache");
81             System.out.println("Unable to allocate/retrieve group cache");
82         }
83
84         if (null == groupService) {
85             logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
86             System.out.println("Consumer SAL Group Service is down or NULL.");
87             return;
88         }
89
90         // For switch events
91         groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
92
93         if (null == groupListener) {
94             logger.error("Listener to listen on group data modifcation events");
95             System.out.println("Listener to listen on group data modifcation events.");
96             return;
97         }
98
99         commitHandler = new GroupDataCommitHandler();
100         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
101     }
102
103     private boolean allocateGroupCaches() {
104         if (this.clusterGroupContainerService == null) {
105             logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
106             return false;
107         }
108
109         try {
110             clusterGroupContainerService.createCache("frm.originalSwGroupView",
111                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
112
113             clusterGroupContainerService.createCache("frm.installedSwGroupView",
114                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
115
116             clusterGroupContainerService.createCache("frm.inactiveGroups",
117                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
118
119             clusterGroupContainerService.createCache("frm.nodeGroups",
120                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
121             
122 //TODO for cluster mode
123            /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
124                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
125
126             clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
127                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
128             
129         } catch (CacheConfigException cce) {            
130             logger.error("Group CacheConfigException");
131             return false;
132             
133         } catch (CacheExistException cce) {
134             logger.error(" Group CacheExistException");           
135         }
136         
137         return true;
138     }
139     
140     private void nonClusterGroupObjectCreate() {
141         originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
142         installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
143         nodeGroups = new ConcurrentHashMap<Node, List<Group>>();        
144         inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
145     }
146     
147     @SuppressWarnings({ "unchecked" })
148     private boolean retrieveGroupCaches() {
149         ConcurrentMap<?, ?> map;
150
151         if (this.clusterGroupContainerService == null) {
152             logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
153             nonClusterGroupObjectCreate();
154             return false;
155         }       
156
157         map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
158         if (map != null) {
159             originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
160         } else {
161             logger.error("Retrieval of cache(originalSwGroupView) failed");
162             return false;
163         }
164
165         map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
166         if (map != null) {
167             installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
168         } else {
169             logger.error("Retrieval of cache(installedSwGroupView) failed");
170             return false;
171         }
172
173         map = clusterGroupContainerService.getCache("frm.inactiveGroups");
174         if (map != null) {
175             inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
176         } else {
177             logger.error("Retrieval of cache(inactiveGroups) failed");
178             return false;
179         }
180
181         map = clusterGroupContainerService.getCache("frm.nodeGroups");
182         if (map != null) {
183             nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
184         } else {
185             logger.error("Retrieval of cache(nodeGroup) failed");
186             return false;
187         }
188
189         return true;
190     }
191
192     private boolean cacheStartup() {
193         if (allocateGroupCaches()) {
194             if (retrieveGroupCaches()) {
195                 return true;
196             }
197         }
198
199         return false;
200     }
201
202     public Status validateGroup(Group group, FRMUtil.operation operation) {
203         String containerName;
204         String groupName;
205         Iterator<Bucket> bucketIterator;
206         boolean returnResult;
207         Buckets groupBuckets;
208
209         if (null != group) {
210             containerName = group.getContainerName();
211
212             if (null == containerName) {
213                 containerName = GlobalConstants.DEFAULT.toString();
214             } else if (!FRMUtil.isNameValid(containerName)) {
215                 logger.error("Container Name is invalid %s" + containerName);
216                 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
217             }
218
219             groupName = group.getGroupName();
220             if (!FRMUtil.isNameValid(groupName)) {
221                 logger.error("Group Name is invalid %s" + groupName);
222                 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
223             }
224
225             returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
226
227             if (FRMUtil.operation.ADD == operation && returnResult) {
228                 logger.error("Record with same Group Name exists");
229                 return new Status(StatusCode.BADREQUEST, "Group record exists");
230             } else if (!returnResult) {
231                 logger.error("Group record does not exist");
232                 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
233             }
234             
235             if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && 
236                     group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) {
237                 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
238                 return new Status(StatusCode.BADREQUEST, "Invalid Group type");                
239             }
240
241             groupBuckets = group.getBuckets();
242
243             if (null != groupBuckets && null != groupBuckets.getBucket()) {
244                 bucketIterator = groupBuckets.getBucket().iterator();
245
246                 while (bucketIterator.hasNext()) {
247                     if (!(FRMUtil.validateActions(bucketIterator.next().getAction()))) {
248                         logger.error("Error in action bucket");
249                         return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
250                     }
251                 }
252             }
253         }
254
255         return new Status(StatusCode.SUCCESS);
256
257     }
258
259     private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
260         if (!originalSwGroupView.containsKey(key)) {
261             return false;
262         }
263
264         for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
265             if (entry.getValue().getGroupName().equals(groupName)) {
266                 if (entry.getValue().getContainerName().equals(containerName)) {
267                     return true;
268                 }
269             }
270         }
271         return false;
272     }
273
274     /**
275      * Update Group entries to the southbound plugin/inventory and our internal
276      * database
277      *
278      * @param path
279      * @param dataObject
280      */
281     private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
282         GroupKey groupKey = groupUpdateDataObject.getKey();        
283         UpdatedGroupBuilder updateGroupBuilder = null;
284         
285         Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
286         
287         if (!groupOperationStatus.isSuccess()) {
288             logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
289             return groupOperationStatus;
290         }
291          
292         if (originalSwGroupView.containsKey(groupKey)) {
293             originalSwGroupView.remove(groupKey);
294             originalSwGroupView.put(groupKey, groupUpdateDataObject);
295         }
296         
297         if (groupUpdateDataObject.isInstall()) {
298             UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
299             updateGroupBuilder = new UpdatedGroupBuilder();
300             updateGroupBuilder.fieldsFrom(groupUpdateDataObject);
301             groupData.setUpdatedGroup(updateGroupBuilder.build());
302             //TODO how to get original group and modified group. 
303             
304             if (installedSwGroupView.containsKey(groupKey)) {
305                 installedSwGroupView.remove(groupKey);
306                 installedSwGroupView.put(groupKey, groupUpdateDataObject);
307             }
308             
309             groupService.updateGroup(groupData.build());
310         }
311         
312         return groupOperationStatus;
313     }
314     
315     /**
316      * Adds Group to the southbound plugin and our internal database
317      *
318      * @param path
319      * @param dataObject
320      */
321     private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
322         GroupKey groupKey = groupAddDataObject.getKey();
323         Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
324
325         if (!groupOperationStatus.isSuccess()) {
326             logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
327             return groupOperationStatus;
328         }
329
330         originalSwGroupView.put(groupKey, groupAddDataObject);
331
332         if (groupAddDataObject.isInstall()) {
333             AddGroupInputBuilder groupData = new AddGroupInputBuilder();
334             groupData.setBuckets(groupAddDataObject.getBuckets());
335             groupData.setContainerName(groupAddDataObject.getContainerName());
336             groupData.setGroupId(groupAddDataObject.getGroupId());
337             groupData.setGroupType(groupAddDataObject.getGroupType());
338             groupData.setNode(groupAddDataObject.getNode());
339             installedSwGroupView.put(groupKey, groupAddDataObject);
340             groupService.addGroup(groupData.build());
341         }
342
343         return groupOperationStatus;
344     }
345     
346         private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
347         for(Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
348             
349             if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) {
350                 transaction.additions.remove(entry.getKey());
351                 return Rpcs.getRpcResult(false, null, null);
352             }
353         }
354         
355         for(Entry<InstanceIdentifier<?>, Group> entry :transaction.updates.entrySet()) {
356            
357             if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) {
358                 transaction.updates.remove(entry.getKey());
359                 return Rpcs.getRpcResult(false, null, null);
360             }
361         }
362         
363         for(InstanceIdentifier<?> removal : transaction.removals) {
364            // removeFlow(removal);
365         }
366
367         return Rpcs.getRpcResult(true, null, null);
368     }
369
370     private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
371
372         @SuppressWarnings("unchecked")
373         @Override
374         public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
375             // We should verify transaction
376             System.out.println("Coming in GroupDatacommitHandler");
377             internalTransaction transaction = new internalTransaction(modification);
378             transaction.prepareUpdate();
379             return transaction;
380         }
381     }
382
383     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
384
385         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
386
387         @Override
388         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
389             return modification;
390         }
391
392         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
393             this.modification = modification;
394         }
395
396         Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
397         Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
398         Set<InstanceIdentifier<?>> removals = new HashSet<>();
399
400         /**
401          * We create a plan which flows will be added, which will be updated and
402          * which will be removed based on our internal state.
403          *
404          */
405         void prepareUpdate() {
406
407             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
408             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
409                 if (entry.getValue() instanceof Group) {
410                     Group group = (Group) entry.getValue();
411                     preparePutEntry(entry.getKey(), group);
412                 }
413
414             }
415
416             removals = modification.getRemovedConfigurationData();
417         }
418
419         private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
420
421             Group original = originalSwGroupView.get(key);
422             if (original != null) {
423                 // It is update for us
424
425                 updates.put(key, group);
426             } else {
427                 // It is addition for us
428
429                 additions.put(key, group);
430             }
431         }
432
433         /**
434          * We are OK to go with execution of plan
435          *
436          */
437         @Override
438         public RpcResult<Void> finish() throws IllegalStateException {
439
440             RpcResult<Void> rpcStatus = commitToPlugin(this);
441             // We return true if internal transaction is successful.
442             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
443             return rpcStatus;
444         }
445
446         /**
447          *
448          * We should rollback our preparation
449          *
450          */
451         @Override
452         public RpcResult<Void> rollback() throws IllegalStateException {
453             // NOOP - we did not modified any internal state during
454             // requestCommit phase
455             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
456             return Rpcs.getRpcResult(true, null, null);
457
458         }
459
460     }
461
462     final class GroupEventListener implements SalGroupListener {
463
464         List<GroupAdded> addedGroups = new ArrayList<>();
465         List<GroupRemoved> removedGroups = new ArrayList<>();
466         List<GroupUpdated> updatedGroups = new ArrayList<>();
467
468         @Override
469         public void onGroupAdded(GroupAdded notification) {
470             System.out.println("added Group..........................");
471             addedGroups.add(notification);
472         }
473
474         @Override
475         public void onGroupRemoved(GroupRemoved notification) {
476             // TODO Auto-generated method stub
477
478         }
479
480         @Override
481         public void onGroupUpdated(GroupUpdated notification) {
482             // TODO Auto-generated method stub
483
484         }
485     }
486 }