Merge "Log Future failure in NodeConfiguratorImpl"
authorArunprakash D <d.arunprakash@ericsson.com>
Mon, 19 Nov 2018 15:20:03 +0000 (15:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 19 Nov 2018 15:20:03 +0000 (15:20 +0000)
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java

index 88e90e8a2992f036623c20cf1e6a8f45aa0f12e9..351f703dbb0056916292657c4d3adc5704aa5277 100644 (file)
@@ -21,8 +21,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -34,7 +34,6 @@ import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
@@ -108,9 +107,10 @@ public class BundleFlowForwarder {
                     .setBundleInnerMessage(bundleInnerMessage).build();
             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
                     updatedFlow, identifier, bundleId);
-            ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
-            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture));
-            return flowFuture;
+            SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
+                    MoreExecutors.directExecutor());
+            return resultFuture;
         });
     }
 
@@ -124,10 +124,10 @@ public class BundleFlowForwarder {
                     .setBundleInnerMessage(bundleInnerMessage).build();
             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
                     identifier, bundleId);
-            ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
-            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture),
+            SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
                     MoreExecutors.directExecutor());
-            return flowFuture;
+            return resultFuture;
         });
     }
 
@@ -202,27 +202,43 @@ public class BundleFlowForwarder {
         private final BundleId bundleId;
         private final Message message;
         private final NodeId nodeId;
+        private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
 
         BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message message,
-                ListenableFuture<RpcResult<AddFlowOutput>> flowFuture) {
+                SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
             this.nodeIdent = nodeIdent;
             this.bundleId = bundleId;
             this.message = message;
+            this.resultFuture = resultFuture;
             nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         }
 
         @Override
-        @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
         public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
             if (rpcResult.isSuccessful()) {
-                List<Message> messages = new ArrayList<>(1);
-                messages.add(message);
                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
                         .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
-                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
+                                Collections.singletonList(message)).build()).build();
+
                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
                         bundleId.getValue(), nodeId.getValue());
-                forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
+
+                final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
+                        forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
+                Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
+                    @Override
+                    public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+                        resultFuture.set(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable failure) {
+                        resultFuture.setException(failure);
+                    }
+                },  MoreExecutors.directExecutor());
+            } else {
+                resultFuture.set(rpcResult);
             }
         }
 
index 45f3ae3b065b035f66bd8dbe9a15e2194ab87376..00eda3f7c1f65b576feb3d2f95ab07ab7284dbf5 100644 (file)
@@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -42,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
@@ -90,8 +90,8 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         bundleFlowForwarder = new BundleFlowForwarder(manager);
     }
 
-    @SuppressWarnings("IllegalCatch")
     @Override
+    @SuppressWarnings("IllegalCatch")
     public void registerListener() {
         final DataTreeIdentifier<Flow> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
                 getWildCardPath());
@@ -195,30 +195,28 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
                     builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
 
                     Long groupId = isFlowDependentOnGroup(update);
-                    ListenableFuture<RpcResult<UpdateFlowOutput>> future = Futures.immediateFuture(null);
                     if (groupId != null) {
                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
                                 getFlowId(new FlowRef(identifier)), groupId);
                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
                             LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
                                     getFlowId(new FlowRef(identifier)));
-                            future = provider.getSalFlowService().updateFlow(builder.build());
-                            JdkFutures.addErrorLogging(future, LOG, "updateFlow");
+                            return provider.getSalFlowService().updateFlow(builder.build());
                         } else {
                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
                                     groupId);
+                            SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
                             Futures.addCallback(groupFuture,
-                                    new UpdateFlowCallBack(builder.build(), nodeId, future, groupId),
+                                    new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
                                     MoreExecutors.directExecutor());
+                            return resultFuture;
                         }
-                    } else {
-                        LOG.trace("The flow {} is not dependent on any group. Updating the flow",
-                                getFlowId(new FlowRef(identifier)));
-                        future = provider.getSalFlowService().updateFlow(builder.build());
-                        JdkFutures.addErrorLogging(future, LOG, "updateFlow");
                     }
-                    return future;
+
+                    LOG.trace("The flow {} is not dependent on any group. Updating the flow",
+                            getFlowId(new FlowRef(identifier)));
+                    return provider.getSalFlowService().updateFlow(builder.build());
                 });
             }
         }
@@ -254,16 +252,16 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
                                     groupId);
-                            Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId),
-                                    MoreExecutors.directExecutor());
-                            // TODO This makes new sense and seems just wrong...
-                            return SettableFuture.create();
+                            SettableFuture<RpcResult<AddFlowOutput>> resultFuture = SettableFuture.create();
+                            Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
+                                    resultFuture), MoreExecutors.directExecutor());
+                            return resultFuture;
                         }
-                    } else {
-                        LOG.trace("The flow {} is not dependent on any group. Adding the flow",
-                                getFlowId(new FlowRef(identifier)));
-                        return provider.getSalFlowService().addFlow(builder.build());
                     }
+
+                    LOG.trace("The flow {} is not dependent on any group. Adding the flow",
+                            getFlowId(new FlowRef(identifier)));
+                    return provider.getSalFlowService().addFlow(builder.build());
                 });
             }
         }
@@ -370,34 +368,41 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         private final AddFlowInput addFlowInput;
         private final NodeId nodeId;
         private final Long groupId;
+        private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
 
-        // TODO
-        private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId) {
+        private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId,
+                SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
             this.addFlowInput = addFlowInput;
             this.nodeId = nodeId;
             this.groupId = groupId;
+            this.resultFuture = resultFuture;
         }
 
         @Override
-        @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
-            if (rpcResult.isSuccessful()) {
+            if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
+                    && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
-                provider.getSalFlowService().addFlow(addFlowInput);
+                Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
+                    new FutureCallback<RpcResult<AddFlowOutput>>() {
+                        @Override
+                        public void onSuccess(RpcResult<AddFlowOutput> result) {
+                            resultFuture.set(result);
+                        }
+
+                        @Override
+                        public void onFailure(Throwable failure) {
+                            resultFuture.setException(failure);
+                        }
+                    },  MoreExecutors.directExecutor());
+
                 LOG.debug("Flow add with id {} finished without error for node {}",
                         getFlowId(addFlowInput.getFlowRef()), nodeId);
             } else {
-                if (rpcResult.getErrors().size() == 1
-                        && rpcResult.getErrors().iterator().next().getMessage()
-                        .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
-                    provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
-                    provider.getSalFlowService().addFlow(addFlowInput);
-                    LOG.debug("Group {} already programmed in the device. Adding the flow {}", groupId,
-                            getFlowId(addFlowInput.getFlowRef()));
-                } else {
-                    LOG.error("Flow add with id {} failed for node {} with error {}",
-                            getFlowId(addFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
-                }
+                LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
+                        nodeId, rpcResult.getErrors().toString());
+                resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
+                        .withRpcErrors(rpcResult.getErrors()).build());
             }
         }
 
@@ -412,34 +417,41 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         private final UpdateFlowInput updateFlowInput;
         private final NodeId nodeId;
         private final Long groupId;
+        private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
 
         private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final NodeId nodeId,
-                ListenableFuture<RpcResult<UpdateFlowOutput>> future, Long groupId) {
+                SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, Long groupId) {
             this.updateFlowInput = updateFlowInput;
             this.nodeId = nodeId;
             this.groupId = groupId;
+            this.resultFuture = resultFuture;
         }
 
         @Override
-        @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
-            if (rpcResult.isSuccessful()) {
+            if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
+                    && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
-                provider.getSalFlowService().updateFlow(updateFlowInput);
+                Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
+                    new FutureCallback<RpcResult<UpdateFlowOutput>>() {
+                        @Override
+                        public void onSuccess(RpcResult<UpdateFlowOutput> result) {
+                            resultFuture.set(result);
+                        }
+
+                        @Override
+                        public void onFailure(Throwable failure) {
+                            resultFuture.setException(failure);
+                        }
+                    },  MoreExecutors.directExecutor());
+
                 LOG.debug("Flow update with id {} finished without error for node {}",
                         getFlowId(updateFlowInput.getFlowRef()), nodeId);
             } else {
-                if (rpcResult.getErrors().size() == 1
-                        && rpcResult.getErrors().iterator().next().getMessage()
-                        .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
-                    provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
-                    provider.getSalFlowService().updateFlow(updateFlowInput);
-                    LOG.debug("Group {} already programmed in the device. Updating the flow {}", groupId,
-                            getFlowId(updateFlowInput.getFlowRef()));
-                } else {
-                    LOG.error("Flow update with id {} failed for node {} with error {}",
-                            getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
-                }
+                LOG.error("Flow update with id {} failed for node {} with error {}",
+                        getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
+                resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
+                        .withRpcErrors(rpcResult.getErrors()).build());
             }
         }