From 136910d52e526c9205cc6d3b989f60b81551d87b Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 9 Feb 2024 04:13:38 +0100 Subject: [PATCH] Do not use JdkFutureAdapters Expose a ListenableFuture from our APIs, so that callers can hook to async processing. Where we are using an executor, wrap the invocation in Futures.submit(), so we get a proper ListenableFuture. Finally fix a thinko, where we would be reporting a success before the enqueued task finishes. Change-Id: Ideffb18ea6fbe47ca4d56616a52891b3155088f1 Signed-off-by: Robert Varga --- .../frm/ForwardingRulesCommiter.java | 6 +- .../applications/frm/impl/FlowForwarder.java | 90 +++++------ .../frm/impl/FlowNodeReconciliationImpl.java | 151 ++++++++---------- .../applications/frm/impl/GroupForwarder.java | 115 +++++-------- .../applications/frm/impl/MeterForwarder.java | 101 ++++-------- .../applications/frm/impl/TableForwarder.java | 8 +- .../SyncPlanPushStrategyIncrementalImpl.java | 4 +- 7 files changed, 188 insertions(+), 287 deletions(-) diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java index 0e37eaf5b7..40624e97d6 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java @@ -7,7 +7,7 @@ */ package org.opendaylight.openflowplugin.applications.frm; -import java.util.concurrent.Future; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yangtools.yang.binding.DataObject; @@ -66,7 +66,7 @@ public interface ForwardingRulesCommiter * @return A future associated with RPC task. {@code null} is set to the future * if this method does not invoke RPC. */ - Future> add(InstanceIdentifier identifier, D add, + ListenableFuture> add(InstanceIdentifier identifier, D add, InstanceIdentifier nodeIdent); /** @@ -83,7 +83,7 @@ public interface ForwardingRulesCommiter */ void createStaleMarkEntity(InstanceIdentifier identifier, D del, InstanceIdentifier nodeIdent); - Future> removeWithResult(InstanceIdentifier identifier, D del, + ListenableFuture> removeWithResult(InstanceIdentifier identifier, D del, InstanceIdentifier nodeIdent); } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java index 6ff32f3089..663b03ebff 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java @@ -15,7 +15,6 @@ import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getN import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice; -import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -23,11 +22,11 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.ReadTransaction; import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; @@ -193,49 +192,46 @@ public class FlowForwarder extends AbstractListeningCommiter { } @Override - public Future> add(final InstanceIdentifier identifier, final Flow addDataObj, - final InstanceIdentifier nodeIdent) { - - final TableKey tableKey = identifier.firstKeyOf(Table.class); - if (tableIdValidationPrecondition(tableKey, addDataObj)) { - BundleId bundleId = getActiveBundle(nodeIdent, provider); - if (bundleId != null) { - return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId); - } else { - final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); - nodeConfigurator.enqueueJob(nodeId, () -> { - final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setFlowRef(new FlowRef(identifier)); - builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - Uint32 groupId = isFlowDependentOnGroup(addDataObj); - if (groupId != null) { - LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present", - getFlowId(new FlowRef(identifier)), groupId); - if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) { - LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId, - getFlowId(new FlowRef(identifier))); - return provider.getSalFlowService().addFlow(builder.build()); - } else { - LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId); - ListenableFuture> groupFuture = pushDependentGroup(nodeIdent, - groupId); - SettableFuture> resultFuture = SettableFuture.create(); - Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId, - resultFuture), MoreExecutors.directExecutor()); - return resultFuture; - } - } + public ListenableFuture> add(final InstanceIdentifier identifier, + final Flow addDataObj, final InstanceIdentifier nodeIdent) { + final var tableKey = identifier.firstKeyOf(Table.class); + if (!tableIdValidationPrecondition(tableKey, addDataObj)) { + return Futures.immediateFuture(null); + } + final var bundleId = getActiveBundle(nodeIdent, provider); + if (bundleId != null) { + return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId); + } - LOG.trace("The flow {} is not dependent on any group. Adding the flow", - getFlowId(new FlowRef(identifier))); + final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); + return nodeConfigurator.enqueueJob(nodeId, () -> { + final var builder = new AddFlowInputBuilder(addDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setFlowRef(new FlowRef(identifier)) + .setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))) + .setTransactionUri(new Uri(provider.getNewTransactionId())); + final var groupId = isFlowDependentOnGroup(addDataObj); + if (groupId != null) { + LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present", + getFlowId(new FlowRef(identifier)), groupId); + if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) { + LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId, + getFlowId(new FlowRef(identifier))); return provider.getSalFlowService().addFlow(builder.build()); - }); + } + + LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId); + final var groupFuture = pushDependentGroup(nodeIdent, groupId); + final var resultFuture = SettableFuture.>create(); + Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId, + resultFuture), MoreExecutors.directExecutor()); + return resultFuture; } - } - return Futures.immediateFuture(null); + + LOG.trace("The flow {} is not dependent on any group. Adding the flow", + getFlowId(new FlowRef(identifier))); + return provider.getSalFlowService().addFlow(builder.build()); + }); } @Override @@ -274,14 +270,9 @@ public class FlowForwarder extends AbstractListeningCommiter { writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent), staleFlow); - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleFlowResultFuture(submitFuture); - } - - private static void handleStaleFlowResultFuture(final FluentFuture submitFuture) { - submitFuture.addCallback(new FutureCallback() { + writeTransaction.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Object result) { + public void onSuccess(final CommitInfo result) { LOG.debug("Stale Flow creation success"); } @@ -290,7 +281,6 @@ public class FlowForwarder extends AbstractListeningCommiter { LOG.error("Stale Flow creation failed", throwable); } }, MoreExecutors.directExecutor()); - } private static InstanceIdentifier disconnectedNode) { - String node = disconnectedNode.firstKeyOf(Node.class).getId().getValue(); - BigInteger dpnId = getDpnIdFromNodeName(node); + final var node = disconnectedNode.firstKeyOf(Node.class).getId().getValue(); + // FIXME: BigInteger.toString() called here + final var dpnId = getDpnIdFromNodeName(node); reconciliationStates.remove(dpnId.toString()); } @@ -211,88 +204,82 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } if (flowNode.isPresent()) { - ReconciliationState reconciliationState = new ReconciliationState( - STARTED, LocalDateTime.now()); + final var reconciliationState = new ReconciliationState(STARTED, LocalDateTime.now()); //put the dpn info into the map reconciliationStates.put(dpnId.toString(), reconciliationState); LOG.debug("FlowNode present for Datapath ID {}", dpnId); OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId); - final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); + final var nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); - final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + final var closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTCLOSEREQUEST).build(); - final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + final var openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTOPENREQUEST).build(); - final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + final var commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); - final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder() + final var deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder() .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setMessages(createMessages(nodeRef)).build(); LOG.debug("Closing openflow bundle for device {}", dpnId); /* Close previously opened bundle on the openflow switch if any */ - ListenableFuture> closeBundle - = salBundleService.controlBundle(closeBundleInput); + final var closeBundle = salBundleService.controlBundle(closeBundleInput); /* Open a new bundle on the switch */ - ListenableFuture> openBundle - = Futures.transformAsync(closeBundle, rpcResult -> { - if (rpcResult.isSuccessful()) { - LOG.debug("Existing bundle is successfully closed for device {}", dpnId); - } - return salBundleService.controlBundle(openBundleInput); - }, service); - - /* Push groups and flows via bundle add messages */ - ListenableFuture> deleteAllFlowGroupsFuture - = Futures.transformAsync(openBundle, rpcResult -> { - if (rpcResult.isSuccessful()) { - LOG.debug("Open bundle is successful for device {}", dpnId); - return salBundleService.addBundleMessages(deleteAllFlowGroupsInput); - } - return Futures.immediateFuture(null); - }, service); + final var openBundle = Futures.transformAsync(closeBundle, rpcResult -> { + if (rpcResult.isSuccessful()) { + LOG.debug("Existing bundle is successfully closed for device {}", dpnId); + } + return salBundleService.controlBundle(openBundleInput); + }, service); + + /* Push groups and flows via bundle add messages */ + final var deleteAllFlowGroupsFuture = Futures.transformAsync(openBundle, rpcResult -> { + if (rpcResult.isSuccessful()) { + LOG.debug("Open bundle is successful for device {}", dpnId); + return salBundleService.addBundleMessages(deleteAllFlowGroupsInput); + } + return Futures.immediateFuture(null); + }, service); /* Push flows and groups via bundle add messages */ - Optional finalFlowNode = flowNode; - ListenableFuture>> addbundlesFuture = - Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> { - if (rpcResult.isSuccessful()) { - LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId); - return Futures.allAsList(addBundleMessages(finalFlowNode.orElseThrow(), bundleIdValue, - nodeIdentity)); - } - return Futures.immediateFuture(null); - }, service); + final var finalFlowNode = flowNode; + final var addbundlesFuture = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> { + if (rpcResult.isSuccessful()) { + LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId); + return Futures.allAsList(addBundleMessages(finalFlowNode.orElseThrow(), bundleIdValue, + nodeIdentity)); + } + return Futures.immediateFuture(null); + }, service); - /* Commit the bundle on the openflow switch */ - ListenableFuture> commitBundleFuture = - Futures.transformAsync(addbundlesFuture, rpcResult -> { - LOG.debug("Adding bundle messages completed for device {}", dpnId); - return salBundleService.controlBundle(commitBundleInput); - }, service); + /* Commit the bundle on the openflow switch */ + final var commitBundleFuture = Futures.transformAsync(addbundlesFuture, rpcResult -> { + LOG.debug("Adding bundle messages completed for device {}", dpnId); + return salBundleService.controlBundle(commitBundleInput); + }, service); /* Bundles not supported for meters */ - Collection meters = finalFlowNode.orElseThrow().nonnullMeter().values(); + final var meters = finalFlowNode.orElseThrow().nonnullMeter().values(); Futures.transformAsync(commitBundleFuture, rpcResult -> { if (rpcResult.isSuccessful()) { - for (Meter meter : meters) { - final KeyedInstanceIdentifier meterIdent = nodeIdentity - .child(Meter.class, meter.key()); - provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); + for (var meter : meters) { + provider.getMeterCommiter().add(nodeIdentity.child(Meter.class, meter.key()), meter, + nodeIdentity); } } return Futures.immediateFuture(null); }, service); + try { - RpcResult bundleFuture = commitBundleFuture.get(); + final var bundleFuture = commitBundleFuture.get(); if (bundleFuture != null && bundleFuture.isSuccessful()) { reconciliationState.setState(COMPLETED, LocalDateTime.now()); LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId); @@ -533,12 +520,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { private void addGroup(final Map> map, final Group group) { KeyedInstanceIdentifier groupIdent = nodeIdentity.child(Group.class, group.key()); final Uint32 groupId = group.getGroupId().getValue(); - ListenableFuture future = JdkFutureAdapters - .listenInPoolThread(provider.getGroupCommiter().add(groupIdent, group, nodeIdentity)); + final var future = provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(final Object result) { + public void onSuccess(final RpcResult result) { if (LOG.isTraceEnabled()) { LOG.trace("add-group RPC completed: node={}, id={}", nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId); @@ -678,36 +664,27 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } private void deleteDSStaleFlows(final List> flowsForBulkDelete) { - WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - - for (InstanceIdentifier staleFlowIId : flowsForBulkDelete) { + final var writeTransaction = dataBroker.newWriteOnlyTransaction(); + for (var staleFlowIId : flowsForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleFlowIId); } - - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleEntityDeletionResultFuture(submitFuture); + handleStaleEntityDeletionResultFuture(writeTransaction.commit()); } private void deleteDSStaleGroups(final List> groupsForBulkDelete) { - WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - - for (InstanceIdentifier staleGroupIId : groupsForBulkDelete) { + final var writeTransaction = dataBroker.newWriteOnlyTransaction(); + for (var staleGroupIId : groupsForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleGroupIId); } - - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleEntityDeletionResultFuture(submitFuture); + handleStaleEntityDeletionResultFuture(writeTransaction.commit()); } private void deleteDSStaleMeters(final List> metersForBulkDelete) { - WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - - for (InstanceIdentifier staleMeterIId : metersForBulkDelete) { + final var writeTransaction = dataBroker.newWriteOnlyTransaction(); + for (var staleMeterIId : metersForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleMeterIId); } - - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleEntityDeletionResultFuture(submitFuture); + handleStaleEntityDeletionResultFuture(writeTransaction.commit()); } private static InstanceIdentifier submitFuture) { - submitFuture.addCallback(new FutureCallback() { + private static void handleStaleEntityDeletionResultFuture(final FluentFuture submitFuture) { + submitFuture.addCallback(new FutureCallback() { @Override - public void onSuccess(final Object result) { + public void onSuccess(final CommitInfo result) { LOG.debug("Stale entity removal success"); } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java index 7fe5f8fc93..9269926826 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java @@ -10,26 +10,21 @@ package org.opendaylight.openflowplugin.applications.frm.impl; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier; -import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.util.concurrent.Future; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.OriginalGroupBuilder; @@ -82,37 +77,30 @@ public class GroupForwarder extends AbstractListeningCommiter { } else { final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId, () -> { - final Group group = removeDataObj; - final RemoveGroupInput removeGroup = new RemoveGroupInputBuilder(group) + final var removeGroup = new RemoveGroupInputBuilder(removeDataObj) .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) .setGroupRef(new GroupRef(identifier)) .setTransactionUri(new Uri(provider.getNewTransactionId())) .build(); - final ListenableFuture> resultFuture = - provider.getSalGroupService() - .removeGroup(removeGroup); + final var resultFuture = provider.getSalGroupService() .removeGroup(removeGroup); Futures.addCallback(resultFuture, new RemoveGroupCallBack(removeDataObj.getGroupId().getValue(), nodeId), MoreExecutors.directExecutor()); - LoggingFutures.addErrorLogging(resultFuture, LOG, "removeGroup"); - return resultFuture; + return LoggingFutures.addErrorLogging(resultFuture, LOG, "removeGroup"); }); } } // TODO: Pull this into ForwardingRulesCommiter and override it here @Override - public Future> removeWithResult(final InstanceIdentifier identifier, + public ListenableFuture> removeWithResult(final InstanceIdentifier identifier, final Group removeDataObj, final InstanceIdentifier nodeIdent) { - - final Group group = removeDataObj; - final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setGroupRef(new GroupRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - return provider.getSalGroupService().removeGroup(builder.build()); + return provider.getSalGroupService().removeGroup(new RemoveGroupInputBuilder(removeDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setGroupRef(new GroupRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .build()); } @Override @@ -124,18 +112,15 @@ public class GroupForwarder extends AbstractListeningCommiter { } else { final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId, () -> { - final Group originalGroup = original; - final Group updatedGroup = update; - final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder(); - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setGroupRef(new GroupRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - builder.setUpdatedGroup(new UpdatedGroupBuilder(updatedGroup).build()); - builder.setOriginalGroup(new OriginalGroupBuilder(originalGroup).build()); - UpdateGroupInput updateGroupInput = builder.build(); - final ListenableFuture> resultFuture = provider.getSalGroupService() - .updateGroup(updateGroupInput); - LoggingFutures.addErrorLogging(resultFuture, LOG, "updateGroup"); + final var updateGroupInput = new UpdateGroupInputBuilder() + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setGroupRef(new GroupRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .setUpdatedGroup(new UpdatedGroupBuilder(update).build()) + .setOriginalGroup(new OriginalGroupBuilder(original).build()) + .build(); + final var resultFuture = LoggingFutures.addErrorLogging( + provider.getSalGroupService().updateGroup(updateGroupInput), LOG, "updateGroup"); Futures.addCallback(resultFuture, new UpdateGroupCallBack(updateGroupInput.getOriginalGroup().getGroupId().getValue(), nodeId), MoreExecutors.directExecutor()); @@ -145,59 +130,39 @@ public class GroupForwarder extends AbstractListeningCommiter { } @Override - public Future> add(final InstanceIdentifier identifier, final Group addDataObj, - final InstanceIdentifier nodeIdent) { - BundleId bundleId = getActiveBundle(nodeIdent, provider); + public ListenableFuture> add(final InstanceIdentifier identifier, + final Group addDataObj, final InstanceIdentifier nodeIdent) { + final var bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { return provider.getBundleGroupListener().add(identifier, addDataObj, nodeIdent, bundleId); - } else { - final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); - return nodeConfigurator - .enqueueJob(nodeId, () -> { - final Group group = addDataObj; - final AddGroupInputBuilder builder = new AddGroupInputBuilder(group); - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setGroupRef(new GroupRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - AddGroupInput addGroupInput = builder.build(); - final ListenableFuture> resultFuture; - resultFuture = provider.getSalGroupService().addGroup(addGroupInput); - Futures.addCallback(resultFuture, - new AddGroupCallBack(addGroupInput.getGroupId().getValue(), nodeId), - MoreExecutors.directExecutor()); - return resultFuture; - }); } + + final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent); + return nodeConfigurator.enqueueJob(nodeId, () -> { + final var addGroupInput = new AddGroupInputBuilder(addDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setGroupRef(new GroupRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .build(); + final var resultFuture = provider.getSalGroupService().addGroup(addGroupInput); + Futures.addCallback(resultFuture, + new AddGroupCallBack(addGroupInput.getGroupId().getValue(), nodeId), + MoreExecutors.directExecutor()); + return resultFuture; + }); } @Override public void createStaleMarkEntity(final InstanceIdentifier identifier, final Group del, final InstanceIdentifier nodeIdent) { LOG.debug("Creating Stale-Mark entry for the switch {} for Group {} ", nodeIdent, del); - StaleGroup staleGroup = makeStaleGroup(identifier, del, nodeIdent); - persistStaleGroup(staleGroup, nodeIdent); - - } - - private static StaleGroup makeStaleGroup(final InstanceIdentifier identifier, final Group del, - final InstanceIdentifier nodeIdent) { - StaleGroupBuilder staleGroupBuilder = new StaleGroupBuilder(del); - return staleGroupBuilder.setGroupId(del.getGroupId()).build(); - } - - private void persistStaleGroup(final StaleGroup staleGroup, final InstanceIdentifier nodeIdent) { - WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); + final var staleGroup = new StaleGroupBuilder(del).setGroupId(del.getGroupId()).build(); + final var writeTransaction = dataBroker.newWriteOnlyTransaction(); writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleGroupInstanceIdentifier(staleGroup, nodeIdent), staleGroup); - - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleGroupResultFuture(submitFuture); - } - - private static void handleStaleGroupResultFuture(final FluentFuture submitFuture) { - submitFuture.addCallback(new FutureCallback() { + writeTransaction.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Object result) { + public void onSuccess(final CommitInfo result) { LOG.debug("Stale Group creation success"); } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java index 9c437f7f3f..9bf05d8df4 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java @@ -7,13 +7,12 @@ */ package org.opendaylight.openflowplugin.applications.frm.impl; -import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.util.concurrent.Future; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; @@ -64,83 +63,59 @@ public class MeterForwarder extends AbstractListeningCommiter { @Override public void remove(final InstanceIdentifier identifier, final Meter removeDataObj, final InstanceIdentifier nodeIdent) { - - final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setMeterRef(new MeterRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - - LoggingFutures.addErrorLogging(provider.getSalMeterService().removeMeter(builder.build()), LOG, - "removeMeter"); + LoggingFutures.addErrorLogging(provider.getSalMeterService() + .removeMeter(new RemoveMeterInputBuilder(removeDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setMeterRef(new MeterRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .build()), + LOG, "removeMeter"); } @Override - public Future> removeWithResult(final InstanceIdentifier identifier, + public ListenableFuture> removeWithResult(final InstanceIdentifier identifier, final Meter removeDataObj, final InstanceIdentifier nodeIdent) { - - final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setMeterRef(new MeterRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - return provider.getSalMeterService().removeMeter(builder.build()); + return provider.getSalMeterService().removeMeter(new RemoveMeterInputBuilder(removeDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setMeterRef(new MeterRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .build()); } @Override public void update(final InstanceIdentifier identifier, final Meter original, final Meter update, final InstanceIdentifier nodeIdent) { - - final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder(); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setMeterRef(new MeterRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - builder.setUpdatedMeter(new UpdatedMeterBuilder(update).build()); - builder.setOriginalMeter(new OriginalMeterBuilder(original).build()); - - LoggingFutures.addErrorLogging(provider.getSalMeterService().updateMeter(builder.build()), LOG, - "updateMeter"); + LoggingFutures.addErrorLogging(provider.getSalMeterService().updateMeter(new UpdateMeterInputBuilder() + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setMeterRef(new MeterRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .setUpdatedMeter(new UpdatedMeterBuilder(update).build()) + .setOriginalMeter(new OriginalMeterBuilder(original).build()) + .build()), LOG, "updateMeter"); } @Override - public Future> add(final InstanceIdentifier identifier, final Meter addDataObj, - final InstanceIdentifier nodeIdent) { - - final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj); - - builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); - builder.setMeterRef(new MeterRef(identifier)); - builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - return provider.getSalMeterService().addMeter(builder.build()); + public ListenableFuture> add(final InstanceIdentifier identifier, + final Meter addDataObj, final InstanceIdentifier nodeIdent) { + return provider.getSalMeterService().addMeter(new AddMeterInputBuilder(addDataObj) + .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))) + .setMeterRef(new MeterRef(identifier)) + .setTransactionUri(new Uri(provider.getNewTransactionId())) + .build()); } @Override public void createStaleMarkEntity(final InstanceIdentifier identifier, final Meter del, final InstanceIdentifier nodeIdent) { LOG.debug("Creating Stale-Mark entry for the switch {} for meter {} ", nodeIdent, del); - StaleMeter staleMeter = makeStaleMeter(del); - persistStaleMeter(staleMeter, nodeIdent); - } - - private static StaleMeter makeStaleMeter(final Meter del) { - StaleMeterBuilder staleMeterBuilder = new StaleMeterBuilder(del); - return staleMeterBuilder.setMeterId(del.getMeterId()).build(); - } - - private void persistStaleMeter(final StaleMeter staleMeter, final InstanceIdentifier nodeIdent) { - WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleMeterInstanceIdentifier(staleMeter, nodeIdent), - staleMeter); - FluentFuture submitFuture = writeTransaction.commit(); - handleStaleMeterResultFuture(submitFuture); - } - - private static void handleStaleMeterResultFuture(final FluentFuture submitFuture) { - submitFuture.addCallback(new FutureCallback() { + final var staleMeter = new StaleMeterBuilder(del).setMeterId(del.getMeterId()).build(); + final var writeTransaction = dataBroker.newWriteOnlyTransaction(); + writeTransaction.put(LogicalDatastoreType.CONFIGURATION, + nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId()))), staleMeter); + writeTransaction.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Object result) { + public void onSuccess(final CommitInfo result) { LOG.debug("Stale Meter creation success"); } @@ -150,10 +125,4 @@ public class MeterForwarder extends AbstractListeningCommiter { } }, MoreExecutors.directExecutor()); } - - private static InstanceIdentifier getStaleMeterInstanceIdentifier( - final StaleMeter staleMeter, final InstanceIdentifier nodeIdent) { - return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId()))); - } } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java index b2ff5f0ac5..ae050038fa 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java @@ -8,8 +8,8 @@ package org.opendaylight.openflowplugin.applications.frm.impl; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.Collections; -import java.util.concurrent.Future; import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; @@ -89,7 +89,7 @@ public class TableForwarder extends AbstractListeningCommiter { } @Override - public Future> add(final InstanceIdentifier identifier, + public ListenableFuture> add(final InstanceIdentifier identifier, final TableFeatures addDataObj, final InstanceIdentifier nodeIdent) { return Futures.immediateFuture(null); } @@ -101,8 +101,8 @@ public class TableForwarder extends AbstractListeningCommiter { } @Override - public Future> removeWithResult(final InstanceIdentifier identifier, + public ListenableFuture> removeWithResult(final InstanceIdentifier identifier, final TableFeatures del, final InstanceIdentifier nodeIdent) { - return null; + return Futures.immediateFuture(null); } } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyIncrementalImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyIncrementalImpl.java index 30971137bb..5e3ae5dea5 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyIncrementalImpl.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyIncrementalImpl.java @@ -310,8 +310,8 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy // // at // // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer // // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31 -// // allResults.add(JdkFutureAdapters.listenInPoolThread( -// // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent))); +// // allResults.add( +// // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)); // } // } // } -- 2.36.6