1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Collections;
6 import java.util.EnumSet;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.List;
12 import java.util.Map.Entry;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
17 import org.opendaylight.controller.clustering.services.CacheConfigException;
18 import org.opendaylight.controller.clustering.services.CacheExistException;
19 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
20 import org.opendaylight.controller.clustering.services.IClusterServices;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.controller.sal.core.IContainer;
26 import org.opendaylight.controller.sal.core.Node;
27 import org.opendaylight.controller.sal.utils.GlobalConstants;
28 import org.opendaylight.controller.sal.utils.Status;
29 import org.opendaylight.controller.sal.utils.StatusCode;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.UpdatedGroupBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
46 import org.opendaylight.yangtools.concepts.Registration;
47 import org.opendaylight.yangtools.yang.binding.DataObject;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.NotificationListener;
50 import org.opendaylight.yangtools.yang.common.RpcError;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 @SuppressWarnings("unused")
56 public class GroupConsumerImpl implements IForwardingRulesManager {
58 protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
59 private final GroupEventListener groupEventListener = new GroupEventListener();
60 private Registration<NotificationListener> groupListener;
61 private SalGroupService groupService;
62 private GroupDataCommitHandler groupCommitHandler;
64 private ConcurrentMap<GroupKey, Group> originalSwGroupView;
65 private ConcurrentMap<GroupKey, Group> installedSwGroupView;
67 private ConcurrentMap<Node, List<Group>> nodeGroups;
68 private ConcurrentMap<GroupKey, Group> inactiveGroups;
70 private IClusterContainerServices clusterGroupContainerService = null;
71 private IContainer container;
73 public GroupConsumerImpl() {
75 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Groups.class).toInstance();
76 groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
78 if (!(cacheStartup())) {
79 logger.error("Unanle to allocate/retrieve group cache");
80 System.out.println("Unable to allocate/retrieve group cache");
83 if (null == groupService) {
84 logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
85 System.out.println("Consumer SAL Group Service is down or NULL.");
90 groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
92 if (null == groupListener) {
93 logger.error("Listener to listen on group data modifcation events");
94 System.out.println("Listener to listen on group data modifcation events.");
98 groupCommitHandler = new GroupDataCommitHandler();
99 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, groupCommitHandler);
100 clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
101 container = FRMConsumerImpl.getContainer();
104 private boolean allocateGroupCaches() {
105 if (this.clusterGroupContainerService == null) {
106 logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
111 clusterGroupContainerService.createCache("frm.originalSwGroupView",
112 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114 clusterGroupContainerService.createCache("frm.installedSwGroupView",
115 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117 clusterGroupContainerService.createCache("frm.inactiveGroups",
118 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
120 clusterGroupContainerService.createCache("frm.nodeGroups",
121 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
123 // TODO for cluster mode
125 * clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
126 * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
127 * IClusterServices.cacheMode.ASYNC));
129 * clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
130 * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
131 * IClusterServices.cacheMode.ASYNC));
134 } catch (CacheConfigException cce) {
135 logger.error("Group CacheConfigException");
138 } catch (CacheExistException cce) {
139 logger.error(" Group CacheExistException");
145 private void nonClusterGroupObjectCreate() {
146 originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
147 installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
148 nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
149 inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
152 @SuppressWarnings({ "unchecked" })
153 private boolean retrieveGroupCaches() {
154 ConcurrentMap<?, ?> map;
156 if (this.clusterGroupContainerService == null) {
157 logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
158 nonClusterGroupObjectCreate();
162 map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
164 originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
166 logger.error("Retrieval of cache(originalSwGroupView) failed");
170 map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
172 installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
174 logger.error("Retrieval of cache(installedSwGroupView) failed");
178 map = clusterGroupContainerService.getCache("frm.inactiveGroups");
180 inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
182 logger.error("Retrieval of cache(inactiveGroups) failed");
186 map = clusterGroupContainerService.getCache("frm.nodeGroups");
188 nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
190 logger.error("Retrieval of cache(nodeGroup) failed");
197 private boolean cacheStartup() {
198 if (allocateGroupCaches()) {
199 if (retrieveGroupCaches()) {
207 public Status validateGroup(Group group, FRMUtil.operation operation) {
208 String containerName;
210 Iterator<Bucket> bucketIterator;
211 boolean returnResult;
212 Buckets groupBuckets;
215 containerName = group.getContainerName();
217 if (null == containerName) {
218 containerName = GlobalConstants.DEFAULT.toString();
219 } else if (!FRMUtil.isNameValid(containerName)) {
220 logger.error("Container Name is invalid %s" + containerName);
221 return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
224 groupName = group.getGroupName();
225 if (!FRMUtil.isNameValid(groupName)) {
226 logger.error("Group Name is invalid %s" + groupName);
227 return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
230 if (!(group.getGroupType().getIntValue() >= GroupTypes.GroupAll.getIntValue() && group.getGroupType()
231 .getIntValue() <= GroupTypes.GroupFf.getIntValue())) {
232 logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
233 return new Status(StatusCode.BADREQUEST, "Invalid Group type");
236 groupBuckets = group.getBuckets();
238 if (null != groupBuckets && null != groupBuckets.getBucket()) {
239 bucketIterator = groupBuckets.getBucket().iterator();
241 while (bucketIterator.hasNext()) {
242 if (!(FRMUtil.validateActions(bucketIterator.next().getAction()))) {
243 logger.error("Error in action bucket");
244 return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
250 return new Status(StatusCode.SUCCESS);
254 * Update Group entries to the southbound plugin/inventory and our internal
260 private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
261 GroupKey groupKey = groupUpdateDataObject.getKey();
262 UpdatedGroupBuilder updateGroupBuilder = null;
264 Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
266 if (!groupOperationStatus.isSuccess()) {
267 logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
268 return groupOperationStatus;
271 UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
272 updateGroupBuilder = new UpdatedGroupBuilder();
273 updateGroupBuilder.setGroupId(new GroupId(groupUpdateDataObject.getId()));
274 updateGroupBuilder.fieldsFrom(groupUpdateDataObject);
275 groupData.setUpdatedGroup(updateGroupBuilder.build());
276 groupService.updateGroup(groupData.build());
277 return groupOperationStatus;
281 * Adds Group to the southbound plugin and our internal database
286 private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
287 GroupKey groupKey = groupAddDataObject.getKey();
288 Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
290 if (!groupOperationStatus.isSuccess()) {
291 logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
292 return groupOperationStatus;
295 AddGroupInputBuilder groupData = new AddGroupInputBuilder();
296 groupData.setBuckets(groupAddDataObject.getBuckets());
297 groupData.setContainerName(groupAddDataObject.getContainerName());
298 groupData.setGroupId(new GroupId(groupAddDataObject.getId()));
299 groupData.setGroupType(groupAddDataObject.getGroupType());
300 groupData.setNode(groupAddDataObject.getNode());
301 groupService.addGroup(groupData.build());
302 return groupOperationStatus;
306 * Remove Group to the southbound plugin and our internal database
311 private Status removeGroup(InstanceIdentifier<?> path, Group groupRemoveDataObject) {
312 GroupKey groupKey = groupRemoveDataObject.getKey();
313 Status groupOperationStatus = validateGroup(groupRemoveDataObject, FRMUtil.operation.ADD);
315 if (!groupOperationStatus.isSuccess()) {
316 logger.error("Group data object validation failed %s" + groupRemoveDataObject.getGroupName());
317 return groupOperationStatus;
320 RemoveGroupInputBuilder groupData = new RemoveGroupInputBuilder();
321 groupData.setBuckets(groupRemoveDataObject.getBuckets());
322 groupData.setContainerName(groupRemoveDataObject.getContainerName());
323 groupData.setGroupId(new GroupId(groupRemoveDataObject.getId()));
324 groupData.setGroupType(groupRemoveDataObject.getGroupType());
325 groupData.setNode(groupRemoveDataObject.getNode());
326 groupService.removeGroup(groupData.build());
327 return groupOperationStatus;
330 private RpcResult<Void> commitToPlugin(InternalTransaction transaction) {
331 for (Entry<InstanceIdentifier<?>, Group> entry : transaction.additions.entrySet()) {
333 if (!addGroup(entry.getKey(), entry.getValue()).isSuccess()) {
334 transaction.additions.remove(entry.getKey());
335 return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
339 for (Entry<InstanceIdentifier<?>, Group> entry : transaction.updates.entrySet()) {
341 if (!addGroup(entry.getKey(), entry.getValue()).isSuccess()) {
342 transaction.updates.remove(entry.getKey());
343 return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
347 for (InstanceIdentifier<?> groupId : transaction.removals) {
348 DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(groupId);
350 if(removeValue instanceof Group) {
351 if(!removeGroup(groupId, (Group)removeValue).isSuccess()) {
352 return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
357 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
360 private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
363 public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(
364 DataModification<InstanceIdentifier<?>, DataObject> modification) {
365 // We should verify transaction
366 System.out.println("Coming in GroupDatacommitHandler");
367 InternalTransaction transaction = new InternalTransaction(modification);
368 transaction.prepareUpdate();
373 private final class InternalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
375 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
378 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
382 public InternalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
383 this.modification = modification;
386 Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
387 Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
388 Set<InstanceIdentifier<?>> removals = new HashSet<>();
391 * We create a plan which flows will be added, which will be updated and
392 * which will be removed based on our internal state.
395 void prepareUpdate() {
397 Set<Entry<InstanceIdentifier<?>, DataObject>> groupAdded = modification.getCreatedConfigurationData().entrySet();
398 for (Entry<InstanceIdentifier<?>, DataObject> entry : groupAdded) {
399 if (entry.getValue() instanceof Group) {
400 Group group = (Group) entry.getValue();
401 additions.put(entry.getKey(), group);
406 Set<Entry<InstanceIdentifier<?>, DataObject>> groupUpdate = modification.getUpdatedConfigurationData().entrySet();
407 for (Entry<InstanceIdentifier<?>, DataObject> entry : groupUpdate) {
408 if (entry.getValue() instanceof Group) {
409 Group group = (Group) entry.getValue();
410 ///will be fixed once getUpdatedConfigurationData returns only updated data not created data with it.
411 if (additions.containsKey(entry.getKey())) {
412 updates.put(entry.getKey(), group);
418 removals = modification.getRemovedConfigurationData();
422 * We are OK to go with execution of plan
426 public RpcResult<Void> finish() throws IllegalStateException {
428 RpcResult<Void> rpcStatus = commitToPlugin(this);
429 // We return true if internal transaction is successful.
430 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
436 * We should rollback our preparation
440 public RpcResult<Void> rollback() throws IllegalStateException {
441 // NOOP - we did not modified any internal state during
442 // requestCommit phase
443 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
444 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
450 final class GroupEventListener implements SalGroupListener {
452 List<GroupAdded> addedGroups = new ArrayList<>();
453 List<GroupRemoved> removedGroups = new ArrayList<>();
454 List<GroupUpdated> updatedGroups = new ArrayList<>();
457 public void onGroupAdded(GroupAdded notification) {
458 addedGroups.add(notification);
462 public void onGroupRemoved(GroupRemoved notification) {
463 // TODO Auto-generated method stub
468 public void onGroupUpdated(GroupUpdated notification) {
469 // TODO Auto-generated method stub
475 public List<DataObject> get() {
477 List<DataObject> orderedList = new ArrayList<DataObject>();
478 Collection<Group> groupList = originalSwGroupView.values();
479 for (Iterator<Group> iterator = groupList.iterator(); iterator.hasNext();) {
480 orderedList.add(iterator.next());
486 public DataObject getWithName(String name, Node n) {
488 if (this instanceof GroupConsumerImpl) {
489 Collection<Group> groupList = originalSwGroupView.values();
490 for (Iterator<Group> iterator = groupList.iterator(); iterator.hasNext();) {
491 Group group = iterator.next();
492 if (group.getNode().equals(n) && group.getGroupName().equals(name)) {