package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.IContainer;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.UpdatedGroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.Meter;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwarding Rules Manager consumer for group configuration.
 *
 * <p>
 * On construction it obtains the {@link SalGroupService} RPC service, sets up
 * the group caches, registers a {@link SalGroupListener} for group
 * notifications and registers a commit handler on the {@code Groups/Group}
 * configuration path. Committed additions, updates and removals are validated
 * and then forwarded to the group service.
 */
@SuppressWarnings("unused")
public class GroupConsumerImpl implements IForwardingRulesManager {

    protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);

    private static final String ORIGINAL_SW_GROUP_VIEW_CACHE = "frm.originalSwGroupView";
    private static final String INSTALLED_SW_GROUP_VIEW_CACHE = "frm.installedSwGroupView";
    private static final String INACTIVE_GROUPS_CACHE = "frm.inactiveGroups";
    private static final String NODE_GROUPS_CACHE = "frm.nodeGroups";

    /** Names of every cache this consumer allocates from the cluster container service. */
    private static final String[] GROUP_CACHE_NAMES = { ORIGINAL_SW_GROUP_VIEW_CACHE,
            INSTALLED_SW_GROUP_VIEW_CACHE, INACTIVE_GROUPS_CACHE, NODE_GROUPS_CACHE };

    private final GroupEventListener groupEventListener = new GroupEventListener();
    private Registration<NotificationListener> groupListener;
    private SalGroupService groupService;
    private GroupDataCommitHandler commitHandler;

    private ConcurrentMap<GroupKey, Group> originalSwGroupView;
    private ConcurrentMap<GroupKey, Group> installedSwGroupView;

    private ConcurrentMap<Node, List<Group>> nodeGroups;
    private ConcurrentMap<GroupKey, Group> inactiveGroups;

    private IClusterContainerServices clusterGroupContainerService = null;
    private IContainer container;

    /**
     * Wires this consumer to the services exposed by {@code FRMConsumerImpl}.
     *
     * <p>
     * A cache start-up failure is only logged. If the group RPC service or the
     * notification listener registration is unavailable, construction stops
     * early and no commit handler is registered.
     */
    public GroupConsumerImpl() {
        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Groups.class)
                .child(Group.class).toInstance();
        groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
        clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
        container = FRMConsumerImpl.getContainer();

        if (!cacheStartup()) {
            logger.error("Unable to allocate/retrieve group cache");
        }

        if (null == groupService) {
            logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
            return;
        }

        // For switch events
        groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);

        if (null == groupListener) {
            logger.error("Failed to register listener for group data modification events");
            return;
        }

        commitHandler = new GroupDataCommitHandler();
        FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
    }

    /**
     * Creates the transactional group caches in the cluster container service.
     *
     * <p>
     * Each cache is created on its own so that one cache which already exists
     * does not prevent the remaining caches from being created.
     *
     * @return {@code false} if the cluster service is missing or a cache could
     *         not be configured, {@code true} otherwise
     */
    private boolean allocateGroupCaches() {
        if (this.clusterGroupContainerService == null) {
            logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
            return false;
        }

        for (String cacheName : GROUP_CACHE_NAMES) {
            try {
                clusterGroupContainerService.createCache(cacheName,
                        EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            } catch (CacheConfigException cce) {
                logger.error("Group CacheConfigException for cache {}", cacheName, cce);
                return false;
            } catch (CacheExistException cee) {
                // Non-fatal: the cache is simply retrieved later on.
                logger.debug("Group cache {} already exists", cacheName);
            }
        }

        // TODO for cluster mode
        /*
         * clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
         * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
         * IClusterServices.cacheMode.ASYNC));
         *
         * clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
         * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
         * IClusterServices.cacheMode.ASYNC));
         */
        return true;
    }

    /** Backs every group view with a local, non-clustered map. */
    private void nonClusterGroupObjectCreate() {
        originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
        installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
        nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
        inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
    }

    /**
     * Looks up the previously allocated caches and binds them to the group
     * views. Without a cluster service the views fall back to local maps.
     *
     * @return {@code true} only if all four caches were retrieved from the
     *         cluster service
     */
    @SuppressWarnings({ "unchecked" })
    private boolean retrieveGroupCaches() {
        ConcurrentMap<?, ?> map;

        if (this.clusterGroupContainerService == null) {
            logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
            nonClusterGroupObjectCreate();
            return false;
        }

        map = clusterGroupContainerService.getCache(ORIGINAL_SW_GROUP_VIEW_CACHE);
        if (map == null) {
            logger.error("Retrieval of cache(originalSwGroupView) failed");
            return false;
        }
        originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;

        map = clusterGroupContainerService.getCache(INSTALLED_SW_GROUP_VIEW_CACHE);
        if (map == null) {
            logger.error("Retrieval of cache(installedSwGroupView) failed");
            return false;
        }
        installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;

        map = clusterGroupContainerService.getCache(INACTIVE_GROUPS_CACHE);
        if (map == null) {
            logger.error("Retrieval of cache(inactiveGroups) failed");
            return false;
        }
        inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;

        map = clusterGroupContainerService.getCache(NODE_GROUPS_CACHE);
        if (map == null) {
            logger.error("Retrieval of cache(nodeGroup) failed");
            return false;
        }
        nodeGroups = (ConcurrentMap<Node, List<Group>>) map;

        return true;
    }

    /**
     * Allocates and then retrieves the group caches.
     *
     * <p>
     * When no cluster service is available the views are still initialised
     * with local maps, so later reads do not hit {@code null} views; the
     * method nevertheless reports {@code false} in that case.
     *
     * @return {@code true} if the clustered caches were allocated and retrieved
     */
    private boolean cacheStartup() {
        if (!allocateGroupCaches()) {
            if (this.clusterGroupContainerService == null) {
                nonClusterGroupObjectCreate();
            }
            return false;
        }
        return retrieveGroupCaches();
    }

    /**
     * Validates the container name, group name, group type and bucket actions
     * of a group.
     *
     * @param group
     *            group to validate; a {@code null} group is reported as
     *            success (NOTE(review): confirm this is intended)
     * @param operation
     *            requested operation; currently not consulted because the
     *            existence check is disabled
     * @return {@link StatusCode#SUCCESS} if all checks pass, otherwise
     *         {@link StatusCode#BADREQUEST} with a description
     */
    public Status validateGroup(Group group, FRMUtil.operation operation) {
        if (null == group) {
            return new Status(StatusCode.SUCCESS);
        }

        // A missing container name is accepted; the disabled existence check
        // below would use GlobalConstants.DEFAULT for it.
        String containerName = group.getContainerName();
        if (null != containerName && !FRMUtil.isNameValid(containerName)) {
            logger.error("Container Name is invalid {}", containerName);
            return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
        }

        String groupName = group.getGroupName();
        if (!FRMUtil.isNameValid(groupName)) {
            logger.error("Group Name is invalid {}", groupName);
            return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
        }

        /*
         * TODO: existence check, disabled for now.
         *
         * returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
         *
         * if (FRMUtil.operation.ADD == operation && returnResult) {
         *     logger.error("Record with same Group Name exists");
         *     return new Status(StatusCode.BADREQUEST, "Group record exists");
         * } else if (!returnResult) {
         *     logger.error("Group record does not exist");
         *     return new Status(StatusCode.BADREQUEST, "Group record does not exist");
         * }
         */

        if (null == group.getGroupType()) {
            logger.error("Group type is missing");
            return new Status(StatusCode.BADREQUEST, "Invalid Group type");
        }

        int groupType = group.getGroupType().getIntValue();
        if (groupType < GroupType.GroupAll.getIntValue() || groupType > GroupType.GroupFf.getIntValue()) {
            logger.error("Invalid Group type {}", groupType);
            return new Status(StatusCode.BADREQUEST, "Invalid Group type");
        }

        Buckets groupBuckets = group.getBuckets();
        if (null != groupBuckets && null != groupBuckets.getBucket()) {
            for (Bucket bucket : groupBuckets.getBucket()) {
                if (!FRMUtil.validateActions(bucket.getAction())) {
                    logger.error("Error in action bucket");
                    return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
                }
            }
        }

        return new Status(StatusCode.SUCCESS);
    }

    /*
     * TODO: disabled helper for the existence check in validateGroup.
     *
     * private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
     *     if (!originalSwGroupView.containsKey(key)) {
     *         return false;
     *     }
     *
     *     for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
     *         if (entry.getValue().getGroupName().equals(groupName)) {
     *             if (entry.getValue().getContainerName().equals(containerName)) {
     *                 return true;
     *             }
     *         }
     *     }
     *     return true;
     * }
     */

    /**
     * Validates an updated group and, if it is flagged for installation,
     * sends it to the group service.
     *
     * @param path
     *            identifier of the group in the configuration data (not used)
     * @param groupUpdateDataObject
     *            new state of the group
     * @return the validation status
     */
    private Status updateGroup(InstanceIdentifier<? extends DataObject> path, Group groupUpdateDataObject) {
        Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);

        if (!groupOperationStatus.isSuccess()) {
            logger.error("Group data object validation failed {}", groupUpdateDataObject.getGroupName());
            return groupOperationStatus;
        }

        // TODO: refresh originalSwGroupView / installedSwGroupView under
        // groupUpdateDataObject.getKey() once the original and the modified
        // group can both be obtained.

        // Boolean.TRUE.equals also covers an unset (null) flag if the binding
        // returns a boxed Boolean.
        if (Boolean.TRUE.equals(groupUpdateDataObject.isInstall())) {
            UpdatedGroupBuilder updateGroupBuilder = new UpdatedGroupBuilder();
            updateGroupBuilder.fieldsFrom(groupUpdateDataObject);

            UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
            groupData.setUpdatedGroup(updateGroupBuilder.build());
            groupService.updateGroup(groupData.build());
        }

        return groupOperationStatus;
    }

    /**
     * Validates a new group and, if it is flagged for installation, sends it
     * to the group service.
     *
     * @param path
     *            identifier of the group in the configuration data (not used)
     * @param groupAddDataObject
     *            group to add
     * @return the validation status
     */
    private Status addGroup(InstanceIdentifier<? extends DataObject> path, Group groupAddDataObject) {
        Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);

        if (!groupOperationStatus.isSuccess()) {
            logger.error("Group data object validation failed {}", groupAddDataObject.getGroupName());
            return groupOperationStatus;
        }

        // TODO: originalSwGroupView.put(groupAddDataObject.getKey(), groupAddDataObject);

        if (Boolean.TRUE.equals(groupAddDataObject.isInstall())) {
            AddGroupInputBuilder groupData = new AddGroupInputBuilder();
            groupData.setBuckets(groupAddDataObject.getBuckets());
            groupData.setContainerName(groupAddDataObject.getContainerName());
            groupData.setGroupId(groupAddDataObject.getGroupId());
            groupData.setGroupType(groupAddDataObject.getGroupType());
            groupData.setNode(groupAddDataObject.getNode());
            // TODO: installedSwGroupView.put(groupAddDataObject.getKey(), groupAddDataObject);
            groupService.addGroup(groupData.build());
        }

        return groupOperationStatus;
    }

    /**
     * Validates a group that is being removed and, if it is flagged for
     * installation, asks the group service to remove it.
     *
     * @param path
     *            identifier of the group in the configuration data (not used)
     * @param groupRemoveDataObject
     *            group as it existed before the removal
     * @return the validation status
     */
    private Status removeGroup(InstanceIdentifier<? extends DataObject> path, Group groupRemoveDataObject) {
        // NOTE(review): validation runs with operation ADD; validateGroup
        // ignores the operation today, but a delete-specific value is
        // presumably intended - confirm against FRMUtil.operation.
        Status groupOperationStatus = validateGroup(groupRemoveDataObject, FRMUtil.operation.ADD);

        if (!groupOperationStatus.isSuccess()) {
            logger.error("Group data object validation failed {}", groupRemoveDataObject.getGroupName());
            return groupOperationStatus;
        }

        if (Boolean.TRUE.equals(groupRemoveDataObject.isInstall())) {
            RemoveGroupInputBuilder groupData = new RemoveGroupInputBuilder();
            groupData.setBuckets(groupRemoveDataObject.getBuckets());
            groupData.setContainerName(groupRemoveDataObject.getContainerName());
            groupData.setGroupId(groupRemoveDataObject.getGroupId());
            groupData.setGroupType(groupRemoveDataObject.getGroupType());
            groupData.setNode(groupRemoveDataObject.getNode());
            groupService.removeGroup(groupData.build());
        }

        return groupOperationStatus;
    }

    /**
     * Executes the plan of a prepared transaction: additions first, then
     * updates, then removals. Processing stops at the first failing entry.
     *
     * @param transaction
     *            transaction whose plan is executed
     * @return a successful result if every entry was processed, a failed
     *         result otherwise
     */
    private RpcResult<Void> commitToPlugin(InternalTransaction transaction) {
        for (Entry<InstanceIdentifier<? extends DataObject>, Group> entry : transaction.additions.entrySet()) {
            if (!addGroup(entry.getKey(), entry.getValue()).isSuccess()) {
                // Drop the failed entry from the plan; safe because the loop
                // is left immediately afterwards.
                transaction.additions.remove(entry.getKey());
                return Rpcs.<Void> getRpcResult(false, null, Collections.<RpcError> emptySet());
            }
        }

        for (Entry<InstanceIdentifier<? extends DataObject>, Group> entry : transaction.updates.entrySet()) {
            if (!updateGroup(entry.getKey(), entry.getValue()).isSuccess()) {
                transaction.updates.remove(entry.getKey());
                return Rpcs.<Void> getRpcResult(false, null, Collections.<RpcError> emptySet());
            }
        }

        for (InstanceIdentifier<? extends DataObject> groupId : transaction.removals) {
            // The removed object is looked up in the original configuration data.
            DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(groupId);

            if (removeValue instanceof Group) {
                if (!removeGroup(groupId, (Group) removeValue).isSuccess()) {
                    return Rpcs.<Void> getRpcResult(false, null, Collections.<RpcError> emptySet());
                }
            }
        }

        return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
    }

    /** Commit handler that turns each data modification into an {@link InternalTransaction}. */
    private final class GroupDataCommitHandler implements
            DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {

        @Override
        public DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
                DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
            // We should verify transaction
            logger.debug("Coming in GroupDatacommitHandler");
            InternalTransaction transaction = new InternalTransaction(modification);
            transaction.prepareUpdate();
            return transaction;
        }
    }

    /**
     * Transaction that plans which groups are added, updated and removed and
     * pushes that plan to the plugin on {@link #finish()}.
     */
    private final class InternalTransaction implements
            DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {

        private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;

        Map<InstanceIdentifier<? extends DataObject>, Group> additions = new HashMap<>();
        Map<InstanceIdentifier<? extends DataObject>, Group> updates = new HashMap<>();
        Set<InstanceIdentifier<? extends DataObject>> removals = new HashSet<>();

        public InternalTransaction(DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
            this.modification = modification;
        }

        @Override
        public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
            return modification;
        }

        /**
         * We create a plan which groups will be added, which will be updated
         * and which will be removed based on the modification.
         */
        void prepareUpdate() {
            Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> groupAdded = modification
                    .getCreatedConfigurationData().entrySet();
            for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : groupAdded) {
                if (entry.getValue() instanceof Group) {
                    additions.put(entry.getKey(), (Group) entry.getValue());
                }
            }

            Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> groupUpdate = modification
                    .getUpdatedConfigurationData().entrySet();
            for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : groupUpdate) {
                if (entry.getValue() instanceof Group) {
                    // getUpdatedConfigurationData currently also contains the
                    // created data, so entries already planned as additions
                    // must not be sent again as updates. Will be fixed once it
                    // returns only updated data.
                    if (!additions.containsKey(entry.getKey())) {
                        updates.put(entry.getKey(), (Group) entry.getValue());
                    }
                }
            }

            removals = modification.getRemovedConfigurationData();
        }

        /**
         * We are OK to go with execution of plan.
         *
         * @return the result of pushing the plan to the plugin
         */
        @Override
        public RpcResult<Void> finish() throws IllegalStateException {
            return commitToPlugin(this);
        }

        /**
         * We should rollback our preparation.
         *
         * @return always a successful result
         */
        @Override
        public RpcResult<Void> rollback() throws IllegalStateException {
            // NOOP - we did not modify any internal state during the
            // requestCommit phase
            return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
        }
    }

    /** Listener for group notifications; only group-added events are recorded. */
    final class GroupEventListener implements SalGroupListener {

        List<GroupAdded> addedGroups = new ArrayList<>();
        List<GroupRemoved> removedGroups = new ArrayList<>();
        List<GroupUpdated> updatedGroups = new ArrayList<>();

        @Override
        public void onGroupAdded(GroupAdded notification) {
            logger.debug("added Group");
            addedGroups.add(notification);
        }

        @Override
        public void onGroupRemoved(GroupRemoved notification) {
            // TODO Auto-generated method stub
        }

        @Override
        public void onGroupUpdated(GroupUpdated notification) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * Returns the groups currently held in the original group view.
     *
     * @return a new list with the groups, empty if the view is not available
     */
    @Override
    public List<DataObject> get() {
        List<DataObject> orderedList = new ArrayList<DataObject>();
        if (originalSwGroupView != null) {
            orderedList.addAll(originalSwGroupView.values());
        }
        return orderedList;
    }

    /**
     * Finds a group in the original group view by node and group name.
     *
     * @param name
     *            group name to look for
     * @param n
     *            node the group must belong to
     * @return the first matching group, or {@code null} if there is none or
     *         the view is not available
     */
    @Override
    public DataObject getWithName(String name, Node n) {
        if (originalSwGroupView == null) {
            return null;
        }

        for (Group group : originalSwGroupView.values()) {
            if (group.getNode().equals(n) && group.getGroupName().equals(name)) {
                return group;
            }
        }
        return null;
    }
}