Merge "Sending each flow/group in separate bundle add rpc instead of adding all messa...
authorArunprakash D <d.arunprakash@ericsson.com>
Thu, 1 Aug 2019 06:53:10 +0000 (06:53 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 1 Aug 2019 06:53:10 +0000 (06:53 +0000)
12 files changed:
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java
openflowjava/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeManagerImpl.java

diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java
new file mode 100644 (file)
index 0000000..47b357d
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public interface BundleMessagesCommiter<D extends DataObject> {
+
+    /**
+     * Method removes DataObject which is identified by InstanceIdentifier from
+     * device.
+     *
+     * @param identifier
+     *            - the whole path to DataObject
+     * @param del
+     *            - DataObject for removing
+     * @param nodeIdent
+     *            Node InstanceIdentifier
+     */
+    void remove(InstanceIdentifier<D> identifier, D del, InstanceIdentifier<FlowCapableNode> nodeIdent,
+            BundleId bundleId);
+
+    /**
+     * Method updates the original DataObject to the update DataObject in device.ForwardingRulesManager.java.
+     * Both are identified by same InstanceIdentifier.
+     *
+     * @param identifier
+     *            - the whole path to DataObject
+     * @param original
+     *            - original DataObject (for update)
+     * @param update
+     *            - changed DataObject (contain updates)
+     * @param nodeIdent
+     *            Node InstanceIdentifier
+     */
+    void update(InstanceIdentifier<D> identifier, D original, D update,
+            InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId);
+
+    /**
+     * Method adds the DataObject which is identified by InstanceIdentifier to
+     * device.
+     *
+     * @param identifier
+     *            - the whole path to new DataObject
+     * @param add
+     *            - new DataObject
+     * @param nodeIdent
+     *            Node InstanceIdentifier
+     * @return A future associated with RPC task. {@code null} is set to the future
+     *         if this method does not invoke RPC.
+     */
+    ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(InstanceIdentifier<D> identifier, D add,
+                                                     InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId);
+
+}
index 67242f46b0db475245f7ed7d3f6e2c54dedfaed4..8f748cf72cc554eab87010c3df01e4790e3ca208 100644 (file)
@@ -137,6 +137,20 @@ public interface ForwardingRulesManager extends ConfigurationListener, AutoClose
      */
     ForwardingRulesCommiter<TableFeatures> getTableFeaturesCommiter();
 
+    /**
+     * Return BundleFlowListener instance.
+     *
+     * @return BundleFlowListener
+     */
+    BundleMessagesCommiter<Flow> getBundleFlowListener();
+
+    /**
+     * Return BundleGroupListener instance.
+     *
+     * @return BundleGroupListener
+     */
+    BundleMessagesCommiter<Group> getBundleGroupListener();
+
     /**
      * Check if reconciliation is disabled by user.
      *
index b1827a31403cd9b4b25e7eb1d3c669c1656546c0..f703547117b49694c0489ec3bc4608d2808383a3 100644 (file)
@@ -25,10 +25,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
 import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
@@ -64,7 +64,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BundleFlowForwarder {
+public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
 
     private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
@@ -119,7 +119,9 @@ public class BundleFlowForwarder {
         });
     }
 
-    public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow flow,
+    @Override
+    public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Flow> identifier,
+                                                                    final Flow flow,
             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
index 9510e633f32cd2794d597f74b11e338a1f41ce64..12d68f6b3c39d7b405d1e7a683c04f61bb0dc7ac 100644 (file)
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -47,7 +48,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BundleGroupForwarder {
+public class BundleGroupForwarder implements BundleMessagesCommiter<Group> {
 
     private static final Logger LOG = LoggerFactory.getLogger(BundleGroupForwarder.class);
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
@@ -110,8 +111,9 @@ public class BundleGroupForwarder {
         });
     }
 
-    public ListenableFuture<? extends RpcResult<?>> add(final InstanceIdentifier<Group> identifier, final Group group,
-            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+    @Override
+    public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Group> identifier,
+                    final Group group, final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
         final Long groupId = group.getGroupId().getValue();
         return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
index 8ea9001ce2a341bb4bd102a2ddb174acad301338..07744ffc45e17bb2cf2032b6310dfe17d4eeb25d 100644 (file)
@@ -84,11 +84,9 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
     private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
 
     private ListenerRegistration<FlowForwarder> listenerRegistration;
-    private final BundleFlowForwarder bundleFlowForwarder;
 
     public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, db);
-        bundleFlowForwarder = new BundleFlowForwarder(manager);
     }
 
     @Override
@@ -127,7 +125,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
             BundleId bundleId = getActiveBundle(nodeIdent, provider);
             if (bundleId != null) {
-                bundleFlowForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId);
+                provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
             } else {
                 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
                 builder.setFlowRef(new FlowRef(identifier));
@@ -178,7 +176,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, update)) {
             BundleId bundleId = getActiveBundle(nodeIdent, provider);
             if (bundleId != null) {
-                bundleFlowForwarder.update(identifier, original, update, nodeIdent, bundleId);
+                provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId);
             } else {
                 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
                 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
@@ -230,7 +228,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
             BundleId bundleId = getActiveBundle(nodeIdent, provider);
             if (bundleId != null) {
-                return bundleFlowForwarder.add(identifier, addDataObj, nodeIdent, bundleId);
+                return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId);
             } else {
                 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
                 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
index 797da80a565254bd4a973f0b57c92e03d36a0897..0a53760c2c96df5eb97a54c539558cabfde250cf 100644 (file)
@@ -65,7 +65,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
@@ -80,12 +79,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
@@ -204,10 +199,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
 
-                final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
                         .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                        .setMessages(createMessages(nodeRef, flowNode)).build();
+                        .setMessages(createMessages(nodeRef)).build();
 
+                LOG.debug("Closing openflow bundle for device {}", dpnId);
                 /* Close previously opened bundle on the openflow switch if any */
                 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
                         = salBundleService.controlBundle(closeBundleInput);
@@ -219,23 +215,31 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                             MoreExecutors.directExecutor());
 
                 /* Push groups and flows via bundle add messages */
-                ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture
+                ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
                         = Futures.transformAsync(openBundle, rpcResult -> {
                             if (rpcResult.isSuccessful()) {
-                                return salBundleService.addBundleMessages(addBundleMessagesInput);
+                                return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
                             }
                             return Futures.immediateFuture(null);
                         }, MoreExecutors.directExecutor());
 
-                /* Commit the bundle on the openflow switch */
-                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
-                        = Futures.transformAsync(addBundleMessagesFuture, rpcResult -> {
+                /* Push flows and groups via bundle add messages */
+                Optional<FlowCapableNode> finalFlowNode = flowNode;
+                ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+                        = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
                             if (rpcResult.isSuccessful()) {
-                                return salBundleService.controlBundle(commitBundleInput);
+                                LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
+                                return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+                                        nodeIdentity));
                             }
                             return Futures.immediateFuture(null);
                         }, MoreExecutors.directExecutor());
 
+                /* Commit the bundle on the openflow switch */
+                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
+                        = Futures.transformAsync(addbundlesFuture, rpcResult ->
+                        salBundleService.controlBundle(commitBundleInput), MoreExecutors.directExecutor());
+
                 /* Bundles not supported for meters */
                 List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
                         : Collections.emptyList();
@@ -250,9 +254,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         }
                         return Futures.immediateFuture(null);
                     }, MoreExecutors.directExecutor());
-
                 try {
-                    if (commitBundleFuture.get().isSuccessful()) {
+                    RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+                    if (bundleFuture != null && bundleFuture.isSuccessful()) {
                         LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
                         OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
                         return true;
@@ -260,7 +264,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         return false;
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity);
+                    LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
                     return false;
                 }
             }
@@ -690,6 +694,26 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId())));
     }
 
+    private List<ListenableFuture<RpcResult<AddBundleMessagesOutput>>> addBundleMessages(final FlowCapableNode flowNode,
+                                                         final BundleId bundleIdValue,
+                                                         final InstanceIdentifier<FlowCapableNode> nodeIdentity) {
+        List<ListenableFuture<RpcResult<AddBundleMessagesOutput>>> futureList = new ArrayList<>();
+        for (Group group : flowNode.nonnullGroup()) {
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
+            futureList.add(provider.getBundleGroupListener().add(groupIdent, group, nodeIdentity, bundleIdValue));
+        }
+
+        for (Table table : flowNode.nonnullTable()) {
+            final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdentity.child(Table.class, table.key());
+            for (Flow flow : table.nonnullFlow()) {
+                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
+                futureList.add(provider.getBundleFlowListener().add(flowIdent, flow, nodeIdentity, bundleIdValue));
+            }
+        }
+        OF_EVENT_LOG.debug("Flow/Group count is {}", futureList.size());
+        return futureList;
+    }
+
     private void handleStaleEntityDeletionResultFuture(FluentFuture<?> submitFuture) {
         submitFuture.addCallback(new FutureCallback<Object>() {
             @Override
@@ -717,7 +741,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         return groupBuilder.build();
     }
 
-    private Messages createMessages(final NodeRef nodeRef, final Optional<FlowCapableNode> flowNode) {
+    private Messages createMessages(final NodeRef nodeRef) {
         final List<Message> messages = new ArrayList<>();
         messages.add(new MessageBuilder().setNode(nodeRef)
                 .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder()
@@ -728,29 +752,6 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 .setBundleInnerMessage(new BundleRemoveGroupCaseBuilder()
                         .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build())
                 .build());
-
-        NodeId nodeId = nodeRef.getValue().firstKeyOf(Node.class).getId();
-        if (flowNode.get().getGroup() != null) {
-            for (Group gr : flowNode.get().getGroup()) {
-                provider.getDevicesGroupRegistry().storeGroup(nodeId,gr.getGroupId().getValue());
-                messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(new BundleAddGroupCaseBuilder()
-                        .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build());
-            }
-        }
-
-        if (flowNode.get().getTable() != null) {
-            for (Table table : flowNode.get().getTable()) {
-                for (Flow flow : table.getFlow()) {
-                    messages.add(
-                            new MessageBuilder().setNode(nodeRef)
-                                    .setBundleInnerMessage(new BundleAddFlowCaseBuilder()
-                                            .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build())
-                                    .build());
-                }
-            }
-        }
-
-        LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
         return new MessagesBuilder().setMessage(messages).build();
     }
 }
index 334944c215d5060c856bfeed59866f345ad7d631..3aa4f0b04db31d466d40e3aa3f772464896067da 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
+import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
@@ -88,6 +89,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
     private ForwardingRulesCommiter<Group> groupListener;
     private ForwardingRulesCommiter<Meter> meterListener;
     private ForwardingRulesCommiter<TableFeatures> tableListener;
+    private BundleMessagesCommiter<Flow> bundleFlowListener;
+    private BundleMessagesCommiter<Group> bundleGroupListener;
     private FlowNodeReconciliation nodeListener;
     private NotificationRegistration reconciliationNotificationRegistration;
     private FlowNodeConnectorInventoryTranslatorImpl flowNodeConnectorInventoryTranslatorImpl;
@@ -166,6 +169,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
                 new FrmReconciliationServiceImpl(this));
         flowNodeConnectorInventoryTranslatorImpl = new FlowNodeConnectorInventoryTranslatorImpl(dataService);
 
+        this.bundleFlowListener = new BundleFlowForwarder(this);
+        this.bundleGroupListener = new BundleGroupForwarder(this);
         this.flowListener = new FlowForwarder(this, dataService);
         this.groupListener = new GroupForwarder(this, dataService);
         this.meterListener = new MeterForwarder(this, dataService);
@@ -292,6 +297,16 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
         return tableListener;
     }
 
+    @Override
+    public BundleMessagesCommiter<Flow> getBundleFlowListener() {
+        return bundleFlowListener;
+    }
+
+    @Override
+    public BundleMessagesCommiter<Group> getBundleGroupListener() {
+        return bundleGroupListener;
+    }
+
     @Override
     public ArbitratorReconcileService getArbitratorReconciliationManager() {
         return arbitratorReconciliationManager;
index 58a66bdef24b6e38f97fc8a489b4133434cf7477..ae54153081528469dfd5d3169356317fd9a6dc21 100644 (file)
@@ -62,11 +62,9 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
 
     private static final Logger LOG = LoggerFactory.getLogger(GroupForwarder.class);
     private ListenerRegistration<GroupForwarder> listenerRegistration;
-    private final BundleGroupForwarder bundleGroupForwarder;
 
     public GroupForwarder(final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, db);
-        this.bundleGroupForwarder = new BundleGroupForwarder(manager);
     }
 
     @SuppressWarnings("IllegalCatch")
@@ -108,7 +106,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
         BundleId bundleId = getActiveBundle(nodeIdent, provider);
         if (bundleId != null) {
-            bundleGroupForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId);
+            provider.getBundleGroupListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
         } else {
             final Group group = removeDataObj;
             final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
@@ -146,7 +144,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
         BundleId bundleId = getActiveBundle(nodeIdent, provider);
         if (bundleId != null) {
-            bundleGroupForwarder.update(identifier, original, update, nodeIdent, bundleId);
+            provider.getBundleGroupListener().update(identifier, original, update, nodeIdent, bundleId);
         } else {
             final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
             nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
@@ -175,7 +173,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
         BundleId bundleId = getActiveBundle(nodeIdent, provider);
         if (bundleId != null) {
-            return bundleGroupForwarder.add(identifier, addDataObj, nodeIdent, bundleId);
+            return provider.getBundleGroupListener().add(identifier, addDataObj, nodeIdent, bundleId);
         } else {
             final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
             return nodeConfigurator
index af0265d832af337ecefbd8f28bb9f5cb2000c418..0be00028a1374aad7d16d1e5ad26f4690575d285 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.openflowjava.protocol.api.connection;
 
 import com.google.common.annotations.Beta;
+import java.math.BigInteger;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
@@ -115,4 +116,10 @@ public interface ConnectionAdapter extends OpenflowProtocolService {
      */
     @Beta
     void setPacketInFiltering(boolean enabled);
+
+    /**
+     * Set datapathId for the dpn.
+     * @param datapathId of the dpn
+     */
+    void setDatapathId(BigInteger datapathId);
 }
index ebb170e6f371e55f71e2c9693ba17b38de3203e0..927d7310f620e512181f03ac1965dc2e704b191f 100644 (file)
@@ -13,6 +13,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
+import java.math.BigInteger;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -235,8 +236,10 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
      * Method immediately response on Echo message.
      *
      * @param message incoming Echo message from device
+     * @param datapathId the dpnId of the node
      */
-    void onEchoRequest(final EchoRequestMessage message) {
+    void onEchoRequest(final EchoRequestMessage message, BigInteger datapathId) {
+        LOG.debug("echo request received: {} for the DPN {}", message.getXid(), datapathId);
         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
index a938596d10fb2f8fb487ff6c0e6e0467f0c88f37..017be600087ef0f3b95376fc5bc8e707c2801a7f 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import java.math.BigInteger;
 import java.net.InetSocketAddress;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
@@ -52,6 +53,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
     private AlienMessageListener alienMessageListener;
     private AbstractOutboundQueueManager<?, ?> outputManager;
     private OFVersionDetector versionDetector;
+    private BigInteger datapathId;
 
     private final boolean useBarrier;
 
@@ -91,7 +93,9 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
 
     @Override
     public void consumeDeviceMessage(final DataObject message) {
-        LOG.debug("ConsumeIntern msg on {}", channel);
+        LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(),
+                datapathId, channel);
+        LOG.trace("ConsumeIntern msg {}", message);
         if (disconnectOccured) {
             return;
         }
@@ -107,7 +111,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
             // OpenFlow messages
             } else if (message instanceof EchoRequestMessage) {
                 if (outputManager != null) {
-                    outputManager.onEchoRequest((EchoRequestMessage) message);
+                    outputManager.onEchoRequest((EchoRequestMessage) message, datapathId);
                 } else {
                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
                 }
@@ -228,4 +232,9 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
         versionDetector.setFilterPacketIns(enabled);
         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
     }
+
+    @Override
+    public void setDatapathId(final BigInteger datapathId) {
+        this.datapathId = datapathId;
+    }
 }
index ccba07f55a60cd21780debed74cc300d288c32ec..ac8aa0f9c8b99bdf9110b954f96077c6453734f8 100644 (file)
@@ -384,6 +384,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                         LOG.trace("features are back");
                         if (rpcFeatures.isSuccessful()) {
                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+                            connectionAdapter.setDatapathId(featureOutput.getDatapathId());
                             if (!deviceConnectionRateLimiter.tryAquire()) {
                                 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
                                         + " the connection from device {}", featureOutput.getDatapathId());