X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Farbitratorreconciliation%2Fimpl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Farbitratorreconciliation%2Fimpl%2FArbitratorReconciliationManagerImpl.java;h=8eec6568fb3ecc2c58d8897a42a2926238b13967;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=c0ef4db5fb6d240d182768899f5cca74a2d81c76;hpb=d7d3bc5587b714f788b9a533e29eae64a3a4c6d8;p=openflowplugin.git diff --git a/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java b/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java index c0ef4db5fb..8eec6568fb 100644 --- a/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java +++ b/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java @@ -5,34 +5,33 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; -import org.apache.aries.blueprint.annotation.service.Reference; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration; @@ -40,13 +39,9 @@ import org.opendaylight.openflowplugin.applications.reconciliation.Reconciliatio import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener; import org.opendaylight.serviceutils.upgrade.UpgradeState; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; @@ -62,7 +57,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder; @@ -77,11 +74,17 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.RpcService; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.opendaylight.yangtools.yang.common.Uint64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,25 +101,39 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS private static final String SERVICE_NAME = "ArbitratorReconciliationManager"; private static final String SEPARATOR = ":"; + private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder() + .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder() + .setTableId(OFConstants.OFPTT_ALL) + .build()) + .build(); + private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder() + .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder() + .setGroupType(GroupTypes.GroupAll) + .setGroupId(new GroupId(OFConstants.OFPG_ALL)) + .build()).build()) + .build(); + private final SalBundleService salBundleService; private final ReconciliationManager reconciliationManager; - private final RoutedRpcRegistration routedRpcReg; + private final RpcProviderService rpcProviderService; private final UpgradeState upgradeState; private NotificationRegistration registration; - private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - private final Map bundleIdMap = new ConcurrentHashMap<>(); + private final ListeningExecutorService executor = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(THREAD_POOL_SIZE)); + private final Map bundleIdMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> rpcRegistrations = new ConcurrentHashMap<>(); @Inject - public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry, - @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) { + public ArbitratorReconciliationManagerImpl(final ReconciliationManager reconciliationManager, + final RpcProviderService rpcProviderService, final RpcConsumerRegistry rpcRegistry, + final UpgradeState upgradeState) { Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !"); - this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager, - "ReconciliationManager cannot be null!"); - this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class), - "RPC SalBundlService not found."); - this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class, - this); - this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!"); + this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!"); + salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class), + "RPC SalBundleService not found."); + this.rpcProviderService = rpcProviderService; + this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!"); } @PostConstruct @@ -137,35 +154,39 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS @Override public ListenableFuture> commitActiveBundle( - CommitActiveBundleInput input) { - BigInteger nodeId = input.getNodeId(); + final CommitActiveBundleInput input) { + Uint64 nodeId = input.getNodeId(); if (bundleIdMap.containsKey(nodeId)) { BundleId bundleId = bundleIdMap.get(nodeId).getBundleId(); if (bundleId != null) { final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder() - .setNode(input.getNode()).setBundleId(bundleId) + .setNode(input.getNode()) + .setBundleId(bundleId) .setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); + .setType(BundleControlType.ONFBCTCOMMITREQUEST) + .build(); ListenableFuture> rpcResult = salBundleService .controlBundle(commitBundleInput); bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult)); - Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), + + Futures.addCallback(rpcResult, + new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor()); return Futures.transform( rpcResult, - this.createRpcResultCondenser("committed active bundle"), + createRpcResultCondenser("committed active bundle"), MoreExecutors.directExecutor()); } } return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder() .setResult(null).build()) - .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, - null, "No active bundle found for the node" + nodeId.toString()))).buildFuture(); + .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION, + null, "No active bundle found for the node" + nodeId))).buildFuture(); } @Override - public ListenableFuture> getActiveBundle(GetActiveBundleInput input) { - BigInteger nodeId = input.getNodeId(); + public ListenableFuture> getActiveBundle(final GetActiveBundleInput input) { + Uint64 nodeId = input.getNodeId(); BundleDetails bundleDetails = bundleIdMap.get(nodeId); if (bundleDetails != null) { try { @@ -173,10 +194,12 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS // pipeline when the commit bundle is ongoing. bundleDetails.getResult().get(); return RpcResultBuilder.success(new GetActiveBundleOutputBuilder() - .setResult(bundleDetails.getBundleId()).build()).buildFuture(); - } catch (InterruptedException | ExecutionException | NullPointerException e) { + .setResult(bundleDetails.getBundleId()) + .build()) + .buildFuture(); + } catch (InterruptedException | ExecutionException e) { return RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, + .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null, e.getMessage()))).buildFuture(); } } @@ -185,23 +208,23 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } @Override - public ListenableFuture startReconciliation(DeviceInfo node) { + public ListenableFuture startReconciliation(final DeviceInfo node) { registerRpc(node); if (upgradeState.isUpgradeInProgress()) { LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId()); return reconcileConfiguration(node); } LOG.trace("arbitrator reconciliation is disabled"); - return Futures.immediateFuture(true); + return FluentFutures.immediateTrueFluentFuture(); } @Override - public ListenableFuture endReconciliation(DeviceInfo node) { - BigInteger datapathId = node.getDatapathId(); + public ListenableFuture endReconciliation(final DeviceInfo node) { + Uint64 datapathId = node.getDatapathId(); LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId); bundleIdMap.remove(datapathId); deregisterRpc(node); - return Futures.immediateFuture(true); + return FluentFutures.immediateTrueFluentFuture(); } @Override @@ -219,40 +242,24 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS return ResultState.DONOTHING; } - private ListenableFuture reconcileConfiguration(DeviceInfo node) { + private ListenableFuture reconcileConfiguration(final DeviceInfo node) { LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId()); ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node); - return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask)); + return executor.submit(upgradeReconTask); } - private Messages createMessages(final NodeRef nodeRef) { + private static Messages createMessages(final NodeRef nodeRef) { final List messages = new ArrayList<>(); - messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( - new BundleRemoveFlowCaseBuilder() - .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()) - .build()).build()); - - messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( - new BundleRemoveGroupCaseBuilder() - .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()) - .build()).build()); + messages.add(new MessageBuilder() + .setNode(nodeRef) + .setBundleInnerMessage(DELETE_ALL_FLOW).build()); + messages.add(new MessageBuilder() + .setNode(nodeRef) + .setBundleInnerMessage(DELETE_ALL_GROUP).build()); LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size()); return new MessagesBuilder().setMessage(messages).build(); } - private Flow getDeleteAllFlow() { - final FlowBuilder flowBuilder = new FlowBuilder(); - flowBuilder.setTableId(OFConstants.OFPTT_ALL); - return flowBuilder.build(); - } - - private Group getDeleteAllGroup() { - final GroupBuilder groupBuilder = new GroupBuilder(); - groupBuilder.setGroupType(GroupTypes.GroupAll); - groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL)); - return groupBuilder.build(); - } - private class ArbitratorReconciliationTask implements Callable { final DeviceInfo deviceInfo; @@ -265,21 +272,30 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS InstanceIdentifier nodeIdentity = deviceInfo.getNodeInstanceIdentifier() .augmentation(FlowCapableNode.class); String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue(); - BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement()); + BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement())); LOG.debug("Triggering arbitrator reconciliation for device :{}", node); final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); - final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) - .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTCLOSEREQUEST).build(); + final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder() + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTCLOSEREQUEST) + .build(); - final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) - .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTOPENREQUEST).build(); + final ControlBundleInput openBundleInput = new ControlBundleInputBuilder() + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTOPENREQUEST) + .build(); final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder() - .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setMessages(createMessages(nodeRef)).build(); + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setMessages(createMessages(nodeRef)) + .build(); ListenableFuture> closeBundle = salBundleService .controlBundle(closeBundleInput); @@ -294,15 +310,13 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS return salBundleService .addBundleMessages(addBundleMessagesInput); } - return Futures.immediateFuture(null); + return FluentFutures.immediateNullFluentFuture(); }, MoreExecutors.directExecutor()); - BigInteger nodeId = getDpnIdFromNodeName(node); + Uint64 nodeId = getDpnIdFromNodeName(node); try { if (addBundleMessagesFuture.get().isSuccessful()) { - bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue, - Futures.immediateFuture(null))); - LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up" - + " for application programming.", nodeId); + bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture())); + LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId); return true; } else { LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId); @@ -316,26 +330,25 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } public final class CommitActiveBundleCallback implements FutureCallback> { - private final BigInteger nodeId; + private final Uint64 nodeId; - private CommitActiveBundleCallback(final BigInteger nodeId) { + private CommitActiveBundleCallback(final Uint64 nodeId) { this.nodeId = nodeId; } @Override - public void onSuccess(RpcResult rpcResult) { + public void onSuccess(final RpcResult rpcResult) { LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId); bundleIdMap.remove(nodeId); } @Override - public void onFailure(Throwable throwable) { - LOG.error("Error while performing arbitrator reconciliation for device {}", - nodeId, throwable); + public void onFailure(final Throwable throwable) { + LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable); } } - private Function, + private static Function, RpcResult> createRpcResultCondenser(final String action) { return input -> { final RpcResultBuilder resultSink; @@ -349,31 +362,37 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } } else { resultSink = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed"); + .withError(ErrorType.APPLICATION, "action of " + action + " failed"); } return resultSink.build(); }; } - private void registerRpc(DeviceInfo node) { + private void registerRpc(final DeviceInfo node) { KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class) .child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is registered : {}", path); - routedRpcReg.registerPath(NodeContext.class, path); + ObjectRegistration rpcRegistration = + rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class, this, Set.of(path)); + rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration); } - private void deregisterRpc(DeviceInfo node) { - KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class).child(Node.class, - new NodeKey(node.getNodeId())); + private void deregisterRpc(final DeviceInfo node) { + KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class) + .child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is unregistered : {}", path); - routedRpcReg.unregisterPath(NodeContext.class, path); + ObjectRegistration rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue()); + if (rpcRegistration != null) { + rpcRegistration.close(); + rpcRegistrations.remove(node.getNodeId().getValue()); + } } private static class BundleDetails { private final BundleId bundleId; private final ListenableFuture> result; - BundleDetails(BundleId bundleId, ListenableFuture> result) { + BundleDetails(final BundleId bundleId, final ListenableFuture> result) { this.bundleId = bundleId; this.result = result; } @@ -387,9 +406,8 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } } - private BigInteger getDpnIdFromNodeName(String nodeName) { + private static Uint64 getDpnIdFromNodeName(final String nodeName) { String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); - return new BigInteger(dpnId); + return Uint64.valueOf(dpnId); } - }