Merge "Initial implementation of the ClusteredDataStore"
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / GroupConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.controller.switchmanager.ISwitchManager;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 @SuppressWarnings("unused")
51 public class GroupConsumerImpl {
52     
53     protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
54     private GroupEventListener groupEventListener = new GroupEventListener();
55     private Registration<NotificationListener> groupListener;
56     private SalGroupService groupService;    
57     private GroupDataCommitHandler commitHandler;
58     
59     private ConcurrentMap<GroupKey, Group> originalSwGroupView;
60     private ConcurrentMap<GroupKey, Group> installedSwGroupView;
61     
62     private ConcurrentMap<Node, List<Group>> nodeGroups;
63     private ConcurrentMap<GroupKey, Group> inactiveGroups;
64     
65     private IClusterContainerServices clusterGroupContainerService = null;
66     private ISwitchManager switchGroupManager;
67     private IContainer container;
68     
69     public GroupConsumerImpl() {
70             InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class).toInstance();
71         groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
72         
73         clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
74         switchGroupManager = FRMConsumerImpl.getSwitchManager();
75         container = FRMConsumerImpl.getContainer();
76         
77         if (!(cacheStartup())) {
78             logger.error("Unanle to allocate/retrieve group cache");
79             System.out.println("Unable to allocate/retrieve group cache");
80         }
81         
82         if (null == groupService) {
83             logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
84             System.out.println("Consumer SAL Group Service is down or NULL.");
85             return;
86         }     
87         
88         // For switch events
89         groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
90         
91         if (null == groupListener) {
92             logger.error("Listener to listen on group data modifcation events");
93             System.out.println("Listener to listen on group data modifcation events.");
94             return;
95         }       
96         
97         commitHandler = new GroupDataCommitHandler();
98         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
99         }
100         
101     private boolean allocateGroupCaches() {
102         if (this.clusterGroupContainerService == null) {
103             logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
104             return false;
105         }       
106
107         try {
108             clusterGroupContainerService.createCache("frm.originalSwGroupView",
109                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
110
111             clusterGroupContainerService.createCache("frm.installedSwGroupView",
112                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
113
114             clusterGroupContainerService.createCache("frm.inactiveGroups",
115                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
116
117             clusterGroupContainerService.createCache("frm.nodeGroups",
118                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
119             
120 //TODO for cluster mode
121            /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
122                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
123
124             clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
125                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
126             
127         } catch (CacheConfigException cce) {            
128             logger.error("Group CacheConfigException");
129             return false;
130             
131         } catch (CacheExistException cce) {
132             logger.error(" Group CacheExistException");           
133         }
134         
135         return true;
136     }
137     
138     private void nonClusterGroupObjectCreate() {
139         originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
140         installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
141         nodeGroups = new ConcurrentHashMap<Node, List<Group>>();        
142         inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
143     }
144     
145     @SuppressWarnings({ "unchecked" })
146     private boolean retrieveGroupCaches() {
147         ConcurrentMap<?, ?> map;
148
149         if (this.clusterGroupContainerService == null) {
150             logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
151             nonClusterGroupObjectCreate();
152             return false;
153         }       
154
155         map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
156         if (map != null) {
157             originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
158         } else {
159             logger.error("Retrieval of cache(originalSwGroupView) failed");
160             return false;
161         }
162
163         map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
164         if (map != null) {
165             installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
166         } else {
167             logger.error("Retrieval of cache(installedSwGroupView) failed");
168             return false;
169         }
170
171         map = clusterGroupContainerService.getCache("frm.inactiveGroups");
172         if (map != null) {
173             inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
174         } else {
175             logger.error("Retrieval of cache(inactiveGroups) failed");
176             return false;
177         }
178
179         map = clusterGroupContainerService.getCache("frm.nodeGroups");
180         if (map != null) {
181             nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
182         } else {
183             logger.error("Retrieval of cache(nodeGroup) failed");
184             return false;
185         }
186         
187         return true;
188     }
189         
190     private boolean cacheStartup() {
191         if (allocateGroupCaches()) {
192             if (retrieveGroupCaches()) {
193                 return true;
194             }
195         }
196         
197         return false;
198     }
199     
200     public Status validateGroup(Group group, FRMUtil.operation operation) {
201         String containerName;
202         String groupName;
203         Iterator<Bucket> bucketIterator;
204         boolean returnResult;
205         Buckets groupBuckets;
206         
207         if (null != group) {
208             containerName = group.getContainerName();
209             
210             if (null == containerName) {
211                 containerName = GlobalConstants.DEFAULT.toString();
212             }
213             else if (!FRMUtil.isNameValid(containerName)) {
214                 logger.error("Container Name is invalid %s" + containerName);
215                 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
216             }
217             
218             groupName = group.getGroupName();
219             if (!FRMUtil.isNameValid(groupName)) {
220                 logger.error("Group Name is invalid %s" + groupName);
221                 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
222             }
223             
224             returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
225             
226             if (FRMUtil.operation.ADD == operation && returnResult) {
227                 logger.error("Record with same Group Name exists");
228                 return new Status(StatusCode.BADREQUEST, "Group record exists");
229             }
230             else if (!returnResult) {
231                 logger.error("Group record does not exist");
232                 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
233             }
234             
235             if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && 
236                     group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) {
237                 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
238                 return new Status(StatusCode.BADREQUEST, "Invalid Group type");                
239             }
240             
241             groupBuckets = group.getBuckets();
242                     
243             if (null != groupBuckets && null != groupBuckets.getBucket()) {
244                 bucketIterator = groupBuckets.getBucket().iterator();
245                 
246                 while (bucketIterator.hasNext()) {
247                     if(!(FRMUtil.areActionsValid(bucketIterator.next().getActions()))) {
248                         logger.error("Error in action bucket");
249                         return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
250                     }                                
251                 }
252             }                
253         }
254         
255         return new Status(StatusCode.SUCCESS);
256         
257     }
258     
259     private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
260         if (! originalSwGroupView.containsKey(key)) {
261             return false;
262         }
263         
264         for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
265             if (entry.getValue().getGroupName().equals(groupName)) {
266                 if (entry.getValue().getContainerName().equals(containerName)) {
267                     return true;
268                 }
269             }
270         }
271         return false;
272     }
273
274     
275     /**
276      * Update Group entries to the southbound plugin/inventory and our internal database
277      *
278      * @param path
279      * @param dataObject
280      */
281     private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
282         GroupKey groupKey = groupUpdateDataObject.getKey();
283         Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
284         
285         if (!groupOperationStatus.isSuccess()) {
286             logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
287             return groupOperationStatus;
288         }
289             
290         originalSwGroupView.remove(groupKey);
291         originalSwGroupView.put(groupKey, groupUpdateDataObject);
292         
293         if (groupUpdateDataObject.isInstall()) {
294             UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
295             //TODO how to get original group and modified group. 
296             
297             if (installedSwGroupView.containsKey(groupKey)) {
298                 installedSwGroupView.remove(groupKey);
299             }
300             
301             installedSwGroupView.put(groupKey, groupUpdateDataObject);
302             groupService.updateGroup(groupData.build());
303         }
304         
305         return groupOperationStatus;
306     }
307     
308     /**
309      * Adds Group to the southbound plugin and our internal database
310      *
311      * @param path
312      * @param dataObject
313      */
314     private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
315         GroupKey groupKey = groupAddDataObject.getKey();
316         Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
317         
318         if (!groupOperationStatus.isSuccess()) {
319             logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
320             return groupOperationStatus;
321         }
322         validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
323         originalSwGroupView.put(groupKey, groupAddDataObject);
324         
325         if (groupAddDataObject.isInstall()) {
326             AddGroupInputBuilder groupData = new AddGroupInputBuilder();
327             groupData.setBuckets(groupAddDataObject.getBuckets());
328             groupData.setContainerName(groupAddDataObject.getContainerName());
329             groupData.setGroupId(groupAddDataObject.getGroupId());
330             groupData.setGroupType(groupAddDataObject.getGroupType());
331             groupData.setNode(groupAddDataObject.getNode());  
332             installedSwGroupView.put(groupKey, groupAddDataObject);
333             groupService.addGroup(groupData.build());
334         }
335         
336         return groupOperationStatus;
337     }
338     
339         private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
340         for(Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
341             
342             if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) {
343                 return Rpcs.getRpcResult(false, null, null);
344             }
345         }
346         for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
347            
348             if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) {
349                 return Rpcs.getRpcResult(false, null, null);
350             }
351         }
352         
353         for(InstanceIdentifier<?> removal : transaction.removals) {
354            // removeFlow(removal);
355         }
356         
357         return Rpcs.getRpcResult(true, null, null);
358     }
359     
360     private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
361
362          @SuppressWarnings("unchecked")
363         @Override
364          public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
365              // We should verify transaction
366              System.out.println("Coming in FlowDatacommitHandler");
367              internalTransaction transaction = new internalTransaction(modification);
368              transaction.prepareUpdate();
369              return transaction;
370          }
371     }
372
373     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
374
375         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
376
377         @Override
378         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
379             return modification;
380         }
381
382         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
383             this.modification = modification;
384         }
385
386         Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
387         Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
388         Set<InstanceIdentifier<?>> removals = new HashSet<>();
389
390         /**
391          * We create a plan which flows will be added, which will be updated and
392          * which will be removed based on our internal state.
393          * 
394          */
395         void prepareUpdate() {
396
397             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
398             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
399                 if (entry.getValue() instanceof Group) {                    
400                     Group group = (Group) entry.getValue();                    
401                     preparePutEntry(entry.getKey(), group);
402                 }
403
404             }
405
406             removals = modification.getRemovedConfigurationData();
407         }
408
409         private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
410             
411             Group original = originalSwGroupView.get(key);
412             if (original != null) {
413                 // It is update for us
414                 
415                 updates.put(key, group);               
416             } else {
417                 // It is addition for us
418                 
419                 additions.put(key, group);
420             }
421         }
422
423         /**
424          * We are OK to go with execution of plan
425          * 
426          */
427         @Override
428         public RpcResult<Void> finish() throws IllegalStateException {
429             
430             RpcResult<Void> rpcStatus = commitToPlugin(this);
431             // We return true if internal transaction is successful.
432           //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
433             return rpcStatus;
434         }
435
436         /**
437          * 
438          * We should rollback our preparation
439          * 
440          */
441         @Override
442         public RpcResult<Void> rollback() throws IllegalStateException {
443             // NOOP - we did not modified any internal state during
444             // requestCommit phase
445            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
446             return Rpcs.getRpcResult(true, null, null);
447             
448         }
449         
450     }
451     
452         
453         final class GroupEventListener implements SalGroupListener {
454             
455         List<GroupAdded> addedGroups = new ArrayList<>();
456         List<GroupRemoved> removedGroups = new ArrayList<>();
457         List<GroupUpdated> updatedGroups = new ArrayList<>();
458        
459
460         @Override
461         public void onGroupAdded(GroupAdded notification) {
462             System.out.println("added Group..........................");
463             addedGroups.add(notification);            
464         }
465
466         @Override
467         public void onGroupRemoved(GroupRemoved notification) {
468             // TODO Auto-generated method stub
469             
470         }
471
472         @Override
473         public void onGroupUpdated(GroupUpdated notification) {
474             // TODO Auto-generated method stub
475             
476         }    
477     }
478 }