package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataModification; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.IContainer; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings("unused") public class GroupConsumerImpl { protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class); private GroupEventListener groupEventListener = new GroupEventListener(); private Registration groupListener; private SalGroupService groupService; private GroupDataCommitHandler commitHandler; private ConcurrentMap originalSwGroupView; private ConcurrentMap installedSwGroupView; private ConcurrentMap> nodeGroups; private ConcurrentMap inactiveGroups; private IClusterContainerServices clusterGroupContainerService = null; private ISwitchManager switchGroupManager; private IContainer container; public GroupConsumerImpl() { InstanceIdentifier path = InstanceIdentifier.builder().node(Groups.class).toInstance(); groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class); clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService(); switchGroupManager = FRMConsumerImpl.getSwitchManager(); container = FRMConsumerImpl.getContainer(); if (!(cacheStartup())) { logger.error("Unanle to allocate/retrieve group cache"); System.out.println("Unable to allocate/retrieve group cache"); } if (null == groupService) { logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended"); System.out.println("Consumer SAL Group Service is down or NULL."); return; } // For switch events groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener); if (null == groupListener) { logger.error("Listener to listen on group data modifcation events"); System.out.println("Listener to listen on group data modifcation events."); return; } commitHandler = new GroupDataCommitHandler(); FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler); } private boolean allocateGroupCaches() { if (this.clusterGroupContainerService == null) { logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache"); return false; } try { clusterGroupContainerService.createCache("frm.originalSwGroupView", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterGroupContainerService.createCache("frm.installedSwGroupView", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterGroupContainerService.createCache("frm.inactiveGroups", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterGroupContainerService.createCache("frm.nodeGroups", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); //TODO for cluster mode /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); clusterGroupContainerService.createCache(WORK_ORDER_CACHE, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/ } catch (CacheConfigException cce) { logger.error("Group CacheConfigException"); return false; } catch (CacheExistException cce) { logger.error(" Group CacheExistException"); } return true; } private void nonClusterGroupObjectCreate() { originalSwGroupView = new ConcurrentHashMap(); installedSwGroupView = new ConcurrentHashMap(); nodeGroups = new ConcurrentHashMap>(); inactiveGroups = new ConcurrentHashMap(); } @SuppressWarnings({ "unchecked" }) private boolean retrieveGroupCaches() { ConcurrentMap map; if (this.clusterGroupContainerService == null) { logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache"); nonClusterGroupObjectCreate(); return false; } map = clusterGroupContainerService.getCache("frm.originalSwGroupView"); if (map != null) { originalSwGroupView = (ConcurrentMap) map; } else { logger.error("Retrieval of cache(originalSwGroupView) failed"); return false; } map = clusterGroupContainerService.getCache("frm.installedSwGroupView"); if (map != null) { installedSwGroupView = (ConcurrentMap) map; } else { logger.error("Retrieval of cache(installedSwGroupView) failed"); return false; } map = clusterGroupContainerService.getCache("frm.inactiveGroups"); if (map != null) { inactiveGroups = (ConcurrentMap) map; } else { logger.error("Retrieval of cache(inactiveGroups) failed"); return false; } map = clusterGroupContainerService.getCache("frm.nodeGroups"); if (map != null) { nodeGroups = (ConcurrentMap>) map; } else { logger.error("Retrieval of cache(nodeGroup) failed"); return false; } return true; } private boolean cacheStartup() { if (allocateGroupCaches()) { if (retrieveGroupCaches()) { return true; } } return false; } public Status validateGroup(Group group, FRMUtil.operation operation) { String containerName; String groupName; Iterator bucketIterator; boolean returnResult; Buckets groupBuckets; if (null != group) { containerName = group.getContainerName(); if (null == containerName) { containerName = GlobalConstants.DEFAULT.toString(); } else if (!FRMUtil.isNameValid(containerName)) { logger.error("Container Name is invalid %s" + containerName); return new Status(StatusCode.BADREQUEST, "Container Name is invalid"); } groupName = group.getGroupName(); if (!FRMUtil.isNameValid(groupName)) { logger.error("Group Name is invalid %s" + groupName); return new Status(StatusCode.BADREQUEST, "Group Name is invalid"); } returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName); if (FRMUtil.operation.ADD == operation && returnResult) { logger.error("Record with same Group Name exists"); return new Status(StatusCode.BADREQUEST, "Group record exists"); } else if (!returnResult) { logger.error("Group record does not exist"); return new Status(StatusCode.BADREQUEST, "Group record does not exist"); } if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) { logger.error("Invalid Group type %d" + group.getGroupType().getIntValue()); return new Status(StatusCode.BADREQUEST, "Invalid Group type"); } groupBuckets = group.getBuckets(); if (null != groupBuckets && null != groupBuckets.getBucket()) { bucketIterator = groupBuckets.getBucket().iterator(); while (bucketIterator.hasNext()) { if(!(FRMUtil.areActionsValid(bucketIterator.next().getActions()))) { logger.error("Error in action bucket"); return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents"); } } } } return new Status(StatusCode.SUCCESS); } private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) { if (! originalSwGroupView.containsKey(key)) { return false; } for (ConcurrentMap.Entry entry : originalSwGroupView.entrySet()) { if (entry.getValue().getGroupName().equals(groupName)) { if (entry.getValue().getContainerName().equals(containerName)) { return true; } } } return false; } /** * Update Group entries to the southbound plugin/inventory and our internal database * * @param path * @param dataObject */ private Status updateGroup(InstanceIdentifier path, Group groupUpdateDataObject) { GroupKey groupKey = groupUpdateDataObject.getKey(); Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE); if (!groupOperationStatus.isSuccess()) { logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName()); return groupOperationStatus; } originalSwGroupView.remove(groupKey); originalSwGroupView.put(groupKey, groupUpdateDataObject); if (groupUpdateDataObject.isInstall()) { UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder(); //TODO how to get original group and modified group. if (installedSwGroupView.containsKey(groupKey)) { installedSwGroupView.remove(groupKey); } installedSwGroupView.put(groupKey, groupUpdateDataObject); groupService.updateGroup(groupData.build()); } return groupOperationStatus; } /** * Adds Group to the southbound plugin and our internal database * * @param path * @param dataObject */ private Status addGroup(InstanceIdentifier path, Group groupAddDataObject) { GroupKey groupKey = groupAddDataObject.getKey(); Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD); if (!groupOperationStatus.isSuccess()) { logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName()); return groupOperationStatus; } validateGroup(groupAddDataObject, FRMUtil.operation.ADD); originalSwGroupView.put(groupKey, groupAddDataObject); if (groupAddDataObject.isInstall()) { AddGroupInputBuilder groupData = new AddGroupInputBuilder(); groupData.setBuckets(groupAddDataObject.getBuckets()); groupData.setContainerName(groupAddDataObject.getContainerName()); groupData.setGroupId(groupAddDataObject.getGroupId()); groupData.setGroupType(groupAddDataObject.getGroupType()); groupData.setNode(groupAddDataObject.getNode()); installedSwGroupView.put(groupKey, groupAddDataObject); groupService.addGroup(groupData.build()); } return groupOperationStatus; } private RpcResult commitToPlugin(internalTransaction transaction) { for(Entry, Group> entry :transaction.additions.entrySet()) { if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) { return Rpcs.getRpcResult(false, null, null); } } for(@SuppressWarnings("unused") Entry, Group> entry :transaction.additions.entrySet()) { if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) { return Rpcs.getRpcResult(false, null, null); } } for(InstanceIdentifier removal : transaction.removals) { // removeFlow(removal); } return Rpcs.getRpcResult(true, null, null); } private final class GroupDataCommitHandler implements DataCommitHandler, DataObject> { @SuppressWarnings("unchecked") @Override public DataCommitTransaction requestCommit(DataModification, DataObject> modification) { // We should verify transaction System.out.println("Coming in FlowDatacommitHandler"); internalTransaction transaction = new internalTransaction(modification); transaction.prepareUpdate(); return transaction; } } private final class internalTransaction implements DataCommitTransaction, DataObject> { private final DataModification, DataObject> modification; @Override public DataModification, DataObject> getModification() { return modification; } public internalTransaction(DataModification, DataObject> modification) { this.modification = modification; } Map, Group> additions = new HashMap<>(); Map, Group> updates = new HashMap<>(); Set> removals = new HashSet<>(); /** * We create a plan which flows will be added, which will be updated and * which will be removed based on our internal state. * */ void prepareUpdate() { Set, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet(); for (Entry, DataObject> entry : puts) { if (entry.getValue() instanceof Group) { Group group = (Group) entry.getValue(); preparePutEntry(entry.getKey(), group); } } removals = modification.getRemovedConfigurationData(); } private void preparePutEntry(InstanceIdentifier key, Group group) { Group original = originalSwGroupView.get(key); if (original != null) { // It is update for us updates.put(key, group); } else { // It is addition for us additions.put(key, group); } } /** * We are OK to go with execution of plan * */ @Override public RpcResult finish() throws IllegalStateException { RpcResult rpcStatus = commitToPlugin(this); // We return true if internal transaction is successful. // return Rpcs.getRpcResult(true, null, Collections.emptySet()); return rpcStatus; } /** * * We should rollback our preparation * */ @Override public RpcResult rollback() throws IllegalStateException { // NOOP - we did not modified any internal state during // requestCommit phase // return Rpcs.getRpcResult(true, null, Collections.emptySet()); return Rpcs.getRpcResult(true, null, null); } } final class GroupEventListener implements SalGroupListener { List addedGroups = new ArrayList<>(); List removedGroups = new ArrayList<>(); List updatedGroups = new ArrayList<>(); @Override public void onGroupAdded(GroupAdded notification) { System.out.println("added Group.........................."); addedGroups.add(notification); } @Override public void onGroupRemoved(GroupRemoved notification) { // TODO Auto-generated method stub } @Override public void onGroupUpdated(GroupUpdated notification) { // TODO Auto-generated method stub } } }