Added meter, action, group, flow models, mask and transactions support.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / GroupConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.controller.switchmanager.ISwitchManager;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 @SuppressWarnings("unused")
51 public class GroupConsumerImpl {
52     
53     protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
54     private GroupEventListener groupEventListener = new GroupEventListener();
55     private Registration<NotificationListener> groupListener;
56     private SalGroupService groupService;    
57     private GroupDataCommitHandler commitHandler;
58     
59     private ConcurrentMap<GroupKey, Group> originalSwGroupView;
60     private ConcurrentMap<GroupKey, Group> installedSwGroupView;
61     
62     private ConcurrentMap<Node, List<Group>> nodeGroups;
63     private ConcurrentMap<GroupKey, Group> inactiveGroups;
64     
65     private IClusterContainerServices clusterGroupContainerService = null;   
66     private IContainer container;
67     
68     public GroupConsumerImpl() {
69             InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class).node(Group.class).toInstance();
70         groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
71         
72         clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();        
73         container = FRMConsumerImpl.getContainer();
74         
75         if (!(cacheStartup())) {
76             logger.error("Unanle to allocate/retrieve group cache");
77             System.out.println("Unable to allocate/retrieve group cache");
78         }
79         
80         if (null == groupService) {
81             logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
82             System.out.println("Consumer SAL Group Service is down or NULL.");
83             return;
84         }     
85         
86         // For switch events
87         groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
88         
89         if (null == groupListener) {
90             logger.error("Listener to listen on group data modifcation events");
91             System.out.println("Listener to listen on group data modifcation events.");
92             return;
93         }       
94         
95         commitHandler = new GroupDataCommitHandler();
96         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
97         }
98         
99     private boolean allocateGroupCaches() {
100         if (this.clusterGroupContainerService == null) {
101             logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
102             return false;
103         }       
104
105         try {
106             clusterGroupContainerService.createCache("frm.originalSwGroupView",
107                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
108
109             clusterGroupContainerService.createCache("frm.installedSwGroupView",
110                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
111
112             clusterGroupContainerService.createCache("frm.inactiveGroups",
113                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114
115             clusterGroupContainerService.createCache("frm.nodeGroups",
116                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117             
118 //TODO for cluster mode
119            /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
120                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
121
122             clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
123                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
124             
125         } catch (CacheConfigException cce) {            
126             logger.error("Group CacheConfigException");
127             return false;
128             
129         } catch (CacheExistException cce) {
130             logger.error(" Group CacheExistException");           
131         }
132         
133         return true;
134     }
135     
136     private void nonClusterGroupObjectCreate() {
137         originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
138         installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
139         nodeGroups = new ConcurrentHashMap<Node, List<Group>>();        
140         inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
141     }
142     
143     @SuppressWarnings({ "unchecked" })
144     private boolean retrieveGroupCaches() {
145         ConcurrentMap<?, ?> map;
146
147         if (this.clusterGroupContainerService == null) {
148             logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
149             nonClusterGroupObjectCreate();
150             return false;
151         }       
152
153         map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
154         if (map != null) {
155             originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
156         } else {
157             logger.error("Retrieval of cache(originalSwGroupView) failed");
158             return false;
159         }
160
161         map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
162         if (map != null) {
163             installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
164         } else {
165             logger.error("Retrieval of cache(installedSwGroupView) failed");
166             return false;
167         }
168
169         map = clusterGroupContainerService.getCache("frm.inactiveGroups");
170         if (map != null) {
171             inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
172         } else {
173             logger.error("Retrieval of cache(inactiveGroups) failed");
174             return false;
175         }
176
177         map = clusterGroupContainerService.getCache("frm.nodeGroups");
178         if (map != null) {
179             nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
180         } else {
181             logger.error("Retrieval of cache(nodeGroup) failed");
182             return false;
183         }
184         
185         return true;
186     }
187         
188     private boolean cacheStartup() {
189         if (allocateGroupCaches()) {
190             if (retrieveGroupCaches()) {
191                 return true;
192             }
193         }
194         
195         return false;
196     }
197     
198     public Status validateGroup(Group group, FRMUtil.operation operation) {
199         String containerName;
200         String groupName;
201         Iterator<Bucket> bucketIterator;
202         boolean returnResult;
203         Buckets groupBuckets;
204         
205         if (null != group) {
206             containerName = group.getContainerName();
207             
208             if (null == containerName) {
209                 containerName = GlobalConstants.DEFAULT.toString();
210             }
211             else if (!FRMUtil.isNameValid(containerName)) {
212                 logger.error("Container Name is invalid %s" + containerName);
213                 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
214             }
215             
216             groupName = group.getGroupName();
217             if (!FRMUtil.isNameValid(groupName)) {
218                 logger.error("Group Name is invalid %s" + groupName);
219                 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
220             }
221             
222             returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
223             
224             if (FRMUtil.operation.ADD == operation && returnResult) {
225                 logger.error("Record with same Group Name exists");
226                 return new Status(StatusCode.BADREQUEST, "Group record exists");
227             }
228             else if (!returnResult) {
229                 logger.error("Group record does not exist");
230                 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
231             }
232             
233             if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && 
234                     group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) {
235                 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
236                 return new Status(StatusCode.BADREQUEST, "Invalid Group type");                
237             }
238             
239             groupBuckets = group.getBuckets();
240                     
241             if (null != groupBuckets && null != groupBuckets.getBucket()) {
242                 bucketIterator = groupBuckets.getBucket().iterator();
243                 
244                 while (bucketIterator.hasNext()) {
245                     if(!(FRMUtil.areActionsValid(bucketIterator.next().getActions()))) {
246                         logger.error("Error in action bucket");
247                         return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
248                     }                                
249                 }
250             }                
251         }
252         
253         return new Status(StatusCode.SUCCESS);
254         
255     }
256     
257     private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
258         if (! originalSwGroupView.containsKey(key)) {
259             return false;
260         }
261         
262         for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
263             if (entry.getValue().getGroupName().equals(groupName)) {
264                 if (entry.getValue().getContainerName().equals(containerName)) {
265                     return true;
266                 }
267             }
268         }
269         return false;
270     }
271
272     
273     /**
274      * Update Group entries to the southbound plugin/inventory and our internal database
275      *
276      * @param path
277      * @param dataObject
278      */
279     private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
280         GroupKey groupKey = groupUpdateDataObject.getKey();        
281         Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
282         
283         if (!groupOperationStatus.isSuccess()) {
284             logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
285             return groupOperationStatus;
286         }
287             
288         originalSwGroupView.remove(groupKey);
289         originalSwGroupView.put(groupKey, groupUpdateDataObject);
290         
291         if (groupUpdateDataObject.isInstall()) {
292             UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
293             //TODO how to get original group and modified group. 
294             
295             if (installedSwGroupView.containsKey(groupKey)) {
296                 installedSwGroupView.remove(groupKey);
297             }
298             
299             installedSwGroupView.put(groupKey, groupUpdateDataObject);
300             groupService.updateGroup(groupData.build());
301         }
302         
303         return groupOperationStatus;
304     }
305     
306     /**
307      * Adds Group to the southbound plugin and our internal database
308      *
309      * @param path
310      * @param dataObject
311      */
312     private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
313         GroupKey groupKey = groupAddDataObject.getKey();
314         Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
315         
316         if (!groupOperationStatus.isSuccess()) {
317             logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
318             return groupOperationStatus;
319         }
320         
321         originalSwGroupView.put(groupKey, groupAddDataObject);
322         
323         if (groupAddDataObject.isInstall()) {
324             AddGroupInputBuilder groupData = new AddGroupInputBuilder();
325             groupData.setBuckets(groupAddDataObject.getBuckets());
326             groupData.setContainerName(groupAddDataObject.getContainerName());
327             groupData.setGroupId(groupAddDataObject.getGroupId());
328             groupData.setGroupType(groupAddDataObject.getGroupType());
329             groupData.setNode(groupAddDataObject.getNode());  
330             installedSwGroupView.put(groupKey, groupAddDataObject);
331             groupService.addGroup(groupData.build());
332         }
333         
334         return groupOperationStatus;
335     }
336     
337         private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
338         for(Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
339             
340             if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) {
341                 return Rpcs.getRpcResult(false, null, null);
342             }
343         }
344         for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
345            
346             if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) {
347                 return Rpcs.getRpcResult(false, null, null);
348             }
349         }
350         
351         for(InstanceIdentifier<?> removal : transaction.removals) {
352            // removeFlow(removal);
353         }
354         
355         return Rpcs.getRpcResult(true, null, null);
356     }
357     
358     private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
359
360          @SuppressWarnings("unchecked")
361         @Override
362          public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
363              // We should verify transaction
364              System.out.println("Coming in GroupDatacommitHandler");
365              internalTransaction transaction = new internalTransaction(modification);
366              transaction.prepareUpdate();
367              return transaction;
368          }
369     }
370
371     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
372
373         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
374
375         @Override
376         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
377             return modification;
378         }
379
380         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
381             this.modification = modification;
382         }
383
384         Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
385         Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
386         Set<InstanceIdentifier<?>> removals = new HashSet<>();
387
388         /**
389          * We create a plan which flows will be added, which will be updated and
390          * which will be removed based on our internal state.
391          * 
392          */
393         void prepareUpdate() {
394
395             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
396             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
397                 if (entry.getValue() instanceof Group) {                    
398                     Group group = (Group) entry.getValue();                    
399                     preparePutEntry(entry.getKey(), group);
400                 }
401
402             }
403
404             removals = modification.getRemovedConfigurationData();
405         }
406
407         private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
408             
409             Group original = originalSwGroupView.get(key);
410             if (original != null) {
411                 // It is update for us
412                 
413                 updates.put(key, group);               
414             } else {
415                 // It is addition for us
416                 
417                 additions.put(key, group);
418             }
419         }
420
421         /**
422          * We are OK to go with execution of plan
423          * 
424          */
425         @Override
426         public RpcResult<Void> finish() throws IllegalStateException {
427             
428             RpcResult<Void> rpcStatus = commitToPlugin(this);
429             // We return true if internal transaction is successful.
430           //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
431             return rpcStatus;
432         }
433
434         /**
435          * 
436          * We should rollback our preparation
437          * 
438          */
439         @Override
440         public RpcResult<Void> rollback() throws IllegalStateException {
441             // NOOP - we did not modified any internal state during
442             // requestCommit phase
443            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
444             return Rpcs.getRpcResult(true, null, null);
445             
446         }
447         
448     }
449     
450         
451         final class GroupEventListener implements SalGroupListener {
452             
453         List<GroupAdded> addedGroups = new ArrayList<>();
454         List<GroupRemoved> removedGroups = new ArrayList<>();
455         List<GroupUpdated> updatedGroups = new ArrayList<>();
456        
457
458         @Override
459         public void onGroupAdded(GroupAdded notification) {
460             System.out.println("added Group..........................");
461             addedGroups.add(notification);            
462         }
463
464         @Override
465         public void onGroupRemoved(GroupRemoved notification) {
466             // TODO Auto-generated method stub
467             
468         }
469
470         @Override
471         public void onGroupUpdated(GroupUpdated notification) {
472             // TODO Auto-generated method stub
473             
474         }    
475     }
476 }