Bump mdsal to 5.0.2
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowNodeReconciliationImpl.java
index 85658f05c12f22b121e969ec85d93d48b1ef84de..b3ef2f2f0bf1b125676548537f864e5b1f8171f3 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -65,7 +66,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
@@ -80,12 +80,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
@@ -97,6 +93,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,6 +105,7 @@ import org.slf4j.LoggerFactory;
 public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
+    private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
 
     // The number of nanoseconds to wait for a single group to be added.
     private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
@@ -188,6 +186,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
             if (flowNode.isPresent()) {
                 LOG.debug("FlowNode present for Datapath ID {}", dpnId);
+                OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
                 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
 
                 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
@@ -202,10 +201,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
 
-                final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
                         .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                        .setMessages(createMessages(nodeRef, flowNode)).build();
+                        .setMessages(createMessages(nodeRef)).build();
 
+                LOG.debug("Closing openflow bundle for device {}", dpnId);
                 /* Close previously opened bundle on the openflow switch if any */
                 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
                         = salBundleService.controlBundle(closeBundleInput);
@@ -217,23 +217,33 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                             MoreExecutors.directExecutor());
 
                 /* Push groups and flows via bundle add messages */
-                ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture
+                ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
                         = Futures.transformAsync(openBundle, rpcResult -> {
                             if (rpcResult.isSuccessful()) {
-                                return salBundleService.addBundleMessages(addBundleMessagesInput);
+                                return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
                             }
                             return Futures.immediateFuture(null);
                         }, MoreExecutors.directExecutor());
 
-                /* Commit the bundle on the openflow switch */
-                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
-                        = Futures.transformAsync(addBundleMessagesFuture, rpcResult -> {
+                /* Push flows and groups via bundle add messages */
+                Optional<FlowCapableNode> finalFlowNode = flowNode;
+                ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+                        = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
                             if (rpcResult.isSuccessful()) {
-                                return salBundleService.controlBundle(commitBundleInput);
+                                LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
+                                return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+                                        nodeIdentity));
                             }
                             return Futures.immediateFuture(null);
                         }, MoreExecutors.directExecutor());
 
+                /* Commit the bundle on the openflow switch */
+                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
+                        addbundlesFuture, rpcResult -> {
+                        LOG.debug("Adding bundle messages completed for device {}", dpnId);
+                        return salBundleService.controlBundle(commitBundleInput);
+                    }, MoreExecutors.directExecutor());
+
                 /* Bundles not supported for meters */
                 List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
                         : Collections.emptyList();
@@ -248,16 +258,19 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         }
                         return Futures.immediateFuture(null);
                     }, MoreExecutors.directExecutor());
-
                 try {
-                    if (commitBundleFuture.get().isSuccessful()) {
+                    RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+                    if (bundleFuture != null && bundleFuture.isSuccessful()) {
                         LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+                        OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
                         return true;
                     } else {
+                        LOG.error("commit bundle failed for device {} with error {}", dpnId,
+                                commitBundleFuture.get().getErrors());
                         return false;
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity);
+                    LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
                     return false;
                 }
             }
@@ -309,6 +322,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         public Boolean call() {
             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
             BigInteger dpnId = getDpnIdFromNodeName(node);
+            OF_EVENT_LOG.debug("Reconciliation Start, Node: {}", dpnId);
 
             Optional<FlowCapableNode> flowNode;
             // initialize the counter
@@ -342,7 +356,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 // new list for suspected groups pointing to ports .. when the ports come up
                 // late
                 List<Group> suspectedGroups = new ArrayList<>();
-                Map<Long, ListenableFuture<?>> groupFutures = new HashMap<>();
+                Map<Uint32, ListenableFuture<?>> groupFutures = new HashMap<>();
 
                 while ((!toBeInstalledGroups.isEmpty() || !suspectedGroups.isEmpty())
                         && counter <= provider.getReconciliationRetryCount()) { // also check if the counter has not
@@ -399,7 +413,8 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                                         .equals("org.opendaylight.yang.gen.v1.urn.opendaylight"
                                                 + ".action.types.rev131112.action.action.GroupActionCase")) {
                                     // chained groups
-                                    Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
+                                    Uint32 groupId = ((GroupActionCase) action.getAction()).getGroupAction()
+                                            .getGroupId();
                                     ListenableFuture<?> future = groupFutures.get(groupId);
                                     if (future == null) {
                                         okToInstall = false;
@@ -451,16 +466,19 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 /* Flows */
                 List<Table> tables = flowNode.get().getTable() != null ? flowNode.get().getTable()
                         : Collections.<Table>emptyList();
+                int flowCount = 0;
                 for (Table table : tables) {
                     final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdentity.child(Table.class,
                             table.key());
                     List<Flow> flows = table.getFlow() != null ? table.getFlow() : Collections.<Flow>emptyList();
+                    flowCount += flows.size();
                     for (Flow flow : flows) {
                         final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
                                 flow.key());
                         provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity);
                     }
                 }
+                OF_EVENT_LOG.debug("Reconciliation Finish, Node: {}, flow count: {}", dpnId, flowCount);
             }
             return true;
         }
@@ -474,9 +492,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
          * @param group
          *            The group to add.
          */
-        private void addGroup(Map<Long, ListenableFuture<?>> map, Group group) {
+        private void addGroup(Map<Uint32, ListenableFuture<?>> map, Group group) {
             KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
-            final Long groupId = group.getGroupId().getValue();
+            final Uint32 groupId = group.getGroupId().getValue();
             ListenableFuture<?> future = JdkFutureAdapters
                     .listenInPoolThread(provider.getGroupCommiter().add(groupIdent, group, nodeIdentity));
 
@@ -533,6 +551,8 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         }
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private BigInteger getDpnIdFromNodeName(String nodeName) {
 
         String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
@@ -683,6 +703,28 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId())));
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private List<ListenableFuture<RpcResult<AddBundleMessagesOutput>>> addBundleMessages(final FlowCapableNode flowNode,
+                                                         final BundleId bundleIdValue,
+                                                         final InstanceIdentifier<FlowCapableNode> nodeIdentity) {
+        List<ListenableFuture<RpcResult<AddBundleMessagesOutput>>> futureList = new ArrayList<>();
+        for (Group group : flowNode.nonnullGroup()) {
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
+            futureList.add(provider.getBundleGroupListener().add(groupIdent, group, nodeIdentity, bundleIdValue));
+        }
+
+        for (Table table : flowNode.nonnullTable()) {
+            final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdentity.child(Table.class, table.key());
+            for (Flow flow : table.nonnullFlow()) {
+                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
+                futureList.add(provider.getBundleFlowListener().add(flowIdent, flow, nodeIdentity, bundleIdValue));
+            }
+        }
+        OF_EVENT_LOG.debug("Flow/Group count is {}", futureList.size());
+        return futureList;
+    }
+
     private void handleStaleEntityDeletionResultFuture(FluentFuture<?> submitFuture) {
         submitFuture.addCallback(new FutureCallback<Object>() {
             @Override
@@ -710,7 +752,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         return groupBuilder.build();
     }
 
-    private Messages createMessages(final NodeRef nodeRef, final Optional<FlowCapableNode> flowNode) {
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private Messages createMessages(final NodeRef nodeRef) {
         final List<Message> messages = new ArrayList<>();
         messages.add(new MessageBuilder().setNode(nodeRef)
                 .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder()
@@ -721,29 +765,6 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 .setBundleInnerMessage(new BundleRemoveGroupCaseBuilder()
                         .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build())
                 .build());
-
-        if (flowNode.get().getGroup() != null) {
-            for (Group gr : flowNode.get().getGroup()) {
-                NodeId nodeId = nodeRef.getValue().firstKeyOf(Node.class).getId();
-                provider.getDevicesGroupRegistry().storeGroup(nodeId,gr.getGroupId().getValue());
-                messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(new BundleAddGroupCaseBuilder()
-                        .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build());
-            }
-        }
-
-        if (flowNode.get().getTable() != null) {
-            for (Table table : flowNode.get().getTable()) {
-                for (Flow flow : table.getFlow()) {
-                    messages.add(
-                            new MessageBuilder().setNode(nodeRef)
-                                    .setBundleInnerMessage(new BundleAddFlowCaseBuilder()
-                                            .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build())
-                                    .build());
-                }
-            }
-        }
-
-        LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
         return new MessagesBuilder().setMessage(messages).build();
     }
 }