Merge "Migrate to LoggingFutures"
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / BundleFlowForwarder.java
index 351f703dbb0056916292657c4d3adc5704aa5277..b1827a31403cd9b4b25e7eb1d3c669c1656546c0 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
@@ -14,7 +13,6 @@ import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getN
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -22,13 +20,15 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
@@ -94,21 +94,26 @@ public class BundleFlowForwarder {
                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
         LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
                 bundleId.getValue(), node);
-        JdkFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
+        LoggingFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
     }
 
     public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
-            BundleInnerMessage bundleInnerMessage = new BundleUpdateFlowCaseBuilder()
+            BundleInnerMessage innerDeleteMessage = new BundleRemoveFlowCaseBuilder()
+                    .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(originalFlow).build()).build();
+            Message deleteMessage = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(innerDeleteMessage).build();
+            BundleInnerMessage innerUpdateMessage = new BundleUpdateFlowCaseBuilder()
                     .setUpdateFlowCaseData(new UpdateFlowCaseDataBuilder(updatedFlow).build()).build();
-            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
-                    .setBundleInnerMessage(bundleInnerMessage).build();
+            Message updateMessage = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(innerUpdateMessage).build();
             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
                     updatedFlow, identifier, bundleId);
+            List<Message> messages = Arrays.asList(deleteMessage, updateMessage);
             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
-            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, messages, resultFuture),
                     MoreExecutors.directExecutor());
             return resultFuture;
         });
@@ -125,7 +130,8 @@ public class BundleFlowForwarder {
             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
                     identifier, bundleId);
             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
-            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId,
+                            Collections.singletonList(message), resultFuture),
                     MoreExecutors.directExecutor());
             return resultFuture;
         });
@@ -150,9 +156,8 @@ public class BundleFlowForwarder {
                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
                 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
                 LOG.info("Reading the group from config inventory: {}", groupId);
-                try (ReadOnlyTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
-                    Optional<Group> group = readTransaction
-                            .read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
+                try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
+                    Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
                     if (group.isPresent()) {
                         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
                         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
@@ -200,15 +205,15 @@ public class BundleFlowForwarder {
     private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
         private final InstanceIdentifier<FlowCapableNode> nodeIdent;
         private final BundleId bundleId;
-        private final Message message;
+        private final List<Message> messages;
         private final NodeId nodeId;
         private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
 
-        BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message message,
+        BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, List<Message> messages,
                 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
             this.nodeIdent = nodeIdent;
             this.bundleId = bundleId;
-            this.message = message;
+            this.messages = messages;
             this.resultFuture = resultFuture;
             nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         }
@@ -218,8 +223,7 @@ public class BundleFlowForwarder {
             if (rpcResult.isSuccessful()) {
                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
                         .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
-                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
-                                Collections.singletonList(message)).build()).build();
+                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
 
                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
                         bundleId.getValue(), nodeId.getValue());
@@ -244,7 +248,8 @@ public class BundleFlowForwarder {
 
         @Override
         public void onFailure(Throwable throwable) {
-            LOG.error("Error while pushing flow add bundle {} for device {}", message, nodeId);
+            LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId);
+            resultFuture.setException(throwable);
         }
     }
 }