1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.controller.sal.core.IContainer;
24 import org.opendaylight.controller.sal.core.Node;
25 import org.opendaylight.controller.sal.utils.GlobalConstants;
26 import org.opendaylight.controller.sal.utils.Status;
27 import org.opendaylight.controller.sal.utils.StatusCode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.UpdatedGroupBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
45 import org.opendaylight.yangtools.concepts.Registration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.binding.NotificationListener;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 @SuppressWarnings("unused")
54 public class GroupConsumerImpl {
56 protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
57 private GroupEventListener groupEventListener = new GroupEventListener();
58 private Registration<NotificationListener> groupListener;
59 private SalGroupService groupService;
60 private GroupDataCommitHandler commitHandler;
62 private ConcurrentMap<GroupKey, Group> originalSwGroupView;
63 private ConcurrentMap<GroupKey, Group> installedSwGroupView;
65 private ConcurrentMap<Node, List<Group>> nodeGroups;
66 private ConcurrentMap<GroupKey, Group> inactiveGroups;
68 private IClusterContainerServices clusterGroupContainerService = null;
69 private IContainer container;
71 public GroupConsumerImpl() {
73 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class).node(Group.class).toInstance();
74 groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
76 clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
77 container = FRMConsumerImpl.getContainer();
79 if (!(cacheStartup())) {
80 logger.error("Unanle to allocate/retrieve group cache");
81 System.out.println("Unable to allocate/retrieve group cache");
84 if (null == groupService) {
85 logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
86 System.out.println("Consumer SAL Group Service is down or NULL.");
91 groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
93 if (null == groupListener) {
94 logger.error("Listener to listen on group data modifcation events");
95 System.out.println("Listener to listen on group data modifcation events.");
99 commitHandler = new GroupDataCommitHandler();
100 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
103 private boolean allocateGroupCaches() {
104 if (this.clusterGroupContainerService == null) {
105 logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
110 clusterGroupContainerService.createCache("frm.originalSwGroupView",
111 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
113 clusterGroupContainerService.createCache("frm.installedSwGroupView",
114 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
116 clusterGroupContainerService.createCache("frm.inactiveGroups",
117 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
119 clusterGroupContainerService.createCache("frm.nodeGroups",
120 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
122 //TODO for cluster mode
123 /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
124 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
126 clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
127 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
129 } catch (CacheConfigException cce) {
130 logger.error("Group CacheConfigException");
133 } catch (CacheExistException cce) {
134 logger.error(" Group CacheExistException");
140 private void nonClusterGroupObjectCreate() {
141 originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
142 installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
143 nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
144 inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
147 @SuppressWarnings({ "unchecked" })
148 private boolean retrieveGroupCaches() {
149 ConcurrentMap<?, ?> map;
151 if (this.clusterGroupContainerService == null) {
152 logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
153 nonClusterGroupObjectCreate();
157 map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
159 originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
161 logger.error("Retrieval of cache(originalSwGroupView) failed");
165 map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
167 installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
169 logger.error("Retrieval of cache(installedSwGroupView) failed");
173 map = clusterGroupContainerService.getCache("frm.inactiveGroups");
175 inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
177 logger.error("Retrieval of cache(inactiveGroups) failed");
181 map = clusterGroupContainerService.getCache("frm.nodeGroups");
183 nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
185 logger.error("Retrieval of cache(nodeGroup) failed");
192 private boolean cacheStartup() {
193 if (allocateGroupCaches()) {
194 if (retrieveGroupCaches()) {
202 public Status validateGroup(Group group, FRMUtil.operation operation) {
203 String containerName;
205 Iterator<Bucket> bucketIterator;
206 boolean returnResult;
207 Buckets groupBuckets;
210 containerName = group.getContainerName();
212 if (null == containerName) {
213 containerName = GlobalConstants.DEFAULT.toString();
214 } else if (!FRMUtil.isNameValid(containerName)) {
215 logger.error("Container Name is invalid %s" + containerName);
216 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
219 groupName = group.getGroupName();
220 if (!FRMUtil.isNameValid(groupName)) {
221 logger.error("Group Name is invalid %s" + groupName);
222 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
225 returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
227 if (FRMUtil.operation.ADD == operation && returnResult) {
228 logger.error("Record with same Group Name exists");
229 return new Status(StatusCode.BADREQUEST, "Group record exists");
230 } else if (!returnResult) {
231 logger.error("Group record does not exist");
232 return new Status(StatusCode.BADREQUEST, "Group record does not exist");
235 if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() &&
236 group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) {
237 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
238 return new Status(StatusCode.BADREQUEST, "Invalid Group type");
241 groupBuckets = group.getBuckets();
243 if (null != groupBuckets && null != groupBuckets.getBucket()) {
244 bucketIterator = groupBuckets.getBucket().iterator();
246 while (bucketIterator.hasNext()) {
247 if (!(FRMUtil.validateActions(bucketIterator.next().getAction()))) {
248 logger.error("Error in action bucket");
249 return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
255 return new Status(StatusCode.SUCCESS);
259 private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
260 if (!originalSwGroupView.containsKey(key)) {
264 for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
265 if (entry.getValue().getGroupName().equals(groupName)) {
266 if (entry.getValue().getContainerName().equals(containerName)) {
275 * Update Group entries to the southbound plugin/inventory and our internal
281 private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
282 GroupKey groupKey = groupUpdateDataObject.getKey();
283 UpdatedGroupBuilder updateGroupBuilder = null;
285 Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
287 if (!groupOperationStatus.isSuccess()) {
288 logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
289 return groupOperationStatus;
292 if (originalSwGroupView.containsKey(groupKey)) {
293 originalSwGroupView.remove(groupKey);
294 originalSwGroupView.put(groupKey, groupUpdateDataObject);
297 if (groupUpdateDataObject.isInstall()) {
298 UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
299 updateGroupBuilder = new UpdatedGroupBuilder();
300 updateGroupBuilder.fieldsFrom(groupUpdateDataObject);
301 groupData.setUpdatedGroup(updateGroupBuilder.build());
302 //TODO how to get original group and modified group.
304 if (installedSwGroupView.containsKey(groupKey)) {
305 installedSwGroupView.remove(groupKey);
306 installedSwGroupView.put(groupKey, groupUpdateDataObject);
309 groupService.updateGroup(groupData.build());
312 return groupOperationStatus;
316 * Adds Group to the southbound plugin and our internal database
321 private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
322 GroupKey groupKey = groupAddDataObject.getKey();
323 Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
325 if (!groupOperationStatus.isSuccess()) {
326 logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
327 return groupOperationStatus;
330 originalSwGroupView.put(groupKey, groupAddDataObject);
332 if (groupAddDataObject.isInstall()) {
333 AddGroupInputBuilder groupData = new AddGroupInputBuilder();
334 groupData.setBuckets(groupAddDataObject.getBuckets());
335 groupData.setContainerName(groupAddDataObject.getContainerName());
336 groupData.setGroupId(groupAddDataObject.getGroupId());
337 groupData.setGroupType(groupAddDataObject.getGroupType());
338 groupData.setNode(groupAddDataObject.getNode());
339 installedSwGroupView.put(groupKey, groupAddDataObject);
340 groupService.addGroup(groupData.build());
343 return groupOperationStatus;
346 private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
347 for(Entry<InstanceIdentifier<?>, Group> entry :transaction.additions.entrySet()) {
349 if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) {
350 transaction.additions.remove(entry.getKey());
351 return Rpcs.getRpcResult(false, null, null);
355 for(Entry<InstanceIdentifier<?>, Group> entry :transaction.updates.entrySet()) {
357 if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) {
358 transaction.updates.remove(entry.getKey());
359 return Rpcs.getRpcResult(false, null, null);
363 for(InstanceIdentifier<?> removal : transaction.removals) {
364 // removeFlow(removal);
367 return Rpcs.getRpcResult(true, null, null);
370 private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
372 @SuppressWarnings("unchecked")
374 public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
375 // We should verify transaction
376 System.out.println("Coming in GroupDatacommitHandler");
377 internalTransaction transaction = new internalTransaction(modification);
378 transaction.prepareUpdate();
383 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
385 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
388 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
392 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
393 this.modification = modification;
396 Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
397 Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
398 Set<InstanceIdentifier<?>> removals = new HashSet<>();
401 * We create a plan which flows will be added, which will be updated and
402 * which will be removed based on our internal state.
405 void prepareUpdate() {
407 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
408 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
409 if (entry.getValue() instanceof Group) {
410 Group group = (Group) entry.getValue();
411 preparePutEntry(entry.getKey(), group);
416 removals = modification.getRemovedConfigurationData();
419 private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
421 Group original = originalSwGroupView.get(key);
422 if (original != null) {
423 // It is update for us
425 updates.put(key, group);
427 // It is addition for us
429 additions.put(key, group);
434 * We are OK to go with execution of plan
438 public RpcResult<Void> finish() throws IllegalStateException {
440 RpcResult<Void> rpcStatus = commitToPlugin(this);
441 // We return true if internal transaction is successful.
442 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
448 * We should rollback our preparation
452 public RpcResult<Void> rollback() throws IllegalStateException {
453 // NOOP - we did not modified any internal state during
454 // requestCommit phase
455 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
456 return Rpcs.getRpcResult(true, null, null);
462 final class GroupEventListener implements SalGroupListener {
464 List<GroupAdded> addedGroups = new ArrayList<>();
465 List<GroupRemoved> removedGroups = new ArrayList<>();
466 List<GroupUpdated> updatedGroups = new ArrayList<>();
469 public void onGroupAdded(GroupAdded notification) {
470 System.out.println("added Group..........................");
471 addedGroups.add(notification);
475 public void onGroupRemoved(GroupRemoved notification) {
476 // TODO Auto-generated method stub
481 public void onGroupUpdated(GroupUpdated notification) {
482 // TODO Auto-generated method stub