From: Arunprakash D Date: Thu, 1 Aug 2019 06:53:10 +0000 (+0000) Subject: Merge "Sending each flow/group in separate bundle add rpc instead of adding all messa... X-Git-Tag: release/sodium~6 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=c3fcd7218d1c957366e0681b6c0e88a2efdbdf2a;hp=683cb105d3487ffa55964ead0170f713e8aa2254;p=openflowplugin.git Merge "Sending each flow/group in separate bundle add rpc instead of adding all messages and calling the bundle add rpc. This would allow for the echo message to be processed not waiting for all the bundle messages to be sent." --- diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java new file mode 100644 index 0000000000..47b357de1d --- /dev/null +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/BundleMessagesCommiter.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.applications.frm; + +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; + +public interface BundleMessagesCommiter { + + /** + * Method removes DataObject which is identified by InstanceIdentifier from + * device. + * + * @param identifier + * - the whole path to DataObject + * @param del + * - DataObject for removing + * @param nodeIdent + * Node InstanceIdentifier + */ + void remove(InstanceIdentifier identifier, D del, InstanceIdentifier nodeIdent, + BundleId bundleId); + + /** + * Method updates the original DataObject to the update DataObject in device.ForwardingRulesManager.java. + * Both are identified by same InstanceIdentifier. + * + * @param identifier + * - the whole path to DataObject + * @param original + * - original DataObject (for update) + * @param update + * - changed DataObject (contain updates) + * @param nodeIdent + * Node InstanceIdentifier + */ + void update(InstanceIdentifier identifier, D original, D update, + InstanceIdentifier nodeIdent, BundleId bundleId); + + /** + * Method adds the DataObject which is identified by InstanceIdentifier to + * device. + * + * @param identifier + * - the whole path to new DataObject + * @param add + * - new DataObject + * @param nodeIdent + * Node InstanceIdentifier + * @return A future associated with RPC task. {@code null} is set to the future + * if this method does not invoke RPC. + */ + ListenableFuture> add(InstanceIdentifier identifier, D add, + InstanceIdentifier nodeIdent, BundleId bundleId); + +} diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java index 67242f46b0..8f748cf72c 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java @@ -137,6 +137,20 @@ public interface ForwardingRulesManager extends ConfigurationListener, AutoClose */ ForwardingRulesCommiter getTableFeaturesCommiter(); + /** + * Return BundleFlowListener instance. + * + * @return BundleFlowListener + */ + BundleMessagesCommiter getBundleFlowListener(); + + /** + * Return BundleGroupListener instance. + * + * @return BundleGroupListener + */ + BundleMessagesCommiter getBundleGroupListener(); + /** * Check if reconciliation is disabled by user. * diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java index b1827a3140..f703547117 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java @@ -25,10 +25,10 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; import org.opendaylight.mdsal.binding.api.ReadTransaction; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; @@ -64,7 +64,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BundleFlowForwarder { +public class BundleFlowForwarder implements BundleMessagesCommiter { private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class); private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true); @@ -119,7 +119,9 @@ public class BundleFlowForwarder { }); } - public Future> add(final InstanceIdentifier identifier, final Flow flow, + @Override + public ListenableFuture> add(final InstanceIdentifier identifier, + final Flow flow, final InstanceIdentifier nodeIdent, final BundleId bundleId) { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java index 9510e633f3..12d68f6b3c 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; +import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -47,7 +48,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BundleGroupForwarder { +public class BundleGroupForwarder implements BundleMessagesCommiter { private static final Logger LOG = LoggerFactory.getLogger(BundleGroupForwarder.class); private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true); @@ -110,8 +111,9 @@ public class BundleGroupForwarder { }); } - public ListenableFuture> add(final InstanceIdentifier identifier, final Group group, - final InstanceIdentifier nodeIdent, final BundleId bundleId) { + @Override + public ListenableFuture> add(final InstanceIdentifier identifier, + final Group group, final InstanceIdentifier nodeIdent, final BundleId bundleId) { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); final Long groupId = group.getGroupId().getValue(); return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java index 8ea9001ce2..07744ffc45 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java @@ -84,11 +84,9 @@ public class FlowForwarder extends AbstractListeningCommiter { private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS"; private ListenerRegistration listenerRegistration; - private final BundleFlowForwarder bundleFlowForwarder; public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) { super(manager, db); - bundleFlowForwarder = new BundleFlowForwarder(manager); } @Override @@ -127,7 +125,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, removeDataObj)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - bundleFlowForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId); + provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId); } else { final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj); builder.setFlowRef(new FlowRef(identifier)); @@ -178,7 +176,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, update)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - bundleFlowForwarder.update(identifier, original, update, nodeIdent, bundleId); + provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { @@ -230,7 +228,7 @@ public class FlowForwarder extends AbstractListeningCommiter { if (tableIdValidationPrecondition(tableKey, addDataObj)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - return bundleFlowForwarder.add(identifier, addDataObj, nodeIdent, bundleId); + return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java index 797da80a56..0a53760c2c 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java @@ -65,7 +65,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; @@ -80,12 +79,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType; @@ -204,10 +199,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); - final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder() + final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder() .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setMessages(createMessages(nodeRef, flowNode)).build(); + .setMessages(createMessages(nodeRef)).build(); + LOG.debug("Closing openflow bundle for device {}", dpnId); /* Close previously opened bundle on the openflow switch if any */ ListenableFuture> closeBundle = salBundleService.controlBundle(closeBundleInput); @@ -219,23 +215,31 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { MoreExecutors.directExecutor()); /* Push groups and flows via bundle add messages */ - ListenableFuture> addBundleMessagesFuture + ListenableFuture> deleteAllFlowGroupsFuture = Futures.transformAsync(openBundle, rpcResult -> { if (rpcResult.isSuccessful()) { - return salBundleService.addBundleMessages(addBundleMessagesInput); + return salBundleService.addBundleMessages(deleteAllFlowGroupsInput); } return Futures.immediateFuture(null); }, MoreExecutors.directExecutor()); - /* Commit the bundle on the openflow switch */ - ListenableFuture> commitBundleFuture - = Futures.transformAsync(addBundleMessagesFuture, rpcResult -> { + /* Push flows and groups via bundle add messages */ + Optional finalFlowNode = flowNode; + ListenableFuture>> addbundlesFuture + = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> { if (rpcResult.isSuccessful()) { - return salBundleService.controlBundle(commitBundleInput); + LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId); + return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue, + nodeIdentity)); } return Futures.immediateFuture(null); }, MoreExecutors.directExecutor()); + /* Commit the bundle on the openflow switch */ + ListenableFuture> commitBundleFuture + = Futures.transformAsync(addbundlesFuture, rpcResult -> + salBundleService.controlBundle(commitBundleInput), MoreExecutors.directExecutor()); + /* Bundles not supported for meters */ List meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter() : Collections.emptyList(); @@ -250,9 +254,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } return Futures.immediateFuture(null); }, MoreExecutors.directExecutor()); - try { - if (commitBundleFuture.get().isSuccessful()) { + RpcResult bundleFuture = commitBundleFuture.get(); + if (bundleFuture != null && bundleFuture.isSuccessful()) { LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId); OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId); return true; @@ -260,7 +264,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { return false; } } catch (InterruptedException | ExecutionException e) { - LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity); + LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId); return false; } } @@ -690,6 +694,26 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId()))); } + private List>> addBundleMessages(final FlowCapableNode flowNode, + final BundleId bundleIdValue, + final InstanceIdentifier nodeIdentity) { + List>> futureList = new ArrayList<>(); + for (Group group : flowNode.nonnullGroup()) { + final KeyedInstanceIdentifier groupIdent = nodeIdentity.child(Group.class, group.key()); + futureList.add(provider.getBundleGroupListener().add(groupIdent, group, nodeIdentity, bundleIdValue)); + } + + for (Table table : flowNode.nonnullTable()) { + final KeyedInstanceIdentifier tableIdent = nodeIdentity.child(Table.class, table.key()); + for (Flow flow : table.nonnullFlow()) { + final KeyedInstanceIdentifier flowIdent = tableIdent.child(Flow.class, flow.key()); + futureList.add(provider.getBundleFlowListener().add(flowIdent, flow, nodeIdentity, bundleIdValue)); + } + } + OF_EVENT_LOG.debug("Flow/Group count is {}", futureList.size()); + return futureList; + } + private void handleStaleEntityDeletionResultFuture(FluentFuture submitFuture) { submitFuture.addCallback(new FutureCallback() { @Override @@ -717,7 +741,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { return groupBuilder.build(); } - private Messages createMessages(final NodeRef nodeRef, final Optional flowNode) { + private Messages createMessages(final NodeRef nodeRef) { final List messages = new ArrayList<>(); messages.add(new MessageBuilder().setNode(nodeRef) .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder() @@ -728,29 +752,6 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { .setBundleInnerMessage(new BundleRemoveGroupCaseBuilder() .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build()) .build()); - - NodeId nodeId = nodeRef.getValue().firstKeyOf(Node.class).getId(); - if (flowNode.get().getGroup() != null) { - for (Group gr : flowNode.get().getGroup()) { - provider.getDevicesGroupRegistry().storeGroup(nodeId,gr.getGroupId().getValue()); - messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(new BundleAddGroupCaseBuilder() - .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build()); - } - } - - if (flowNode.get().getTable() != null) { - for (Table table : flowNode.get().getTable()) { - for (Flow flow : table.getFlow()) { - messages.add( - new MessageBuilder().setNode(nodeRef) - .setBundleInnerMessage(new BundleAddFlowCaseBuilder() - .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build()) - .build()); - } - } - } - - LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size()); return new MessagesBuilder().setMessage(messages).build(); } } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java index 334944c215..3aa4f0b04d 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java @@ -27,6 +27,7 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService; import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager; +import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter; import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; @@ -88,6 +89,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager { private ForwardingRulesCommiter groupListener; private ForwardingRulesCommiter meterListener; private ForwardingRulesCommiter tableListener; + private BundleMessagesCommiter bundleFlowListener; + private BundleMessagesCommiter bundleGroupListener; private FlowNodeReconciliation nodeListener; private NotificationRegistration reconciliationNotificationRegistration; private FlowNodeConnectorInventoryTranslatorImpl flowNodeConnectorInventoryTranslatorImpl; @@ -166,6 +169,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager { new FrmReconciliationServiceImpl(this)); flowNodeConnectorInventoryTranslatorImpl = new FlowNodeConnectorInventoryTranslatorImpl(dataService); + this.bundleFlowListener = new BundleFlowForwarder(this); + this.bundleGroupListener = new BundleGroupForwarder(this); this.flowListener = new FlowForwarder(this, dataService); this.groupListener = new GroupForwarder(this, dataService); this.meterListener = new MeterForwarder(this, dataService); @@ -292,6 +297,16 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager { return tableListener; } + @Override + public BundleMessagesCommiter getBundleFlowListener() { + return bundleFlowListener; + } + + @Override + public BundleMessagesCommiter getBundleGroupListener() { + return bundleGroupListener; + } + @Override public ArbitratorReconcileService getArbitratorReconciliationManager() { return arbitratorReconciliationManager; diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java index 58a66bdef2..ae54153081 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java @@ -62,11 +62,9 @@ public class GroupForwarder extends AbstractListeningCommiter { private static final Logger LOG = LoggerFactory.getLogger(GroupForwarder.class); private ListenerRegistration listenerRegistration; - private final BundleGroupForwarder bundleGroupForwarder; public GroupForwarder(final ForwardingRulesManager manager, final DataBroker db) { super(manager, db); - this.bundleGroupForwarder = new BundleGroupForwarder(manager); } @SuppressWarnings("IllegalCatch") @@ -108,7 +106,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final InstanceIdentifier nodeIdent) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - bundleGroupForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId); + provider.getBundleGroupListener().remove(identifier, removeDataObj, nodeIdent, bundleId); } else { final Group group = removeDataObj; final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group); @@ -146,7 +144,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final InstanceIdentifier nodeIdent) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - bundleGroupForwarder.update(identifier, original, update, nodeIdent, bundleId); + provider.getBundleGroupListener().update(identifier, original, update, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { @@ -175,7 +173,7 @@ public class GroupForwarder extends AbstractListeningCommiter { final InstanceIdentifier nodeIdent) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { - return bundleGroupForwarder.add(identifier, addDataObj, nodeIdent, bundleId); + return provider.getBundleGroupListener().add(identifier, addDataObj, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); return nodeConfigurator diff --git a/openflowjava/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java b/openflowjava/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java index af0265d832..0be00028a1 100644 --- a/openflowjava/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java +++ b/openflowjava/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java @@ -8,6 +8,7 @@ package org.opendaylight.openflowjava.protocol.api.connection; import com.google.common.annotations.Beta; +import java.math.BigInteger; import java.net.InetSocketAddress; import java.util.concurrent.Future; import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener; @@ -115,4 +116,10 @@ public interface ConnectionAdapter extends OpenflowProtocolService { */ @Beta void setPacketInFiltering(boolean enabled); + + /** + * Set datapathId for the dpn. + * @param datapathId of the dpn + */ + void setDatapathId(BigInteger datapathId); } diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java index ebb170e6f3..927d7310f6 100644 --- a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java +++ b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java @@ -13,6 +13,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.math.BigInteger; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -235,8 +236,10 @@ abstract class AbstractOutboundQueueManager outputManager; private OFVersionDetector versionDetector; + private BigInteger datapathId; private final boolean useBarrier; @@ -91,7 +93,9 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i @Override public void consumeDeviceMessage(final DataObject message) { - LOG.debug("ConsumeIntern msg on {}", channel); + LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(), + datapathId, channel); + LOG.trace("ConsumeIntern msg {}", message); if (disconnectOccured) { return; } @@ -107,7 +111,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i // OpenFlow messages } else if (message instanceof EchoRequestMessage) { if (outputManager != null) { - outputManager.onEchoRequest((EchoRequestMessage) message); + outputManager.onEchoRequest((EchoRequestMessage) message, datapathId); } else { messageListener.onEchoRequestMessage((EchoRequestMessage) message); } @@ -228,4 +232,9 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i versionDetector.setFilterPacketIns(enabled); LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis"); } + + @Override + public void setDatapathId(final BigInteger datapathId) { + this.datapathId = datapathId; + } } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeManagerImpl.java index ccba07f55a..ac8aa0f9c8 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeManagerImpl.java @@ -384,6 +384,7 @@ public class HandshakeManagerImpl implements HandshakeManager { LOG.trace("features are back"); if (rpcFeatures.isSuccessful()) { GetFeaturesOutput featureOutput = rpcFeatures.getResult(); + connectionAdapter.setDatapathId(featureOutput.getDatapathId()); if (!deviceConnectionRateLimiter.tryAquire()) { LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying" + " the connection from device {}", featureOutput.getDatapathId());