Added flow and group NSF.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / GroupConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.yang.binding.DataObject;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.opendaylight.yangtools.yang.binding.NotificationListener;
45 import org.opendaylight.yangtools.yang.common.RpcResult;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 @SuppressWarnings("unused")
50 public class GroupConsumerImpl {
51
52     protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
53     private GroupEventListener groupEventListener = new GroupEventListener();
54     private Registration<NotificationListener> groupListener;
55     private SalGroupService groupService;
56     private GroupDataCommitHandler commitHandler;
57
58     private ConcurrentMap<GroupKey, Group> originalSwGroupView;
59     private ConcurrentMap<GroupKey, Group> installedSwGroupView;
60
61     private ConcurrentMap<Node, List<Group>> nodeGroups;
62     private ConcurrentMap<GroupKey, Group> inactiveGroups;
63
64     private IClusterContainerServices clusterGroupContainerService = null;
65     private IContainer container;
66
67     public GroupConsumerImpl() {
68         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class)
69                 .node(Group.class).toInstance();
70         groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
71
72         clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
73         container = FRMConsumerImpl.getContainer();
74
75         if (!(cacheStartup())) {
76             logger.error("Unanle to allocate/retrieve group cache");
77             System.out.println("Unable to allocate/retrieve group cache");
78         }
79
80         if (null == groupService) {
81             logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
82             System.out.println("Consumer SAL Group Service is down or NULL.");
83             return;
84         }
85
86         // For switch events
87         groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
88
89         if (null == groupListener) {
90             logger.error("Listener to listen on group data modifcation events");
91             System.out.println("Listener to listen on group data modifcation events.");
92             return;
93         }
94
95         commitHandler = new GroupDataCommitHandler();
96         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
97     }
98
99     private boolean allocateGroupCaches() {
100         if (this.clusterGroupContainerService == null) {
101             logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
102             return false;
103         }
104
105         try {
106             clusterGroupContainerService.createCache("frm.originalSwGroupView",
107                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
108
109             clusterGroupContainerService.createCache("frm.installedSwGroupView",
110                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
111
112             clusterGroupContainerService.createCache("frm.inactiveGroups",
113                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114
115             clusterGroupContainerService.createCache("frm.nodeGroups",
116                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117
118             // TODO for cluster mode
119             /*
120              * clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
121              * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
122              * IClusterServices.cacheMode.ASYNC));
123              *
124              * clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
125              * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
126              * IClusterServices.cacheMode.ASYNC));
127              */
128
129         } catch (CacheConfigException cce) {
130             logger.error("Group CacheConfigException");
131             return false;
132
133         } catch (CacheExistException cce) {
134             logger.error(" Group CacheExistException");
135         }
136
137         return true;
138     }
139
140     private void nonClusterGroupObjectCreate() {
141         originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
142         installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
143         nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
144         inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
145     }
146
147     @SuppressWarnings({ "unchecked" })
148     private boolean retrieveGroupCaches() {
149         ConcurrentMap<?, ?> map;
150
151         if (this.clusterGroupContainerService == null) {
152             logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
153             nonClusterGroupObjectCreate();
154             return false;
155         }
156
157         map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
158         if (map != null) {
159             originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
160         } else {
161             logger.error("Retrieval of cache(originalSwGroupView) failed");
162             return false;
163         }
164
165         map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
166         if (map != null) {
167             installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
168         } else {
169             logger.error("Retrieval of cache(installedSwGroupView) failed");
170             return false;
171         }
172
173         map = clusterGroupContainerService.getCache("frm.inactiveGroups");
174         if (map != null) {
175             inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
176         } else {
177             logger.error("Retrieval of cache(inactiveGroups) failed");
178             return false;
179         }
180
181         map = clusterGroupContainerService.getCache("frm.nodeGroups");
182         if (map != null) {
183             nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
184         } else {
185             logger.error("Retrieval of cache(nodeGroup) failed");
186             return false;
187         }
188
189         return true;
190     }
191
192     private boolean cacheStartup() {
193         if (allocateGroupCaches()) {
194             if (retrieveGroupCaches()) {
195                 return true;
196             }
197         }
198
199         return false;
200     }
201
202     public Status validateGroup(Group group, FRMUtil.operation operation) {
203         String containerName;
204         String groupName;
205         Iterator<Bucket> bucketIterator;
206         boolean returnResult;
207         Buckets groupBuckets;
208
209         if (null != group) {
210             containerName = group.getContainerName();
211
212             if (null == containerName) {
213                 containerName = GlobalConstants.DEFAULT.toString();
214             } else if (!FRMUtil.isNameValid(containerName)) {
215                 logger.error("Container Name is invalid %s" + containerName);
216                 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
217             }
218
219             groupName = group.getGroupName();
220             if (!FRMUtil.isNameValid(groupName)) {
221                 logger.error("Group Name is invalid %s" + groupName);
222                 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
223             }
224
225             returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
226
227             if (FRMUtil.operation.ADD == operation && returnResult) {
228                 logger.error("Record with same Group Name exists");
229                 return new Status(StatusCode.BADREQUEST, "Group record exists");
230             } else if (!returnResult) {
231                 logger.error("Group record does not exist");
232                 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
233             }
234
235             if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && group.getGroupType()
236                     .getIntValue() <= GroupType.GroupFf.getIntValue())) {
237                 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
238                 return new Status(StatusCode.BADREQUEST, "Invalid Group type");
239             }
240
241             groupBuckets = group.getBuckets();
242
243             if (null != groupBuckets && null != groupBuckets.getBucket()) {
244                 bucketIterator = groupBuckets.getBucket().iterator();
245
246                 while (bucketIterator.hasNext()) {
247                     if (!(FRMUtil.validateActions(bucketIterator.next().getAction()))) {
248                         logger.error("Error in action bucket");
249                         return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
250                     }
251                 }
252             }
253         }
254
255         return new Status(StatusCode.SUCCESS);
256
257     }
258
259     private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
260         if (!originalSwGroupView.containsKey(key)) {
261             return false;
262         }
263
264         for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
265             if (entry.getValue().getGroupName().equals(groupName)) {
266                 if (entry.getValue().getContainerName().equals(containerName)) {
267                     return true;
268                 }
269             }
270         }
271         return false;
272     }
273
274     /**
275      * Update Group entries to the southbound plugin/inventory and our internal
276      * database
277      *
278      * @param path
279      * @param dataObject
280      */
281     private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
282         GroupKey groupKey = groupUpdateDataObject.getKey();
283         Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
284
285         if (!groupOperationStatus.isSuccess()) {
286             logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
287             return groupOperationStatus;
288         }
289
290         originalSwGroupView.remove(groupKey);
291         originalSwGroupView.put(groupKey, groupUpdateDataObject);
292
293         if (groupUpdateDataObject.isInstall()) {
294             UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
295             // TODO how to get original group and modified group.
296
297             if (installedSwGroupView.containsKey(groupKey)) {
298                 installedSwGroupView.remove(groupKey);
299             }
300
301             installedSwGroupView.put(groupKey, groupUpdateDataObject);
302             groupService.updateGroup(groupData.build());
303         }
304
305         return groupOperationStatus;
306     }
307
308     /**
309      * Adds Group to the southbound plugin and our internal database
310      *
311      * @param path
312      * @param dataObject
313      */
314     private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
315         GroupKey groupKey = groupAddDataObject.getKey();
316         Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
317
318         if (!groupOperationStatus.isSuccess()) {
319             logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
320             return groupOperationStatus;
321         }
322
323         originalSwGroupView.put(groupKey, groupAddDataObject);
324
325         if (groupAddDataObject.isInstall()) {
326             AddGroupInputBuilder groupData = new AddGroupInputBuilder();
327             groupData.setBuckets(groupAddDataObject.getBuckets());
328             groupData.setContainerName(groupAddDataObject.getContainerName());
329             groupData.setGroupId(groupAddDataObject.getGroupId());
330             groupData.setGroupType(groupAddDataObject.getGroupType());
331             groupData.setNode(groupAddDataObject.getNode());
332             installedSwGroupView.put(groupKey, groupAddDataObject);
333             groupService.addGroup(groupData.build());
334         }
335
336         return groupOperationStatus;
337     }
338
339     private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
340         for (Entry<InstanceIdentifier<?>, Group> entry : transaction.additions.entrySet()) {
341
342             if (!addGroup(entry.getKey(), entry.getValue()).isSuccess()) {
343                 return Rpcs.getRpcResult(false, null, null);
344             }
345         }
346         for (@SuppressWarnings("unused")
347         Entry<InstanceIdentifier<?>, Group> entry : transaction.additions.entrySet()) {
348
349             if (!updateGroup(entry.getKey(), entry.getValue()).isSuccess()) {
350                 return Rpcs.getRpcResult(false, null, null);
351             }
352         }
353
354         for (InstanceIdentifier<?> removal : transaction.removals) {
355             // removeFlow(removal);
356         }
357
358         return Rpcs.getRpcResult(true, null, null);
359     }
360
361     private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
362
363         @SuppressWarnings("unchecked")
364         @Override
365         public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
366             // We should verify transaction
367             System.out.println("Coming in GroupDatacommitHandler");
368             internalTransaction transaction = new internalTransaction(modification);
369             transaction.prepareUpdate();
370             return transaction;
371         }
372     }
373
374     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
375
376         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
377
378         @Override
379         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
380             return modification;
381         }
382
383         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
384             this.modification = modification;
385         }
386
387         Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
388         Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
389         Set<InstanceIdentifier<?>> removals = new HashSet<>();
390
391         /**
392          * We create a plan which flows will be added, which will be updated and
393          * which will be removed based on our internal state.
394          *
395          */
396         void prepareUpdate() {
397
398             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
399             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
400                 if (entry.getValue() instanceof Group) {
401                     Group group = (Group) entry.getValue();
402                     preparePutEntry(entry.getKey(), group);
403                 }
404
405             }
406
407             removals = modification.getRemovedConfigurationData();
408         }
409
410         private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
411
412             Group original = originalSwGroupView.get(key);
413             if (original != null) {
414                 // It is update for us
415
416                 updates.put(key, group);
417             } else {
418                 // It is addition for us
419
420                 additions.put(key, group);
421             }
422         }
423
424         /**
425          * We are OK to go with execution of plan
426          *
427          */
428         @Override
429         public RpcResult<Void> finish() throws IllegalStateException {
430
431             RpcResult<Void> rpcStatus = commitToPlugin(this);
432             // We return true if internal transaction is successful.
433             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
434             return rpcStatus;
435         }
436
437         /**
438          *
439          * We should rollback our preparation
440          *
441          */
442         @Override
443         public RpcResult<Void> rollback() throws IllegalStateException {
444             // NOOP - we did not modified any internal state during
445             // requestCommit phase
446             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
447             return Rpcs.getRpcResult(true, null, null);
448
449         }
450
451     }
452
453     final class GroupEventListener implements SalGroupListener {
454
455         List<GroupAdded> addedGroups = new ArrayList<>();
456         List<GroupRemoved> removedGroups = new ArrayList<>();
457         List<GroupUpdated> updatedGroups = new ArrayList<>();
458
459         @Override
460         public void onGroupAdded(GroupAdded notification) {
461             System.out.println("added Group..........................");
462             addedGroups.add(notification);
463         }
464
465         @Override
466         public void onGroupRemoved(GroupRemoved notification) {
467             // TODO Auto-generated method stub
468
469         }
470
471         @Override
472         public void onGroupUpdated(GroupUpdated notification) {
473             // TODO Auto-generated method stub
474
475         }
476     }
477 }