OPNFLWPLUG-1005 : Implementation of arbitrator reconciliation 84/70184/47
authorgobinath <gobinath@ericsson.com>
Thu, 29 Mar 2018 10:53:13 +0000 (16:23 +0530)
committerArunprakash D <d.arunprakash@ericsson.com>
Mon, 6 Aug 2018 05:10:30 +0000 (05:10 +0000)
This change adds a new module "arbitratorreconciliation" within the frm. This
module contains the RPCs for completing the arbitrator reconciliation(by committing the bundles) and the RPC to fetch the bundle opened for a node have been added and the implementation of these RPCs. These RPCs are exposed which in tun can be used by appliations.

Change-Id: Ib71b9ed7a846ccd4a4592684cfd74fbd6106c986
Signed-off-by: gobinath <gobinath@ericsson.com>
13 files changed:
applications/arbitratorreconciliation/api/pom.xml [new file with mode: 0644]
applications/arbitratorreconciliation/api/src/main/yang/arbitrator-reconcile.yang [new file with mode: 0644]
applications/arbitratorreconciliation/impl/pom.xml [new file with mode: 0644]
applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java [new file with mode: 0644]
applications/arbitratorreconciliation/impl/src/main/resources/org/opendaylight/blueprint/arbitratorreconciliation-manager.xml [new file with mode: 0644]
applications/arbitratorreconciliation/pom.xml [new file with mode: 0644]
applications/pom.xml
artifacts/pom.xml
features-aggregator/features-openflowplugin/pom.xml
features-aggregator/odl-openflowplugin-app-arbitratorreconciliation/pom.xml [new file with mode: 0644]
features-aggregator/odl-openflowplugin-flow-services/pom.xml
features-aggregator/odl-openflowplugin-southbound/pom.xml
features-aggregator/pom.xml

diff --git a/applications/arbitratorreconciliation/api/pom.xml b/applications/arbitratorreconciliation/api/pom.xml
new file mode 100644 (file)
index 0000000..8d68cf8
--- /dev/null
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>applications</artifactId>
+        <groupId>org.opendaylight.openflowplugin</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+        <relativePath>../..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.opendaylight.openflowplugin.applications</groupId>
+    <artifactId>arbitratorreconciliation-api</artifactId>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin</groupId>
+            <artifactId>openflowplugin-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin</groupId>
+            <artifactId>openflowplugin-extension-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin.applications</groupId>
+            <artifactId>reconciliation-framework</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- TODO Remove this when we upgrade to odlparent with a fix for ODLPARENT-146 -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <configuration>
+                    <failOnError>false</failOnError>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/applications/arbitratorreconciliation/api/src/main/yang/arbitrator-reconcile.yang b/applications/arbitratorreconciliation/api/src/main/yang/arbitrator-reconcile.yang
new file mode 100644 (file)
index 0000000..0f43f95
--- /dev/null
@@ -0,0 +1,50 @@
+module arbitrator-reconcile {
+    namespace "urn:opendaylight:params:xml:ns:yang:openflowplugin:app:arbitrator-reconcile:service";
+    prefix arbitrator-reconcile;
+
+    import opendaylight-inventory {prefix inv; revision-date "2013-08-19";}
+    import onf-extensions {prefix onf-ext; revision-date "2017-01-24";}
+
+    description
+        "RPCs to perform arbitrator based reconciliation";
+
+    revision "2018-02-27" {
+        description "Initial proposal";
+    }
+
+    rpc get-active-bundle {
+        description "Fetches the active available bundle in openflowplugin";
+        input {
+            uses "inv:node-context-ref";
+            leaf node-id {
+                description "Node for which the bundle active has to be fetched";
+                type uint64;
+            }
+        }
+
+        output {
+            leaf result {
+                description "The retrieved active bundle for the node";
+                type "onf-ext:bundle-id";
+            }
+        }
+    }
+
+    rpc commit-active-bundle {
+        description "Commits the active available bundle in openflowplugin";
+        input {
+            uses "inv:node-context-ref";
+            leaf node-id {
+                description "Node for which the commit bundle to be executed";
+                type uint64;
+            }
+        }
+
+        output {
+            leaf result {
+                description "Success/Failure of the commit bundle for the node";
+                type boolean;
+            }
+        }
+    }
+}
diff --git a/applications/arbitratorreconciliation/impl/pom.xml b/applications/arbitratorreconciliation/impl/pom.xml
new file mode 100644 (file)
index 0000000..4be5d5a
--- /dev/null
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>applications</artifactId>
+        <groupId>org.opendaylight.openflowplugin</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+        <relativePath>../..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.opendaylight.openflowplugin.applications</groupId>
+    <artifactId>arbitratorreconciliation-impl</artifactId>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin.applications</groupId>
+            <artifactId>arbitratorreconciliation-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.serviceutils</groupId>
+            <artifactId>upgrade</artifactId>
+            <version>${serviceutils.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- TODO Remove this when we upgrade to odlparent with a fix for ODLPARENT-146 -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <configuration>
+                    <failOnError>false</failOnError>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java b/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java
new file mode 100644 (file)
index 0000000..112082b
--- /dev/null
@@ -0,0 +1,387 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
+import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
+import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
+import org.opendaylight.serviceutils.upgrade.UpgradeState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
+        ReconciliationNotificationListener, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
+    private static final int THREAD_POOL_SIZE = 4;
+    private static final AtomicLong BUNDLE_ID = new AtomicLong();
+    private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+    private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
+            .getInteger("arbitrator.reconciliation.manager.priority", 1/*default*/);
+    private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
+    private static final String SEPARATOR = ":";
+
+    private final SalBundleService salBundleService;
+    private final ReconciliationManager reconciliationManager;
+    private final RoutedRpcRegistration routedRpcReg;
+    private final UpgradeState upgradeState;
+    private NotificationRegistration registration;
+    private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
+
+    public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry,
+            final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) {
+        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
+        this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
+                "ReconciliationManager cannot be null!");
+        this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
+                "RPC SalBundlService not found.");
+        this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
+                this);
+        this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
+    }
+
+    public void start() {
+        registration = reconciliationManager.registerService(this);
+        LOG.info("ArbitratorReconciliationManager has started successfully.");
+    }
+
+    @Override
+    public void close() throws Exception {
+        executor.shutdown();
+        registration.close();
+
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
+            CommitActiveBundleInput input) {
+        BigInteger nodeId = input.getNodeId();
+        if (bundleIdMap.containsKey(nodeId)) {
+            BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
+            if (bundleId != null) {
+                final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
+                        .setNode(input.getNode()).setBundleId(bundleId)
+                        .setFlags(BUNDLE_FLAGS)
+                        .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+                ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
+                        .controlBundle(commitBundleInput);
+                bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
+                Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
+                        MoreExecutors.directExecutor());
+                return Futures.transform(
+                        rpcResult,
+                        this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
+                        MoreExecutors.directExecutor());
+            }
+        }
+        return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder()
+                .setResult(null).build()))
+                .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
+                null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
+        BigInteger nodeId = input.getNodeId();
+        BundleDetails bundleDetails = bundleIdMap.get(nodeId);
+        if (bundleDetails != null) {
+            try {
+                //This blocking call is used to prevent the applications from pushing flows and groups via the default
+                // pipeline when the commit bundle is ongoing.
+                bundleDetails.getResult().get();
+                return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
+                        .setResult(bundleDetails.getBundleId()).build())).buildFuture();
+            } catch (InterruptedException | ExecutionException | NullPointerException e) {
+                return RpcResultBuilder.<GetActiveBundleOutput>failed()
+                        .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
+                                null, e.getMessage()))).buildFuture();
+            }
+        }
+        return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
+                .setResult(null).build())).buildFuture();
+    }
+
+    @Override
+    public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
+        registerRpc(node);
+        if (upgradeState.isUpgradeInProgress()) {
+            LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
+            return reconcileConfiguration(node);
+        }
+        LOG.trace("arbitrator reconciliation is disabled");
+        return Futures.immediateFuture(true);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
+        LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
+        InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        bundleIdMap.remove(connectedNode);
+        deregisterRpc(node);
+        return Futures.immediateFuture(true);
+    }
+
+    @Override
+    public int getPriority() {
+        return ARBITRATOR_RECONCILIATION_PRIORITY;
+    }
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    @Override
+    public ResultState getResultState() {
+        return ResultState.DONOTHING;
+    }
+
+    private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
+        LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
+        ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
+        return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
+    }
+
+    private Messages createMessages(final NodeRef nodeRef) {
+        final List<Message> messages = new ArrayList<>();
+        messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+                new BundleRemoveFlowCaseBuilder()
+                .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
+                .build()).build());
+
+        messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+                new BundleRemoveGroupCaseBuilder()
+                .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
+                .build()).build());
+        LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
+        return new MessagesBuilder().setMessage(messages).build();
+    }
+
+    private Flow getDeleteAllFlow() {
+        final FlowBuilder flowBuilder = new FlowBuilder();
+        flowBuilder.setTableId(OFConstants.OFPTT_ALL);
+        return flowBuilder.build();
+    }
+
+    private Group getDeleteAllGroup() {
+        final GroupBuilder groupBuilder = new GroupBuilder();
+        groupBuilder.setGroupType(GroupTypes.GroupAll);
+        groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
+        return groupBuilder.build();
+    }
+
+    private class ArbitratorReconciliationTask implements Callable<Boolean> {
+        final DeviceInfo deviceInfo;
+
+        ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
+            this.deviceInfo = deviceInfo;
+        }
+
+        @Override
+        public Boolean call() {
+            InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
+                    .augmentation(FlowCapableNode.class);
+            String node = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
+            BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
+            LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
+            final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+            final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                    .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                    .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
+            final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                    .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                    .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+
+            final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                    .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                    .setMessages(createMessages(nodeRef)).build();
+
+            ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
+                    .controlBundle(closeBundleInput);
+
+            ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
+                    .transformAsync(closeBundle, rpcResult -> salBundleService
+                            .controlBundle(openBundleInput), MoreExecutors.directExecutor());
+
+            ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
+                    .transformAsync(openBundleMessagesFuture, rpcResult -> {
+                        if (rpcResult.isSuccessful()) {
+                            return salBundleService
+                                    .addBundleMessages(addBundleMessagesInput);
+                        }
+                        return Futures.immediateFuture(null);
+                    }, MoreExecutors.directExecutor());
+            BigInteger nodeId = getDpnIdFromNodeName(node);
+            try {
+                if (addBundleMessagesFuture.get().isSuccessful()) {
+                    bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
+                            Futures.immediateFuture(null)));
+                    LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
+                            + " for application programming.", nodeId);
+                    return true;
+                } else {
+                    LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
+                    return false;
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
+                return false;
+            }
+        }
+    }
+
+    public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
+        private final BigInteger nodeId;
+
+        private CommitActiveBundleCallback(final BigInteger nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public void onSuccess(RpcResult<?> rpcResult) {
+            LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
+            bundleIdMap.remove(nodeId);
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Error while performing arbitrator reconciliation for device {}",
+                    nodeId, throwable);
+        }
+    }
+
+    private <D> Function<RpcResult<D>,
+            RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
+        return input -> {
+            final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
+            if (input != null) {
+                List<RpcError> errors = new ArrayList<>();
+                if (!input.isSuccessful()) {
+                    errors.addAll(input.getErrors());
+                    resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
+                } else {
+                    resultSink = RpcResultBuilder.success();
+                }
+            } else {
+                resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
+                        .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
+            }
+            return resultSink.build();
+        };
+    }
+
+    private void registerRpc(DeviceInfo node) {
+        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
+                .child(Node.class, new NodeKey(node.getNodeId()));
+        LOG.debug("The path is registered : {}", path);
+        routedRpcReg.registerPath(NodeContext.class, path);
+    }
+
+    private void deregisterRpc(DeviceInfo node) {
+        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
+                new NodeKey(node.getNodeId()));
+        LOG.debug("The path is unregistered : {}", path);
+        routedRpcReg.unregisterPath(NodeContext.class, path);
+    }
+
+    private static class BundleDetails {
+        private final BundleId bundleId;
+        private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
+
+        BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
+            this.bundleId = bundleId;
+            this.result = result;
+        }
+
+        public BundleId getBundleId() {
+            return bundleId;
+        }
+
+        public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
+            return result;
+        }
+    }
+
+    private BigInteger getDpnIdFromNodeName(String nodeName) {
+        String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
+        return new BigInteger(dpnId);
+    }
+
+}
diff --git a/applications/arbitratorreconciliation/impl/src/main/resources/org/opendaylight/blueprint/arbitratorreconciliation-manager.xml b/applications/arbitratorreconciliation/impl/src/main/resources/org/opendaylight/blueprint/arbitratorreconciliation-manager.xml
new file mode 100644 (file)
index 0000000..fa5ca37
--- /dev/null
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           odl:use-default-for-reference-types="true">
+
+    <reference id="rpcRegistry" interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry"/>
+    <reference id="reconciliationservice" interface="org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager"/>
+    <reference id="upgradeState" interface="org.opendaylight.serviceutils.upgrade.UpgradeState"/>
+
+    <bean id="arbitratorReconciliationManager"
+          class="org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl.ArbitratorReconciliationManagerImpl"
+        init-method="start" destroy-method="close">
+        <argument ref="rpcRegistry"/>
+        <argument ref="reconciliationservice"/>
+        <argument ref="upgradeState"/>
+    </bean>
+
+</blueprint>
diff --git a/applications/arbitratorreconciliation/pom.xml b/applications/arbitratorreconciliation/pom.xml
new file mode 100644 (file)
index 0000000..72dc048
--- /dev/null
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>applications</artifactId>
+        <groupId>org.opendaylight.openflowplugin</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>arbitratorreconciliation-aggregator</artifactId>
+    <groupId>org.opendaylight.openflowplugin.applications</groupId>
+    <packaging>pom</packaging>
+    <name>ODL :: openflowplugin :: ${project.artifactId}</name>
+
+    <modules>
+        <module>api</module>
+        <module>impl</module>
+    </modules>
+
+    <!-- DO NOT install or deploy the repo root pom as it's only needed to initiate a build -->
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-install-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
index c91c41430c2c6a2d90563fde80646cd92b207622..0580c511abae6b30b9242ba87c50976937bedcca 100644 (file)
@@ -46,6 +46,7 @@
         <module>reconciliation-framework</module>
         <module>southbound-cli</module>
         <module>device-ownership-service</module>
+        <module>arbitratorreconciliation</module>
     </modules>
 
 </project>
index e757168a5af26d0ef8080ec3ff394afdcfa9b4bd..c6b1f449178e9c5661623fa2c54e217c6ecbf629 100644 (file)
                 <type>xml</type>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>odl-openflowplugin-app-arbitratorreconciliation</artifactId>
+                <classifier>features</classifier>
+                <type>xml</type>
+                <version>${project.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
index ccb0c950f8eda3c4898d99ddbe7134ac39f1c495..a12f0da1b9eb7740437a533295f3c6cd8728be29 100644 (file)
             <type>xml</type>
             <classifier>features</classifier>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>odl-openflowplugin-app-arbitratorreconciliation</artifactId>
+            <version>${project.version}</version>
+            <type>xml</type>
+            <classifier>features</classifier>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/features-aggregator/odl-openflowplugin-app-arbitratorreconciliation/pom.xml b/features-aggregator/odl-openflowplugin-app-arbitratorreconciliation/pom.xml
new file mode 100644 (file)
index 0000000..8444cfd
--- /dev/null
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.odlparent</groupId>
+        <artifactId>single-feature-parent</artifactId>
+        <version>3.1.2</version>
+        <relativePath/>
+    </parent>
+
+    <groupId>org.opendaylight.openflowplugin</groupId>
+    <artifactId>odl-openflowplugin-app-arbitratorreconciliation</artifactId>
+    <packaging>feature</packaging>
+    <version>0.7.0-SNAPSHOT</version>
+
+    <name>OpenDaylight :: Openflow Plugin :: Application - Arbitrator Reconciliation</name>
+
+    <dependencies>
+        <!-- feature dependencies -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>odl-openflowplugin-southbound</artifactId>
+            <version>${project.version}</version>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin.applications</groupId>
+            <artifactId>arbitratorreconciliation-impl</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
index bb090920a9c0dcd04f50d50c36165ba249bcff6d..5bc683ed69034fef41595cb3583cf47f3e0ee907 100644 (file)
             <classifier>features</classifier>
             <type>xml</type>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin</groupId>
+            <artifactId>odl-openflowplugin-app-arbitratorreconciliation</artifactId>
+            <version>${project.version}</version>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
     </dependencies>
 
 </project>
index c400f06475432c1cc1f8e30e43f315ad3f4d31b1..ae0350151a4a0cefcfffc8342c4a7b38491f119d 100644 (file)
             <type>xml</type>
             <classifier>features</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.serviceutils</groupId>
+            <artifactId>odl-serviceutils-tools</artifactId>
+            <version>${serviceutils.version}</version>
+            <type>xml</type>
+            <classifier>features</classifier>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.openflowplugin</groupId>
             <artifactId>odl-openflowplugin-nsf-model</artifactId>
index b2f04c90118f784d8f1b0033fca063a21c4ec0d0..2fee82fa767686b18a47065168fc590f48107c8d 100644 (file)
@@ -33,6 +33,7 @@
         <module>odl-openflowplugin-app-reconciliation-framework</module>
         <module>odl-openflowplugin-app-southbound-cli</module>
         <module>odl-openflowplugin-libraries</module>
+        <module>odl-openflowplugin-app-arbitratorreconciliation</module>
     </modules>
 
 </project>