Added flow and group NSF.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / GroupConsumerImpl.java
index cc42e21f2a1fc8e0199455f9d7cc65d6b3521183..b1aaba4ee5a3d5f183a8213c89406f3b9d177f58 100644 (file)
@@ -1,7 +1,477 @@
 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
 
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.IContainer;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.Groups;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.config.rev131024.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes.GroupType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused")
 public class GroupConsumerImpl {
-       public GroupConsumerImpl() {
-               
-       }
+
+    protected static final Logger logger = LoggerFactory.getLogger(GroupConsumerImpl.class);
+    private GroupEventListener groupEventListener = new GroupEventListener();
+    private Registration<NotificationListener> groupListener;
+    private SalGroupService groupService;
+    private GroupDataCommitHandler commitHandler;
+
+    private ConcurrentMap<GroupKey, Group> originalSwGroupView;
+    private ConcurrentMap<GroupKey, Group> installedSwGroupView;
+
+    private ConcurrentMap<Node, List<Group>> nodeGroups;
+    private ConcurrentMap<GroupKey, Group> inactiveGroups;
+
+    private IClusterContainerServices clusterGroupContainerService = null;
+    private IContainer container;
+
+    public GroupConsumerImpl() {
+        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Groups.class)
+                .node(Group.class).toInstance();
+        groupService = FRMConsumerImpl.getProviderSession().getRpcService(SalGroupService.class);
+
+        clusterGroupContainerService = FRMConsumerImpl.getClusterContainerService();
+        container = FRMConsumerImpl.getContainer();
+
+        if (!(cacheStartup())) {
+            logger.error("Unanle to allocate/retrieve group cache");
+            System.out.println("Unable to allocate/retrieve group cache");
+        }
+
+        if (null == groupService) {
+            logger.error("Consumer SAL Group Service is down or NULL. FRM may not function as intended");
+            System.out.println("Consumer SAL Group Service is down or NULL.");
+            return;
+        }
+
+        // For switch events
+        groupListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(groupEventListener);
+
+        if (null == groupListener) {
+            logger.error("Listener to listen on group data modifcation events");
+            System.out.println("Listener to listen on group data modifcation events.");
+            return;
+        }
+
+        commitHandler = new GroupDataCommitHandler();
+        FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
+    }
+
+    private boolean allocateGroupCaches() {
+        if (this.clusterGroupContainerService == null) {
+            logger.warn("Group: Un-initialized clusterGroupContainerService, can't create cache");
+            return false;
+        }
+
+        try {
+            clusterGroupContainerService.createCache("frm.originalSwGroupView",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+            clusterGroupContainerService.createCache("frm.installedSwGroupView",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+            clusterGroupContainerService.createCache("frm.inactiveGroups",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+            clusterGroupContainerService.createCache("frm.nodeGroups",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+            // TODO for cluster mode
+            /*
+             * clusterGroupContainerService.createCache(WORK_STATUS_CACHE,
+             * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
+             * IClusterServices.cacheMode.ASYNC));
+             *
+             * clusterGroupContainerService.createCache(WORK_ORDER_CACHE,
+             * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
+             * IClusterServices.cacheMode.ASYNC));
+             */
+
+        } catch (CacheConfigException cce) {
+            logger.error("Group CacheConfigException");
+            return false;
+
+        } catch (CacheExistException cce) {
+            logger.error(" Group CacheExistException");
+        }
+
+        return true;
+    }
+
+    private void nonClusterGroupObjectCreate() {
+        originalSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
+        installedSwGroupView = new ConcurrentHashMap<GroupKey, Group>();
+        nodeGroups = new ConcurrentHashMap<Node, List<Group>>();
+        inactiveGroups = new ConcurrentHashMap<GroupKey, Group>();
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private boolean retrieveGroupCaches() {
+        ConcurrentMap<?, ?> map;
+
+        if (this.clusterGroupContainerService == null) {
+            logger.warn("Group: un-initialized clusterGroupContainerService, can't retrieve cache");
+            nonClusterGroupObjectCreate();
+            return false;
+        }
+
+        map = clusterGroupContainerService.getCache("frm.originalSwGroupView");
+        if (map != null) {
+            originalSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
+        } else {
+            logger.error("Retrieval of cache(originalSwGroupView) failed");
+            return false;
+        }
+
+        map = clusterGroupContainerService.getCache("frm.installedSwGroupView");
+        if (map != null) {
+            installedSwGroupView = (ConcurrentMap<GroupKey, Group>) map;
+        } else {
+            logger.error("Retrieval of cache(installedSwGroupView) failed");
+            return false;
+        }
+
+        map = clusterGroupContainerService.getCache("frm.inactiveGroups");
+        if (map != null) {
+            inactiveGroups = (ConcurrentMap<GroupKey, Group>) map;
+        } else {
+            logger.error("Retrieval of cache(inactiveGroups) failed");
+            return false;
+        }
+
+        map = clusterGroupContainerService.getCache("frm.nodeGroups");
+        if (map != null) {
+            nodeGroups = (ConcurrentMap<Node, List<Group>>) map;
+        } else {
+            logger.error("Retrieval of cache(nodeGroup) failed");
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean cacheStartup() {
+        if (allocateGroupCaches()) {
+            if (retrieveGroupCaches()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public Status validateGroup(Group group, FRMUtil.operation operation) {
+        String containerName;
+        String groupName;
+        Iterator<Bucket> bucketIterator;
+        boolean returnResult;
+        Buckets groupBuckets;
+
+        if (null != group) {
+            containerName = group.getContainerName();
+
+            if (null == containerName) {
+                containerName = GlobalConstants.DEFAULT.toString();
+            } else if (!FRMUtil.isNameValid(containerName)) {
+                logger.error("Container Name is invalid %s" + containerName);
+                return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
+            }
+
+            groupName = group.getGroupName();
+            if (!FRMUtil.isNameValid(groupName)) {
+                logger.error("Group Name is invalid %s" + groupName);
+                return new Status(StatusCode.BADREQUEST, "Group Name is invalid");
+            }
+
+            returnResult = doesGroupEntryExists(group.getKey(), groupName, containerName);
+
+            if (FRMUtil.operation.ADD == operation && returnResult) {
+                logger.error("Record with same Group Name exists");
+                return new Status(StatusCode.BADREQUEST, "Group record exists");
+            } else if (!returnResult) {
+                logger.error("Group record does not exist");
+                return new Status(StatusCode.BADREQUEST, "Group record does not exist");
+            }
+
+            if (!(group.getGroupType().getIntValue() >= GroupType.GroupAll.getIntValue() && group.getGroupType()
+                    .getIntValue() <= GroupType.GroupFf.getIntValue())) {
+                logger.error("Invalid Group type %d" + group.getGroupType().getIntValue());
+                return new Status(StatusCode.BADREQUEST, "Invalid Group type");
+            }
+
+            groupBuckets = group.getBuckets();
+
+            if (null != groupBuckets && null != groupBuckets.getBucket()) {
+                bucketIterator = groupBuckets.getBucket().iterator();
+
+                while (bucketIterator.hasNext()) {
+                    if (!(FRMUtil.validateActions(bucketIterator.next().getAction()))) {
+                        logger.error("Error in action bucket");
+                        return new Status(StatusCode.BADREQUEST, "Invalid Group bucket contents");
+                    }
+                }
+            }
+        }
+
+        return new Status(StatusCode.SUCCESS);
+
+    }
+
+    private boolean doesGroupEntryExists(GroupKey key, String groupName, String containerName) {
+        if (!originalSwGroupView.containsKey(key)) {
+            return false;
+        }
+
+        for (ConcurrentMap.Entry<GroupKey, Group> entry : originalSwGroupView.entrySet()) {
+            if (entry.getValue().getGroupName().equals(groupName)) {
+                if (entry.getValue().getContainerName().equals(containerName)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Update Group entries to the southbound plugin/inventory and our internal
+     * database
+     *
+     * @param path
+     * @param dataObject
+     */
+    private Status updateGroup(InstanceIdentifier<?> path, Group groupUpdateDataObject) {
+        GroupKey groupKey = groupUpdateDataObject.getKey();
+        Status groupOperationStatus = validateGroup(groupUpdateDataObject, FRMUtil.operation.UPDATE);
+
+        if (!groupOperationStatus.isSuccess()) {
+            logger.error("Group data object validation failed %s" + groupUpdateDataObject.getGroupName());
+            return groupOperationStatus;
+        }
+
+        originalSwGroupView.remove(groupKey);
+        originalSwGroupView.put(groupKey, groupUpdateDataObject);
+
+        if (groupUpdateDataObject.isInstall()) {
+            UpdateGroupInputBuilder groupData = new UpdateGroupInputBuilder();
+            // TODO how to get original group and modified group.
+
+            if (installedSwGroupView.containsKey(groupKey)) {
+                installedSwGroupView.remove(groupKey);
+            }
+
+            installedSwGroupView.put(groupKey, groupUpdateDataObject);
+            groupService.updateGroup(groupData.build());
+        }
+
+        return groupOperationStatus;
+    }
+
+    /**
+     * Adds Group to the southbound plugin and our internal database
+     *
+     * @param path
+     * @param dataObject
+     */
+    private Status addGroup(InstanceIdentifier<?> path, Group groupAddDataObject) {
+        GroupKey groupKey = groupAddDataObject.getKey();
+        Status groupOperationStatus = validateGroup(groupAddDataObject, FRMUtil.operation.ADD);
+
+        if (!groupOperationStatus.isSuccess()) {
+            logger.error("Group data object validation failed %s" + groupAddDataObject.getGroupName());
+            return groupOperationStatus;
+        }
+
+        originalSwGroupView.put(groupKey, groupAddDataObject);
+
+        if (groupAddDataObject.isInstall()) {
+            AddGroupInputBuilder groupData = new AddGroupInputBuilder();
+            groupData.setBuckets(groupAddDataObject.getBuckets());
+            groupData.setContainerName(groupAddDataObject.getContainerName());
+            groupData.setGroupId(groupAddDataObject.getGroupId());
+            groupData.setGroupType(groupAddDataObject.getGroupType());
+            groupData.setNode(groupAddDataObject.getNode());
+            installedSwGroupView.put(groupKey, groupAddDataObject);
+            groupService.addGroup(groupData.build());
+        }
+
+        return groupOperationStatus;
+    }
+
+    private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
+        for (Entry<InstanceIdentifier<?>, Group> entry : transaction.additions.entrySet()) {
+
+            if (!addGroup(entry.getKey(), entry.getValue()).isSuccess()) {
+                return Rpcs.getRpcResult(false, null, null);
+            }
+        }
+        for (@SuppressWarnings("unused")
+        Entry<InstanceIdentifier<?>, Group> entry : transaction.additions.entrySet()) {
+
+            if (!updateGroup(entry.getKey(), entry.getValue()).isSuccess()) {
+                return Rpcs.getRpcResult(false, null, null);
+            }
+        }
+
+        for (InstanceIdentifier<?> removal : transaction.removals) {
+            // removeFlow(removal);
+        }
+
+        return Rpcs.getRpcResult(true, null, null);
+    }
+
+    private final class GroupDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
+            // We should verify transaction
+            System.out.println("Coming in GroupDatacommitHandler");
+            internalTransaction transaction = new internalTransaction(modification);
+            transaction.prepareUpdate();
+            return transaction;
+        }
+    }
+
+    private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
+
+        private final DataModification<InstanceIdentifier<?>, DataObject> modification;
+
+        @Override
+        public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
+            return modification;
+        }
+
+        public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
+            this.modification = modification;
+        }
+
+        Map<InstanceIdentifier<?>, Group> additions = new HashMap<>();
+        Map<InstanceIdentifier<?>, Group> updates = new HashMap<>();
+        Set<InstanceIdentifier<?>> removals = new HashSet<>();
+
+        /**
+         * We create a plan which flows will be added, which will be updated and
+         * which will be removed based on our internal state.
+         *
+         */
+        void prepareUpdate() {
+
+            Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
+            for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
+                if (entry.getValue() instanceof Group) {
+                    Group group = (Group) entry.getValue();
+                    preparePutEntry(entry.getKey(), group);
+                }
+
+            }
+
+            removals = modification.getRemovedConfigurationData();
+        }
+
+        private void preparePutEntry(InstanceIdentifier<?> key, Group group) {
+
+            Group original = originalSwGroupView.get(key);
+            if (original != null) {
+                // It is update for us
+
+                updates.put(key, group);
+            } else {
+                // It is addition for us
+
+                additions.put(key, group);
+            }
+        }
+
+        /**
+         * We are OK to go with execution of plan
+         *
+         */
+        @Override
+        public RpcResult<Void> finish() throws IllegalStateException {
+
+            RpcResult<Void> rpcStatus = commitToPlugin(this);
+            // We return true if internal transaction is successful.
+            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+            return rpcStatus;
+        }
+
+        /**
+         *
+         * We should rollback our preparation
+         *
+         */
+        @Override
+        public RpcResult<Void> rollback() throws IllegalStateException {
+            // NOOP - we did not modified any internal state during
+            // requestCommit phase
+            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+            return Rpcs.getRpcResult(true, null, null);
+
+        }
+
+    }
+
+    final class GroupEventListener implements SalGroupListener {
+
+        List<GroupAdded> addedGroups = new ArrayList<>();
+        List<GroupRemoved> removedGroups = new ArrayList<>();
+        List<GroupUpdated> updatedGroups = new ArrayList<>();
+
+        @Override
+        public void onGroupAdded(GroupAdded notification) {
+            System.out.println("added Group..........................");
+            addedGroups.add(notification);
+        }
+
+        @Override
+        public void onGroupRemoved(GroupRemoved notification) {
+            // TODO Auto-generated method stub
+
+        }
+
+        @Override
+        public void onGroupUpdated(GroupUpdated notification) {
+            // TODO Auto-generated method stub
+
+        }
+    }
 }