OPNFLWPLUG-991 : FRM: Flow Group Dependency support
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / GroupForwarder.java
index fc6cdc4c1d1638ae33e4ced53b7893eaaac720c0..29c166e0cf7adee3efce1004945d4473d0daa385 100644 (file)
@@ -22,10 +22,12 @@ import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.OriginalGroupBuilder;
@@ -36,6 +38,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -97,13 +100,17 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
 
         final Group group = removeDataObj;
         final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
 
         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
 
-        final Future<RpcResult<RemoveGroupOutput>> resultFuture =
+        final ListenableFuture<RpcResult<RemoveGroupOutput>> resultFuture =
                 this.provider.getSalGroupService().removeGroup(builder.build());
+        Futures.addCallback(resultFuture,
+                new RemoveGroupCallBack(removeDataObj.getGroupId().getValue(), nodeId),
+                MoreExecutors.directExecutor());
         JdkFutures.addErrorLogging(resultFuture, LOG, "removeGroup");
     }
 
@@ -128,16 +135,22 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         final Group originalGroup = original;
         final Group updatedGroup = update;
         final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
-
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
         builder.setUpdatedGroup(new UpdatedGroupBuilder(updatedGroup).build());
         builder.setOriginalGroup(new OriginalGroupBuilder(originalGroup).build());
-
-        final Future<RpcResult<UpdateGroupOutput>> resultFuture =
-                this.provider.getSalGroupService().updateGroup(builder.build());
-        JdkFutures.addErrorLogging(resultFuture, LOG, "updateGroup");
+        nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            UpdateGroupInput updateGroupInput = builder.build();
+            final ListenableFuture<RpcResult<UpdateGroupOutput>> resultFuture;
+            resultFuture = this.provider.getSalGroupService().updateGroup(updateGroupInput);
+            JdkFutures.addErrorLogging(resultFuture, LOG, "updateGroup");
+            Futures.addCallback(resultFuture,
+                    new UpdateGroupCallBack(updateGroupInput.getOriginalGroup().getGroupId().getValue(), nodeId),
+                    MoreExecutors.directExecutor());
+            return resultFuture;
+        });
     }
 
     @Override
@@ -146,11 +159,21 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
 
         final Group group = addDataObj;
         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
 
         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-        return this.provider.getSalGroupService().addGroup(builder.build());
+        AddGroupInput addGroupInput = builder.build();
+        return nodeConfigurator
+                .enqueueJob(nodeId.getValue(), () -> {
+                    final ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
+                    resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
+                    Futures.addCallback(resultFuture,
+                            new AddGroupCallBack(addGroupInput.getGroupId().getValue(), nodeId),
+                            MoreExecutors.directExecutor());
+                    return resultFuture;
+                });
     }
 
     @Override
@@ -198,4 +221,81 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         return nodeIdent.child(StaleGroup.class, new StaleGroupKey(new GroupId(staleGroup.getGroupId())));
     }
 
+    private final class AddGroupCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private AddGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public void onSuccess(RpcResult<AddGroupOutput> result) {
+            if (result.isSuccessful()) {
+                provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
+                LOG.debug("Group add with id {} finished without error for node {}", groupId, nodeId);
+            } else {
+                LOG.debug("Group add with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for adding group {} failed for node with error {}", groupId, nodeId, throwable);
+        }
+    }
+
+    private final class UpdateGroupCallBack implements FutureCallback<RpcResult<UpdateGroupOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private UpdateGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        public void onSuccess(RpcResult<UpdateGroupOutput> result) {
+            if (result.isSuccessful()) {
+                provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
+                LOG.debug("Group update with id {} finished without error for node {}", groupId, nodeId);
+            } else {
+                LOG.debug("Group update with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for updating group {} failed for node {} with error {}", groupId, nodeId,
+                    throwable);
+        }
+    }
+
+    private final class RemoveGroupCallBack implements FutureCallback<RpcResult<RemoveGroupOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private RemoveGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public void onSuccess(RpcResult<RemoveGroupOutput> result) {
+            if (result.isSuccessful()) {
+                LOG.debug("Group remove with id {} finished without error for node {}", groupId, nodeId);
+                provider.getDevicesGroupRegistry().removeGroup(nodeId, groupId);
+            } else {
+                LOG.debug("Group remove with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for removing group {} failed for node with error {}", groupId, nodeId, throwable);
+        }
+    }
 }