1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.controller.switchmanager.ISwitchManager;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 @SuppressWarnings("unused")
51 public class GroupConsumerImpl {
53 protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
54 private GroupEventListener groupEventListener = new GroupEventListener();
55 private Registration<NotificationListener> groupListener;
56 private SalGroupService groupService;
57 private GroupDataCommitHandler commitHandler;
59 private ConcurrentMap<GroupKey, Group> originalSwGroupView;
60 private ConcurrentMap<GroupKey, Group> installedSwGroupView;
62 private ConcurrentMap<Node, List<Group>> nodeGroups;
63 private ConcurrentMap<GroupKey, Group> inactiveGroups;
65 private IClusterContainerServices clusterGroupContainerService = null;
66 private IContainer container;
68 public GroupConsumerImpl() {
69 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class).node(Group.class).toInstance();
70 groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
72 clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
73 container = FRMConsumerImpl.getContainer();
75 if (!(cacheStartup())) {
76 logger.error("Unanle to allocate/retrieve group cache");
77 System.out.println("Unable to allocate/retrieve group cache");
80 if (null == groupService) {
81 logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
82 System.out.println("Consumer SAL Group Service is down or NULL.");
87 groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
89 if (null == groupListener) {
90 logger.error("Listener to listen on group data modifcation events");
91 System.out.println("Listener to listen on group data modifcation events.");
95 commitHandler = new GroupDataCommitHandler();
96 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
99 private boolean allocateGroupCaches() {
100 if (this.clusterGroupContainerService == null) {
101 logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
106 clusterGroupContainerService.createCache("frm.originalSwGroupView",
107 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
109 clusterGroupContainerService.createCache("frm.installedSwGroupView",
110 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
112 clusterGroupContainerService.createCache("frm.inactiveGroups",
113 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
115 clusterGroupContainerService.createCache("frm.nodeGroups",
116 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
118 //TODO for cluster mode
119 /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
120 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
122 clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
123 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
125 } catch (CacheConfigException cce) {
126 logger.error("Group CacheConfigException");
129 } catch (CacheExistException cce) {
130 logger.error(" Group CacheExistException");
136 private void nonClusterGroupObjectCreate() {
137 originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
138 installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
139 nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
140 inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
143 @SuppressWarnings({ "unchecked" })
144 private boolean retrieveGroupCaches() {
145 ConcurrentMap<?, ?> map;
147 if (this.clusterGroupContainerService == null) {
148 logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
149 nonClusterGroupObjectCreate();
153 map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
155 originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
157 logger.error("Retrieval of cache(originalSwGroupView) failed");
161 map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
163 installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
165 logger.error("Retrieval of cache(installedSwGroupView) failed");
169 map = clusterGroupContainerService.getCache("frm.inactiveGroups");
171 inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
173 logger.error("Retrieval of cache(inactiveGroups) failed");
177 map = clusterGroupContainerService.getCache("frm.nodeGroups");
179 nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
181 logger.error("Retrieval of cache(nodeGroup) failed");
188 private boolean cacheStartup() {
189 if (allocateGroupCaches()) {
190 if (retrieveGroupCaches()) {
198 public Status validateGroup(Group group, FRMUtil.operation operation) {
199 String containerName;
201 Iterator<Bucket> bucketIterator;
202 boolean returnResult;
203 Buckets groupBuckets;
206 containerName = group.getContainerName();
208 if (null == containerName) {
209 containerName = GlobalConstants.DEFAULT.toString();
211 else if (!FRMUtil.isNameValid(containerName)) {
212 logger.error("Container Name is invalid %s" + containerName);
213 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
216 groupName = group.getGroupName();
217 if (!FRMUtil.isNameValid(groupName)) {
218 logger.error("Group Name is invalid %s" + groupName);
219 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
222 returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
224 if (FRMUtil.operation.ADD == operation && returnResult) {
225 logger.error("Record with same Group Name exists");
226 return new Status(StatusCode.BADREQUEST, "Group record exists");
228 else if (!returnResult) {
229 logger.error("Group record does not exist");
230 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
233 if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() &&
234 group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) {
235 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
236 return new Status(StatusCode.BADREQUEST, "Invalid Group type");
239 groupBuckets = group.getBuckets();
241 if (null != groupBuckets && null != groupBuckets.getBucket()) {
242 bucketIterator = groupBuckets.getBucket().iterator();
244 while (bucketIterator.hasNext()) {
245 if(!(FRMUtil.areActionsValid(bucketIterator.next().getActions()))) {
246 logger.error("Error in action bucket");
247 return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
253 return new Status(StatusCode.SUCCESS);
257 private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
258 if (! originalSwGroupView.containsKey(key)) {
262 for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
263 if (entry.getValue().getGroupName().equals(groupName)) {
264 if (entry.getValue().getContainerName().equals(containerName)) {
274 * Update Group entries to the southbound plugin/inventory and our internal database
279 private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
280 GroupKey groupKey = groupUpdateDataObject.getKey();
281 Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
283 if (!groupOperationStatus.isSuccess()) {
284 logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
285 return groupOperationStatus;
288 originalSwGroupView.remove(groupKey);
289 originalSwGroupView.put(groupKey, groupUpdateDataObject);
291 if (groupUpdateDataObject.isInstall()) {
292 UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
293 //TODO how to get original group and modified group.
295 if (installedSwGroupView.containsKey(groupKey)) {
296 installedSwGroupView.remove(groupKey);
299 installedSwGroupView.put(groupKey, groupUpdateDataObject);
300 groupService.updateGroup(groupData.build());
303 return groupOperationStatus;
307 * Adds Group to the southbound plugin and our internal database
312 private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
313 GroupKey groupKey = groupAddDataObject.getKey();
314 Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
316 if (!groupOperationStatus.isSuccess()) {
317 logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
318 return groupOperationStatus;
321 originalSwGroupView.put(groupKey, groupAddDataObject);
323 if (groupAddDataObject.isInstall()) {
324 AddGroupInputBuilder groupData = new AddGroupInputBuilder();
325 groupData.setBuckets(groupAddDataObject.getBuckets());
326 groupData.setContainerName(groupAddDataObject.getContainerName());
327 groupData.setGroupId(groupAddDataObject.getGroupId());
328 groupData.setGroupType(groupAddDataObject.getGroupType());
329 groupData.setNode(groupAddDataObject.getNode());
330 installedSwGroupView.put(groupKey, groupAddDataObject);
331 groupService.addGroup(groupData.build());
334 return groupOperationStatus;
337 private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
338 for(Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
340 if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) {
341 return Rpcs.getRpcResult(false, null, null);
344 for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
346 if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) {
347 return Rpcs.getRpcResult(false, null, null);
351 for(InstanceIdentifier<?> removal : transaction.removals) {
352 // removeFlow(removal);
355 return Rpcs.getRpcResult(true, null, null);
358 private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
360 @SuppressWarnings("unchecked")
362 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
363 // We should verify transaction
364 System.out.println("Coming in GroupDatacommitHandler");
365 internalTransaction transaction = new internalTransaction(modification);
366 transaction.prepareUpdate();
371 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
373 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
376 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
380 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
381 this.modification = modification;
384 Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
385 Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
386 Set<InstanceIdentifier<?>> removals = new HashSet<>();
389 * We create a plan which flows will be added, which will be updated and
390 * which will be removed based on our internal state.
393 void prepareUpdate() {
395 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
396 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
397 if (entry.getValue() instanceof Group) {
398 Group group = (Group) entry.getValue();
399 preparePutEntry(entry.getKey(), group);
404 removals = modification.getRemovedConfigurationData();
407 private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
409 Group original = originalSwGroupView.get(key);
410 if (original != null) {
411 // It is update for us
413 updates.put(key, group);
415 // It is addition for us
417 additions.put(key, group);
422 * We are OK to go with execution of plan
426 public RpcResult<Void> finish() throws IllegalStateException {
428 RpcResult<Void> rpcStatus = commitToPlugin(this);
429 // We return true if internal transaction is successful.
430 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
436 * We should rollback our preparation
440 public RpcResult<Void> rollback() throws IllegalStateException {
441 // NOOP - we did not modified any internal state during
442 // requestCommit phase
443 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
444 return Rpcs.getRpcResult(true, null, null);
451 final class GroupEventListener implements SalGroupListener {
453 List<GroupAdded> addedGroups = new ArrayList<>();
454 List<GroupRemoved> removedGroups = new ArrayList<>();
455 List<GroupUpdated> updatedGroups = new ArrayList<>();
459 public void onGroupAdded(GroupAdded notification) {
460 System.out.println("added Group..........................");
461 addedGroups.add(notification);
465 public void onGroupRemoved(GroupRemoved notification) {
466 // TODO Auto-generated method stub
471 public void onGroupUpdated(GroupUpdated notification) {
472 // TODO Auto-generated method stub