/* * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration; import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager; import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener; import org.opendaylight.serviceutils.upgrade.UpgradeState; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService, ReconciliationNotificationListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class); private static final int THREAD_POOL_SIZE = 4; private static final AtomicLong BUNDLE_ID = new AtomicLong(); private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true); private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer .getInteger("arbitrator.reconciliation.manager.priority", 1/*default*/); private static final String SERVICE_NAME = "ArbitratorReconciliationManager"; private static final String SEPARATOR = ":"; private final SalBundleService salBundleService; private final ReconciliationManager reconciliationManager; private final RoutedRpcRegistration routedRpcReg; private final UpgradeState upgradeState; private NotificationRegistration registration; private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); private final Map bundleIdMap = new ConcurrentHashMap<>(); public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry, final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) { Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !"); this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager, "ReconciliationManager cannot be null!"); this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class), "RPC SalBundlService not found."); this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class, this); this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!"); } public void start() { registration = reconciliationManager.registerService(this); LOG.info("ArbitratorReconciliationManager has started successfully."); } @Override public void close() throws Exception { executor.shutdown(); registration.close(); } @Override public ListenableFuture> commitActiveBundle( CommitActiveBundleInput input) { BigInteger nodeId = input.getNodeId(); if (bundleIdMap.containsKey(nodeId)) { BundleId bundleId = bundleIdMap.get(nodeId).getBundleId(); if (bundleId != null) { final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder() .setNode(input.getNode()).setBundleId(bundleId) .setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); ListenableFuture> rpcResult = salBundleService .controlBundle(commitBundleInput); bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult)); Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor()); return Futures.transform( rpcResult, this.createRpcResultCondenser("committed active bundle"), MoreExecutors.directExecutor()); } } return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder() .setResult(null).build())) .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, "No active bundle found for the node" + nodeId.toString()))).buildFuture(); } @Override public ListenableFuture> getActiveBundle(GetActiveBundleInput input) { BigInteger nodeId = input.getNodeId(); BundleDetails bundleDetails = bundleIdMap.get(nodeId); if (bundleDetails != null) { try { //This blocking call is used to prevent the applications from pushing flows and groups via the default // pipeline when the commit bundle is ongoing. bundleDetails.getResult().get(); return RpcResultBuilder.success((new GetActiveBundleOutputBuilder() .setResult(bundleDetails.getBundleId()).build())).buildFuture(); } catch (InterruptedException | ExecutionException | NullPointerException e) { return RpcResultBuilder.failed() .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, e.getMessage()))).buildFuture(); } } return RpcResultBuilder.success((new GetActiveBundleOutputBuilder() .setResult(null).build())).buildFuture(); } @Override public ListenableFuture startReconciliation(DeviceInfo node) { registerRpc(node); if (upgradeState.isUpgradeInProgress()) { LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId()); return reconcileConfiguration(node); } LOG.trace("arbitrator reconciliation is disabled"); return Futures.immediateFuture(true); } @Override public ListenableFuture endReconciliation(DeviceInfo node) { LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId()); InstanceIdentifier connectedNode = node.getNodeInstanceIdentifier() .augmentation(FlowCapableNode.class); bundleIdMap.remove(connectedNode); deregisterRpc(node); return Futures.immediateFuture(true); } @Override public int getPriority() { return ARBITRATOR_RECONCILIATION_PRIORITY; } @Override public String getName() { return SERVICE_NAME; } @Override public ResultState getResultState() { return ResultState.DONOTHING; } private ListenableFuture reconcileConfiguration(DeviceInfo node) { LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId()); ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node); return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask)); } private Messages createMessages(final NodeRef nodeRef) { final List messages = new ArrayList<>(); messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( new BundleRemoveFlowCaseBuilder() .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()) .build()).build()); messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( new BundleRemoveGroupCaseBuilder() .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()) .build()).build()); LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size()); return new MessagesBuilder().setMessage(messages).build(); } private Flow getDeleteAllFlow() { final FlowBuilder flowBuilder = new FlowBuilder(); flowBuilder.setTableId(OFConstants.OFPTT_ALL); return flowBuilder.build(); } private Group getDeleteAllGroup() { final GroupBuilder groupBuilder = new GroupBuilder(); groupBuilder.setGroupType(GroupTypes.GroupAll); groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL)); return groupBuilder.build(); } private class ArbitratorReconciliationTask implements Callable { final DeviceInfo deviceInfo; ArbitratorReconciliationTask(final DeviceInfo deviceInfo) { this.deviceInfo = deviceInfo; } @Override public Boolean call() { InstanceIdentifier nodeIdentity = deviceInfo.getNodeInstanceIdentifier() .augmentation(FlowCapableNode.class); String node = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue(); BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement()); LOG.debug("Triggering arbitrator reconciliation for device :{}", node); final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTCLOSEREQUEST).build(); final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setType(BundleControlType.ONFBCTOPENREQUEST).build(); final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder() .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) .setMessages(createMessages(nodeRef)).build(); ListenableFuture> closeBundle = salBundleService .controlBundle(closeBundleInput); ListenableFuture> openBundleMessagesFuture = Futures .transformAsync(closeBundle, rpcResult -> salBundleService .controlBundle(openBundleInput), MoreExecutors.directExecutor()); ListenableFuture> addBundleMessagesFuture = Futures .transformAsync(openBundleMessagesFuture, rpcResult -> { if (rpcResult.isSuccessful()) { return salBundleService .addBundleMessages(addBundleMessagesInput); } return Futures.immediateFuture(null); }, MoreExecutors.directExecutor()); BigInteger nodeId = getDpnIdFromNodeName(node); try { if (addBundleMessagesFuture.get().isSuccessful()) { bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue, Futures.immediateFuture(null))); LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up" + " for application programming.", nodeId); return true; } else { LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId); return false; } } catch (InterruptedException | ExecutionException e) { LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e); return false; } } } public final class CommitActiveBundleCallback implements FutureCallback> { private final BigInteger nodeId; private CommitActiveBundleCallback(final BigInteger nodeId) { this.nodeId = nodeId; } @Override public void onSuccess(RpcResult rpcResult) { LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId); bundleIdMap.remove(nodeId); } @Override public void onFailure(Throwable throwable) { LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable); } } private Function, RpcResult> createRpcResultCondenser(final String action) { return input -> { final RpcResultBuilder resultSink; if (input != null) { List errors = new ArrayList<>(); if (!input.isSuccessful()) { errors.addAll(input.getErrors()); resultSink = RpcResultBuilder.failed().withRpcErrors(errors); } else { resultSink = RpcResultBuilder.success(); } } else { resultSink = RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed"); } return resultSink.build(); }; } private void registerRpc(DeviceInfo node) { KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class) .child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is registered : {}", path); routedRpcReg.registerPath(NodeContext.class, path); } private void deregisterRpc(DeviceInfo node) { KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is unregistered : {}", path); routedRpcReg.unregisterPath(NodeContext.class, path); } private static class BundleDetails { private final BundleId bundleId; private final ListenableFuture> result; BundleDetails(BundleId bundleId, ListenableFuture> result) { this.bundleId = bundleId; this.result = result; } public BundleId getBundleId() { return bundleId; } public ListenableFuture> getResult() { return result; } } private BigInteger getDpnIdFromNodeName(String nodeName) { String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); return new BigInteger(dpnId); } }