X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager_mdsal%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager_mdsal%2Fconsumer%2Fimpl%2FGroupConsumerImpl.java;h=73295cc1e79a204575f6ffd72e00c2c7edf018a9;hp=cc42e21f2a1fc8e0199455f9d7cc65d6b3521183;hb=f27f2337e171c9f944de55a3ac6650b3b1ec0af2;hpb=421d43013a16c9c2e025c66fea06f97e3f927bb9 diff --git a/opendaylight/forwardingrulesmanager_mdsal/openflow/src/main/java/org/opendaylight/controller/forwardingrulesmanager_mdsal/consumer/impl/GroupConsumerImpl.java b/opendaylight/forwardingrulesmanager_mdsal/openflow/src/main/java/org/opendaylight/controller/forwardingrulesmanager_mdsal/consumer/impl/GroupConsumerImpl.java index cc42e21f2a..73295cc1e7 100644 --- a/opendaylight/forwardingrulesmanager_mdsal/openflow/src/main/java/org/opendaylight/controller/forwardingrulesmanager_mdsal/consumer/impl/GroupConsumerImpl.java +++ b/opendaylight/forwardingrulesmanager_mdsal/openflow/src/main/java/org/opendaylight/controller/forwardingrulesmanager_mdsal/consumer/impl/GroupConsumerImpl.java @@ -1,7 +1,476 @@ package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.opendaylight.controller.clustering.services.CacheConfigException; +import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.IClusterContainerServices; +import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.IContainer; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.utils.GlobalConstants; +import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.switchmanager.ISwitchManager; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("unused") public class GroupConsumerImpl { - public GroupConsumerImpl() { - + + protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class); + private GroupEventListener groupEventListener = new GroupEventListener(); + private Registration groupListener; + private SalGroupService groupService; + private GroupDataCommitHandler commitHandler; + + private ConcurrentMap originalSwGroupView; + private ConcurrentMap installedSwGroupView; + + private ConcurrentMap> nodeGroups; + private ConcurrentMap inactiveGroups; + + private IClusterContainerServices clusterGroupContainerService = null; + private IContainer container; + + public GroupConsumerImpl() { + InstanceIdentifier path = InstanceIdentifier.builder().node(Groups.class).node(Group.class).toInstance(); + groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class); + + clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService(); + container = FRMConsumerImpl.getContainer(); + + if (!(cacheStartup())) { + logger.error("Unanle to allocate/retrieve group cache"); + System.out.println("Unable to allocate/retrieve group cache"); + } + + if (null == groupService) { + logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended"); + System.out.println("Consumer SAL Group Service is down or NULL."); + return; + } + + // For switch events + groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener); + + if (null == groupListener) { + logger.error("Listener to listen on group data modifcation events"); + System.out.println("Listener to listen on group data modifcation events."); + return; + } + + commitHandler = new GroupDataCommitHandler(); + FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler); } + + private boolean allocateGroupCaches() { + if (this.clusterGroupContainerService == null) { + logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache"); + return false; + } + + try { + clusterGroupContainerService.createCache("frm.originalSwGroupView", + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + + clusterGroupContainerService.createCache("frm.installedSwGroupView", + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + + clusterGroupContainerService.createCache("frm.inactiveGroups", + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + + clusterGroupContainerService.createCache("frm.nodeGroups", + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + +//TODO for cluster mode + /* clusterGroupContainerService.createCache(WORK_STATUS_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); + + clusterGroupContainerService.createCache(WORK_ORDER_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/ + + } catch (CacheConfigException cce) { + logger.error("Group CacheConfigException"); + return false; + + } catch (CacheExistException cce) { + logger.error(" Group CacheExistException"); + } + + return true; + } + + private void nonClusterGroupObjectCreate() { + originalSwGroupView = new ConcurrentHashMap(); + installedSwGroupView = new ConcurrentHashMap(); + nodeGroups = new ConcurrentHashMap>(); + inactiveGroups = new ConcurrentHashMap(); + } + + @SuppressWarnings({ "unchecked" }) + private boolean retrieveGroupCaches() { + ConcurrentMap map; + + if (this.clusterGroupContainerService == null) { + logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache"); + nonClusterGroupObjectCreate(); + return false; + } + + map = clusterGroupContainerService.getCache("frm.originalSwGroupView"); + if (map != null) { + originalSwGroupView = (ConcurrentMap) map; + } else { + logger.error("Retrieval of cache(originalSwGroupView) failed"); + return false; + } + + map = clusterGroupContainerService.getCache("frm.installedSwGroupView"); + if (map != null) { + installedSwGroupView = (ConcurrentMap) map; + } else { + logger.error("Retrieval of cache(installedSwGroupView) failed"); + return false; + } + + map = clusterGroupContainerService.getCache("frm.inactiveGroups"); + if (map != null) { + inactiveGroups = (ConcurrentMap) map; + } else { + logger.error("Retrieval of cache(inactiveGroups) failed"); + return false; + } + + map = clusterGroupContainerService.getCache("frm.nodeGroups"); + if (map != null) { + nodeGroups = (ConcurrentMap>) map; + } else { + logger.error("Retrieval of cache(nodeGroup) failed"); + return false; + } + + return true; + } + + private boolean cacheStartup() { + if (allocateGroupCaches()) { + if (retrieveGroupCaches()) { + return true; + } + } + + return false; + } + + public Status validateGroup(Group group, FRMUtil.operation operation) { + String containerName; + String groupName; + Iterator bucketIterator; + boolean returnResult; + Buckets groupBuckets; + + if (null != group) { + containerName = group.getContainerName(); + + if (null == containerName) { + containerName = GlobalConstants.DEFAULT.toString(); + } + else if (!FRMUtil.isNameValid(containerName)) { + logger.error("Container Name is invalid %s" + containerName); + return new Status(StatusCode.BADREQUEST, "Container Name is invalid"); + } + + groupName = group.getGroupName(); + if (!FRMUtil.isNameValid(groupName)) { + logger.error("Group Name is invalid %s" + groupName); + return new Status(StatusCode.BADREQUEST, "Group Name is invalid"); + } + + returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName); + + if (FRMUtil.operation.ADD == operation && returnResult) { + logger.error("Record with same Group Name exists"); + return new Status(StatusCode.BADREQUEST, "Group record exists"); + } + else if (!returnResult) { + logger.error("Group record does not exist"); + return new Status(StatusCode.BADREQUEST, "Group record does not exist"); + } + + if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && + group.getGroupType().getIntValue() <= GroupType.GroupFf.getIntValue())) { + logger.error("Invalid Group type %d" + group.getGroupType().getIntValue()); + return new Status(StatusCode.BADREQUEST, "Invalid Group type"); + } + + groupBuckets = group.getBuckets(); + + if (null != groupBuckets && null != groupBuckets.getBucket()) { + bucketIterator = groupBuckets.getBucket().iterator(); + + while (bucketIterator.hasNext()) { + if(!(FRMUtil.areActionsValid(bucketIterator.next().getActions()))) { + logger.error("Error in action bucket"); + return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents"); + } + } + } + } + + return new Status(StatusCode.SUCCESS); + + } + + private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) { + if (! originalSwGroupView.containsKey(key)) { + return false; + } + + for (ConcurrentMap.Entry entry : originalSwGroupView.entrySet()) { + if (entry.getValue().getGroupName().equals(groupName)) { + if (entry.getValue().getContainerName().equals(containerName)) { + return true; + } + } + } + return false; + } + + + /** + * Update Group entries to the southbound plugin/inventory and our internal database + * + * @param path + * @param dataObject + */ + private Status updateGroup(InstanceIdentifier path, Group groupUpdateDataObject) { + GroupKey groupKey = groupUpdateDataObject.getKey(); + Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE); + + if (!groupOperationStatus.isSuccess()) { + logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName()); + return groupOperationStatus; + } + + originalSwGroupView.remove(groupKey); + originalSwGroupView.put(groupKey, groupUpdateDataObject); + + if (groupUpdateDataObject.isInstall()) { + UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder(); + //TODO how to get original group and modified group. + + if (installedSwGroupView.containsKey(groupKey)) { + installedSwGroupView.remove(groupKey); + } + + installedSwGroupView.put(groupKey, groupUpdateDataObject); + groupService.updateGroup(groupData.build()); + } + + return groupOperationStatus; + } + + /** + * Adds Group to the southbound plugin and our internal database + * + * @param path + * @param dataObject + */ + private Status addGroup(InstanceIdentifier path, Group groupAddDataObject) { + GroupKey groupKey = groupAddDataObject.getKey(); + Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD); + + if (!groupOperationStatus.isSuccess()) { + logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName()); + return groupOperationStatus; + } + + originalSwGroupView.put(groupKey, groupAddDataObject); + + if (groupAddDataObject.isInstall()) { + AddGroupInputBuilder groupData = new AddGroupInputBuilder(); + groupData.setBuckets(groupAddDataObject.getBuckets()); + groupData.setContainerName(groupAddDataObject.getContainerName()); + groupData.setGroupId(groupAddDataObject.getGroupId()); + groupData.setGroupType(groupAddDataObject.getGroupType()); + groupData.setNode(groupAddDataObject.getNode()); + installedSwGroupView.put(groupKey, groupAddDataObject); + groupService.addGroup(groupData.build()); + } + + return groupOperationStatus; + } + + private RpcResult commitToPlugin(internalTransaction transaction) { + for(Entry, Group> entry :transaction.additions.entrySet()) { + + if (!addGroup(entry.getKey(),entry.getValue()).isSuccess()) { + return Rpcs.getRpcResult(false, null, null); + } + } + for(@SuppressWarnings("unused") Entry, Group> entry :transaction.additions.entrySet()) { + + if (!updateGroup(entry.getKey(),entry.getValue()).isSuccess()) { + return Rpcs.getRpcResult(false, null, null); + } + } + + for(InstanceIdentifier removal : transaction.removals) { + // removeFlow(removal); + } + + return Rpcs.getRpcResult(true, null, null); + } + + private final class GroupDataCommitHandler implements DataCommitHandler, DataObject> { + + @SuppressWarnings("unchecked") + @Override + public DataCommitTransaction requestCommit(DataModification, DataObject> modification) { + // We should verify transaction + System.out.println("Coming in GroupDatacommitHandler"); + internalTransaction transaction = new internalTransaction(modification); + transaction.prepareUpdate(); + return transaction; + } + } + + private final class internalTransaction implements DataCommitTransaction, DataObject> { + + private final DataModification, DataObject> modification; + + @Override + public DataModification, DataObject> getModification() { + return modification; + } + + public internalTransaction(DataModification, DataObject> modification) { + this.modification = modification; + } + + Map, Group> additions = new HashMap<>(); + Map, Group> updates = new HashMap<>(); + Set> removals = new HashSet<>(); + + /** + * We create a plan which flows will be added, which will be updated and + * which will be removed based on our internal state. + * + */ + void prepareUpdate() { + + Set, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet(); + for (Entry, DataObject> entry : puts) { + if (entry.getValue() instanceof Group) { + Group group = (Group) entry.getValue(); + preparePutEntry(entry.getKey(), group); + } + + } + + removals = modification.getRemovedConfigurationData(); + } + + private void preparePutEntry(InstanceIdentifier key, Group group) { + + Group original = originalSwGroupView.get(key); + if (original != null) { + // It is update for us + + updates.put(key, group); + } else { + // It is addition for us + + additions.put(key, group); + } + } + + /** + * We are OK to go with execution of plan + * + */ + @Override + public RpcResult finish() throws IllegalStateException { + + RpcResult rpcStatus = commitToPlugin(this); + // We return true if internal transaction is successful. + // return Rpcs.getRpcResult(true, null, Collections.emptySet()); + return rpcStatus; + } + + /** + * + * We should rollback our preparation + * + */ + @Override + public RpcResult rollback() throws IllegalStateException { + // NOOP - we did not modified any internal state during + // requestCommit phase + // return Rpcs.getRpcResult(true, null, Collections.emptySet()); + return Rpcs.getRpcResult(true, null, null); + + } + + } + + + final class GroupEventListener implements SalGroupListener { + + List addedGroups = new ArrayList<>(); + List removedGroups = new ArrayList<>(); + List updatedGroups = new ArrayList<>(); + + + @Override + public void onGroupAdded(GroupAdded notification) { + System.out.println("added Group.........................."); + addedGroups.add(notification); + } + + @Override + public void onGroupRemoved(GroupRemoved notification) { + // TODO Auto-generated method stub + + } + + @Override + public void onGroupUpdated(GroupUpdated notification) { + // TODO Auto-generated method stub + + } + } }