OPNFLWPLUG-1005 : Implementation of arbitrator reconciliation(frm changes) 90/71090/28
authorgobinath <gobinath@ericsson.com>
Wed, 18 Apr 2018 20:16:49 +0000 (01:46 +0530)
committergobinath <gobinath@ericsson.com>
Tue, 7 Aug 2018 08:53:01 +0000 (14:23 +0530)
This patch adds new forwarder types BundleFlowForwarder and
BundleGroupForwarder which would would be used to push the flows during
the upgrade process.

Change-Id: I7681019236aa5674c6242e39b967fda9757913b1
Signed-off-by: gobinath <gobinath@ericsson.com>
13 files changed:
applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java
applications/forwardingrules-manager/pom.xml
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesManager.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/AbstractListeningCommiter.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/util/FrmUtil.java [new file with mode: 0644]
applications/forwardingrules-manager/src/test/java/test/mock/util/ArbitratorReconcileServiceMock.java [new file with mode: 0644]
applications/forwardingrules-manager/src/test/java/test/mock/util/RpcProviderRegistryMock.java
features-aggregator/odl-openflowplugin-app-forwardingrules-manager/pom.xml

index 112082be102c45fa4a936c7998a8668a85c33d4c..b9087807217e8ef08f9cb36278464fada3cc25fd 100644 (file)
@@ -90,7 +90,7 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
     private static final AtomicLong BUNDLE_ID = new AtomicLong();
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
-            .getInteger("arbitrator.reconciliation.manager.priority", 1/*default*/);
+            .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
     private static final String SEPARATOR = ":";
 
index 5759cf53bdcee0bf679458d8a185abb306dc1e84..0ccebba11132b6a19ac8b877bdb2592779b6ce8d 100644 (file)
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+    <groupId>org.opendaylight.openflowplugin.applications</groupId>
+      <artifactId>arbitratorreconciliation-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
index 0d57c8e2a9c01d3c85964da341dcc40844b3fcbd..dd0c8f4ca2d07d522b71230d6c6b5407feabfd52 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.Sal
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -193,4 +194,10 @@ public interface ForwardingRulesManager extends ConfigurationListener, AutoClose
      *
      */
     void addRecoverableListener(RecoverableListener recoverableListener);
+
+    /**
+     * Method exposes the ArbitratorReconciliationManager service used for performing Arbitrator Based Reconciliation.
+     * @return ArbitratorReconciliationManager
+     */
+    ArbitratorReconcileService getArbitratorReconciliationManager();
 }
index 0f1ab4d0fd2e33bb690b18df88379d3cbe659780..5d137465aefea8d2318df4820859e40015710f0e 100644 (file)
@@ -17,12 +17,6 @@ import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.serviceutils.srm.RecoverableListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -36,9 +30,10 @@ public abstract class AbstractListeningCommiter<T extends DataObject> implements
         RecoverableListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractListeningCommiter.class);
-    protected final DataBroker dataBroker;
-    ForwardingRulesManager provider;
+
+    final ForwardingRulesManager provider;
     NodeConfigurator nodeConfigurator;
+    protected final DataBroker dataBroker;
 
     public AbstractListeningCommiter(final ForwardingRulesManager provider, final DataBroker dataBroker) {
         this.provider = Preconditions.checkNotNull(provider, "ForwardingRulesManager can not be null!");
@@ -119,14 +114,5 @@ public abstract class AbstractListeningCommiter<T extends DataObject> implements
         return provider.isNodeOwner(nodeIdent)
                 && (provider.isNodeActive(nodeIdent) || provider.checkNodeInOperationalDataStore(nodeIdent));
     }
-
-    NodeId getNodeIdFromNodeIdentifier(InstanceIdentifier<FlowCapableNode> nodeIdent) {
-        return nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId();
-    }
-
-    String getFlowId(FlowRef flowRef) {
-        return flowRef.getValue().firstKeyOf(Flow .class, FlowKey .class).getId().getValue();
-    }
-
 }
 
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleFlowForwarder.java
new file mode 100644 (file)
index 0000000..a3cca7e
--- /dev/null
@@ -0,0 +1,236 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm.impl;
+
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
+import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleUpdateFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.update.flow._case.UpdateFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BundleFlowForwarder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
+    private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+    private final ForwardingRulesManager forwardingRulesManager;
+    private final NodeConfigurator nodeConfigurator;
+
+    public BundleFlowForwarder(ForwardingRulesManager forwardingRulesManager) {
+        this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
+                "ForwardingRulesManager can not be null!");
+        this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
+                "NodeConfigurator can not be null!");
+    }
+
+    public void remove(final InstanceIdentifier<Flow> identifier, final Flow flow,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final List<Message> messages = new ArrayList<>(1);
+        String node = nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
+        BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
+                .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
+        Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                .setBundleInnerMessage(bundleInnerMessage).build();
+        messages.add(message);
+        AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+        final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
+                .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+        LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
+                bundleId.getValue(), node);
+        JdkFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
+    }
+
+    public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            BundleInnerMessage bundleInnerMessage = new BundleUpdateFlowCaseBuilder()
+                    .setUpdateFlowCaseData(new UpdateFlowCaseDataBuilder(updatedFlow).build()).build();
+            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(bundleInnerMessage).build();
+            ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
+                    updatedFlow, identifier, bundleId);
+            ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture));
+            return flowFuture;
+        });
+    }
+
+    public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow flow,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
+                    .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build();
+            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(bundleInnerMessage).build();
+            ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
+                    identifier, bundleId);
+            ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
+            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture),
+                    MoreExecutors.directExecutor());
+            return flowFuture;
+        });
+    }
+
+    private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, Flow updatedFlow, InstanceIdentifier<Flow> identifier,
+            BundleId bundleId) {
+        //TODO This read to the DS might have a performance impact.
+        //if the dependent group is not installed than we should just cache the parent group,
+        //till we receive the dependent group DTCN and then push it.
+        Long groupId = isFlowDependentOnGroup(updatedFlow);
+        ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
+        if (groupId != null) {
+            LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
+                    getFlowId(new FlowRef(identifier)), groupId);
+            if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
+                LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
+                        getFlowId(new FlowRef(identifier)));
+                resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+            } else {
+                LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
+                InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
+                LOG.info("Reading the group from config inventory: {}", groupId);
+                try {
+                    Optional<Group> group = forwardingRulesManager.getReadTranaction()
+                            .read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
+                    if (group.isPresent()) {
+                        final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
+                        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                        builder.setGroupRef(new GroupRef(nodeIdent));
+                        builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
+                        BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
+                                .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
+                        Message groupMessage = new MessageBuilder().setNode(
+                                new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                                .setBundleInnerMessage(bundleInnerMessage).build();
+                        final List<Message> messages = new ArrayList<>(1);
+                        messages.add(groupMessage);
+                        AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                                .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build())
+                                .build();
+                        LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
+                                bundleId.getValue(), getNodeIdFromNodeIdentifier(nodeIdent));
+                        resultFuture = forwardingRulesManager
+                                .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+                        Futures.transformAsync(resultFuture, rpcResult -> {
+                            if (rpcResult.isSuccessful()) {
+                                forwardingRulesManager.getDevicesGroupRegistry()
+                                        .storeGroup(getNodeIdFromNodeIdentifier(nodeIdent), groupId);
+                                LOG.trace("Group {} stored in cache", groupId);
+                            }
+                            return Futures.immediateFuture(null);
+                        }, MoreExecutors.directExecutor());
+                    } else {
+                        LOG.debug("Group {} not present in the config inventory", groupId);
+                        resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success()
+                                .build());
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
+                    resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+                }
+            }
+        } else {
+            resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+        }
+        return resultFuture;
+    }
+
+    private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
+        private final InstanceIdentifier<FlowCapableNode> nodeIdent;
+        private final BundleId bundleId;
+        private final Message message;
+        private final NodeId nodeId;
+        private ListenableFuture<RpcResult<AddBundleMessagesOutput>> flowFuture;
+
+        BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message message,
+                ListenableFuture<RpcResult<AddFlowOutput>> flowFuture) {
+            this.nodeIdent = nodeIdent;
+            this.bundleId = bundleId;
+            this.message = message;
+            nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        }
+
+        @Override
+        public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
+            if (rpcResult.isSuccessful()) {
+                List<Message> messages = new ArrayList<>(1);
+                messages.add(message);
+                AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                        .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+                LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
+                        bundleId.getValue(), nodeId.getValue());
+                flowFuture = forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
+            } else {
+                flowFuture = Futures.immediateFuture(null);
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            flowFuture = Futures.immediateFailedFuture(null);
+            LOG.error("Error while pushing flow add bundle {} for device {}", message, nodeId);
+        }
+    }
+}
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/BundleGroupForwarder.java
new file mode 100644 (file)
index 0000000..bebbdf7
--- /dev/null
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm.impl;
+
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
+import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleUpdateGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.update.group._case.UpdateGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BundleGroupForwarder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BundleGroupForwarder.class);
+    private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+    private final ForwardingRulesManager forwardingRulesManager;
+    private final NodeConfigurator nodeConfigurator;
+
+    public BundleGroupForwarder(final ForwardingRulesManager forwardingRulesManager) {
+        this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
+                "ForwardingRulesManager can not be null!");
+        this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
+                "NodeConfigurator can not be null!");
+    }
+
+    public void remove(final InstanceIdentifier<Group> identifier, final Group group,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            final List<Message> messages = new ArrayList<>(1);
+            BundleInnerMessage bundleInnerMessage = new BundleRemoveGroupCaseBuilder()
+                    .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(group).build()).build();
+            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(bundleInnerMessage).build();
+            messages.add(message);
+            AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                    .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                    .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+            LOG.trace("Pushing group remove message {} to bundle {} for device {}", addBundleMessagesInput,
+                    bundleId.getValue(), nodeId.getValue());
+            final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
+                    .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+            Futures.addCallback(resultFuture, new BundleRemoveGroupCallBack(group.getGroupId().getValue(), nodeId));
+            JdkFutures.addErrorLogging(resultFuture, LOG, "removeBundleGroup");
+            return resultFuture;
+        });
+
+    }
+
+    public void update(final InstanceIdentifier<Group> identifier, final Group originalGroup, final Group updatedGroup,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            final List<Message> messages = new ArrayList<>(1);
+            BundleInnerMessage bundleInnerMessage = new BundleUpdateGroupCaseBuilder()
+                    .setUpdateGroupCaseData(new UpdateGroupCaseDataBuilder(updatedGroup).build()).build();
+            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(bundleInnerMessage).build();
+            messages.add(message);
+            AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                    .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                    .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+            LOG.trace("Pushing group update message {} to bundle {} for device {}", addBundleMessagesInput,
+                    bundleId.getValue(), nodeId.getValue());
+            final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
+                    .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+            Futures.addCallback(resultFuture, new BundleUpdateGroupCallBack(originalGroup.getGroupId().getValue(),
+                    nodeId));
+            JdkFutures.addErrorLogging(resultFuture, LOG, "updateBundleGroup");
+            return resultFuture;
+        });
+    }
+
+    public ListenableFuture<? extends RpcResult<?>> add(final InstanceIdentifier<Group> identifier, final Group group,
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        final Long groupId = group.getGroupId().getValue();
+        return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+            if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
+                LOG.debug("Group {} already exists in the device. Ignoring the add DTCN", groupId);
+                return Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+            }
+            final List<Message> messages = new ArrayList<>(1);
+            BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
+                    .setAddGroupCaseData(new AddGroupCaseDataBuilder(group).build()).build();
+            Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                    .setBundleInnerMessage(bundleInnerMessage).build();
+            messages.add(message);
+            AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                    .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
+                    .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+            LOG.trace("Pushing group add message {} to bundle {} for device {}", addBundleMessagesInput,
+                    bundleId.getValue(), nodeId.getValue());
+            ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
+                    .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+            Futures.addCallback(resultFuture, new BundleAddGroupCallBack(groupId, nodeId));
+            return resultFuture;
+        });
+    }
+
+    private final class BundleAddGroupCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private BundleAddGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+            if (result.isSuccessful()) {
+                forwardingRulesManager.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
+                LOG.debug("Group add with id {} finished without error for node {}", groupId, nodeId);
+            } else {
+                LOG.debug("Group add with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for adding group {} failed for node with error {}", groupId, nodeId, throwable);
+        }
+    }
+
+    private final class BundleUpdateGroupCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private BundleUpdateGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+            if (result.isSuccessful()) {
+                forwardingRulesManager.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
+                LOG.debug("Group update with id {} finished without error for node {}", groupId, nodeId);
+            } else {
+                LOG.debug("Group update with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for updating group {} failed for node {} with error {}", groupId, nodeId,
+                    throwable);
+        }
+    }
+
+    private final class BundleRemoveGroupCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
+        private final Long groupId;
+        private final NodeId nodeId;
+
+        private BundleRemoveGroupCallBack(final Long groupId, final NodeId nodeId) {
+            this.groupId = groupId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+            if (result.isSuccessful()) {
+                LOG.debug("Group remove with id {} finished without error for node {}", groupId, nodeId);
+                forwardingRulesManager.getDevicesGroupRegistry().removeGroup(nodeId, groupId);
+            } else {
+                LOG.debug("Group remove with id {} failed for node {} with error {}", groupId, nodeId,
+                        result.getErrors().toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("Service call for removing group {} failed for node with error {}", groupId, nodeId, throwable);
+        }
+    }
+
+}
index 23cee7a5226205ebc6c17f5432ccd5671d27c6e3..b9ac9d3d15c5c3fd165c052d267bba8c37171dee 100644 (file)
@@ -7,6 +7,13 @@
  */
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
@@ -14,8 +21,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -23,11 +28,8 @@ import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
-import org.opendaylight.openflowplugin.applications.frm.ActionType;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
@@ -48,20 +50,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.ApplyActionsCase;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -84,9 +82,11 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
     private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
 
     private ListenerRegistration<FlowForwarder> listenerRegistration;
+    private final BundleFlowForwarder bundleFlowForwarder;
 
     public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, db);
+        bundleFlowForwarder = new BundleFlowForwarder(manager);
     }
 
     @SuppressWarnings("IllegalCatch")
@@ -123,19 +123,24 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
 
         final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
-            final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
-            builder.setFlowRef(new FlowRef(identifier));
-            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-            builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
-
-            // This method is called only when a given flow object has been
-            // removed from datastore. So FRM always needs to set strict flag
-            // into remove-flow input so that only a flow entry associated with
-            // a given flow object is removed.
-            builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
-            final Future<RpcResult<RemoveFlowOutput>> resultFuture =
-                    provider.getSalFlowService().removeFlow(builder.build());
-            JdkFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
+            BundleId bundleId = getActiveBundle(nodeIdent, provider);
+            if (bundleId != null) {
+                bundleFlowForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId);
+            } else {
+                final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
+                builder.setFlowRef(new FlowRef(identifier));
+                builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
+
+                // This method is called only when a given flow object has been
+                // removed from datastore. So FRM always needs to set strict flag
+                // into remove-flow input so that only a flow entry associated with
+                // a given flow object is removed.
+                builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
+                final Future<RpcResult<RemoveFlowOutput>> resultFuture =
+                        provider.getSalFlowService().removeFlow(builder.build());
+                JdkFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
+            }
         }
     }
 
@@ -170,87 +175,97 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
 
         final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
         if (tableIdValidationPrecondition(tableKey, update)) {
-            final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
-            final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-
-            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-            builder.setFlowRef(new FlowRef(identifier));
-            builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-
-            // This method is called only when a given flow object in datastore
-            // has been updated. So FRM always needs to set strict flag into
-            // update-flow input so that only a flow entry associated with
-            // a given flow object is updated.
-            builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
-            builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
-
-            nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
-                Long groupId = isFlowDependentOnGroup(update);
-                ListenableFuture<RpcResult<UpdateFlowOutput>> future = Futures.immediateFuture(null);
-                if (groupId != null) {
-                    LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
-                            getFlowId(new FlowRef(identifier)), groupId);
-                    if (isGroupExistsOnDevice(nodeIdent, groupId)) {
-                        LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
+            BundleId bundleId = getActiveBundle(nodeIdent, provider);
+            if (bundleId != null) {
+                bundleFlowForwarder.update(identifier, original, update, nodeIdent, bundleId);
+            } else {
+                final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+                nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+                    final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
+                    builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                    builder.setFlowRef(new FlowRef(identifier));
+                    builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
+
+                    // This method is called only when a given flow object in datastore
+                    // has been updated. So FRM always needs to set strict flag into
+                    // update-flow input so that only a flow entry associated with
+                    // a given flow object is updated.
+                    builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
+                    builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
+
+                    Long groupId = isFlowDependentOnGroup(update);
+                    ListenableFuture<RpcResult<UpdateFlowOutput>> future = Futures.immediateFuture(null);
+                    if (groupId != null) {
+                        LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
+                                getFlowId(new FlowRef(identifier)), groupId);
+                        if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
+                            LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
+                                    getFlowId(new FlowRef(identifier)));
+                            future = provider.getSalFlowService().updateFlow(builder.build());
+                            JdkFutures.addErrorLogging(future, LOG, "updateFlow");
+                        } else {
+                            LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
+                            ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
+                                    groupId);
+                            Futures.addCallback(groupFuture,
+                                    new UpdateFlowCallBack(builder.build(), nodeId, future, groupId),
+                                    MoreExecutors.directExecutor());
+                        }
+                    } else {
+                        LOG.trace("The flow {} is not dependent on any group. Updating the flow",
                                 getFlowId(new FlowRef(identifier)));
                         future = provider.getSalFlowService().updateFlow(builder.build());
                         JdkFutures.addErrorLogging(future, LOG, "updateFlow");
-                    } else {
-                        LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
-                        ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
-                                groupId);
-                        Futures.addCallback(groupFuture,
-                                new UpdateFlowCallBack(builder.build(), nodeId, future, groupId),
-                                MoreExecutors.directExecutor());
                     }
-                } else {
-                    LOG.trace("The flow {} is not dependent on any group. Updating the flow",
-                            getFlowId(new FlowRef(identifier)));
-                    future = provider.getSalFlowService().updateFlow(builder.build());
-                    JdkFutures.addErrorLogging(future, LOG, "updateFlow");
-                }
-                return future;
-            });
+                    return future;
+                });
+            }
         }
     }
 
     @Override
-    public Future<RpcResult<AddFlowOutput>> add(final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
+    public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
 
         final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
-            final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
-            final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-
-            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-            builder.setFlowRef(new FlowRef(identifier));
-            builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
-            builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
-                Long groupId = isFlowDependentOnGroup(addDataObj);
-                ListenableFuture<RpcResult<AddFlowOutput>> future = SettableFuture.create();
-                if (groupId != null) {
-                    LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
-                            getFlowId(new FlowRef(identifier)), groupId);
-                    if (isGroupExistsOnDevice(nodeIdent, groupId)) {
-                        LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
+            BundleId bundleId = getActiveBundle(nodeIdent, provider);
+            if (bundleId != null) {
+                return bundleFlowForwarder.add(identifier, addDataObj, nodeIdent, bundleId);
+            } else {
+                final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+                nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+                    final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
+
+                    builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                    builder.setFlowRef(new FlowRef(identifier));
+                    builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
+                    builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
+                    Long groupId = isFlowDependentOnGroup(addDataObj);
+                    ListenableFuture<RpcResult<AddFlowOutput>> future = SettableFuture.create();
+                    if (groupId != null) {
+                        LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
+                                getFlowId(new FlowRef(identifier)), groupId);
+                        if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
+                            LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
+                                    getFlowId(new FlowRef(identifier)));
+                            future = provider.getSalFlowService().addFlow(builder.build());
+                        } else {
+                            LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
+                            ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
+                                    groupId);
+                            Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, future,
+                                            groupId),
+                                    MoreExecutors.directExecutor());
+                        }
+                    } else {
+                        LOG.trace("The flow {} is not dependent on any group. Adding the flow",
                                 getFlowId(new FlowRef(identifier)));
                         future = provider.getSalFlowService().addFlow(builder.build());
-                    } else {
-                        LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
-                        ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
-                                groupId);
-                        Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, future, groupId),
-                                MoreExecutors.directExecutor());
                     }
-                } else {
-                    LOG.trace("The flow {} is not dependent on any group. Adding the flow",
-                            getFlowId(new FlowRef(identifier)));
-                    future = provider.getSalFlowService().addFlow(builder.build());
-                }
-                return future;
-            });
+                    return future;
+                });
+            }
         }
         return Futures.immediateFuture(null);
     }
@@ -352,41 +367,6 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         return resultFuture;
     }
 
-    private Long isFlowDependentOnGroup(final Flow flow) {
-        LOG.debug("Check if flow {} is dependent on group", flow);
-        if (flow.getInstructions() != null) {
-            List<Instruction> instructions = flow.getInstructions().getInstruction();
-            for (Instruction instruction : instructions) {
-                List<Action> actions = Collections.emptyList();
-                if (instruction.getInstruction().getImplementedInterface()
-                        .equals(ActionType.APPLY_ACTION.getActionType())) {
-                    actions = ((ApplyActionsCase) (instruction.getInstruction())).getApplyActions().getAction();
-                }
-                for (Action action : actions) {
-                    if (action.getAction().getImplementedInterface()
-                            .equals(ActionType.GROUP_ACTION.getActionType())) {
-                        return ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    private boolean isGroupExistsOnDevice(final InstanceIdentifier<FlowCapableNode> nodeIdent, final Long groupId) {
-        NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-        return provider.getDevicesGroupRegistry().isGroupPresent(nodeId, groupId);
-    }
-
-    private InstanceIdentifier<Group> buildGroupInstanceIdentifier(final InstanceIdentifier<FlowCapableNode> nodeIdent,
-            final Long groupId) {
-        NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-        InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
-                .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class)
-                .child(Group.class, new GroupKey(new GroupId(groupId))).build();
-        return groupInstanceId;
-    }
-
     private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
         private final AddFlowInput addFlowInput;
         private final NodeId nodeId;
@@ -419,6 +399,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
                 } else {
                     LOG.error("Flow add with id {} failed for node {} with error {}",
                             getFlowId(addFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
+                    future = Futures.immediateFuture(null);
                 }
             }
         }
@@ -427,6 +408,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         public void onFailure(Throwable throwable) {
             LOG.error("Service call for adding flow with id {} failed for node {} with error {}",
                     getFlowId(addFlowInput.getFlowRef()), nodeId, throwable.getCause());
+            Futures.immediateFailedFuture(null);
         }
     }
 
@@ -462,6 +444,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
                 } else {
                     LOG.error("Flow update with id {} failed for node {} with error {}",
                             getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
+                    future = Futures.immediateFuture(null);
                 }
             }
         }
@@ -470,6 +453,7 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
         public void onFailure(Throwable throwable) {
             LOG.error("Service call for updating flow with id {} failed for node {} with error {}",
                     getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
+            future = Futures.immediateFailedFuture(null);
         }
     }
 }
index e3d27dade409c6734fa9aea50592200cbd94451a..a40e05e4adcd7991d5c4b169f9d4e08e9f970d56 100644 (file)
@@ -43,6 +43,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
@@ -65,7 +66,7 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
 
     static final int STARTUP_LOOP_TICK = 500;
     static final int STARTUP_LOOP_MAX_RETRIES = 8;
-    private static final int FRM_RECONCILIATION_PRIORITY = Integer.getInteger("frm.reconciliation.priority", 0);
+    private static final int FRM_RECONCILIATION_PRIORITY = Integer.getInteger("frm.reconciliation.priority", 1);
     private static final String SERVICE_NAME = "FRM";
 
     private final AtomicLong txNum = new AtomicLong();
@@ -90,7 +91,7 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
     private final ReconciliationManager reconciliationManager;
     private DevicesGroupRegistry devicesGroupRegistry;
     private NodeConfigurator nodeConfigurator;
-
+    private ArbitratorReconcileService arbitratorReconciliationManager;
     private boolean disableReconciliation;
     private boolean staleMarkingEnabled;
     private int reconciliationRetryCount;
@@ -135,6 +136,9 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
                 "Openflow service recovery handler cannot be null");
         this.serviceRecoveryRegistry = Preconditions.checkNotNull(serviceRecoveryRegistry,
                 "Service recovery registry cannot be null");
+        this.arbitratorReconciliationManager = Preconditions
+                .checkNotNull(rpcRegistry.getRpcService(ArbitratorReconcileService.class),
+                        "ArbitratorReconciliationManager can not be null!");
     }
 
     @Override
@@ -283,6 +287,11 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
         return tableListener;
     }
 
+    @Override
+    public ArbitratorReconcileService getArbitratorReconciliationManager() {
+        return arbitratorReconciliationManager;
+    }
+
     @Override
     public boolean isReconciliationDisabled() {
         return disableReconciliation;
index 6e91ad42eea19994a4987b67180d1da525a9be62..5a08be041baac7c520850dfd7c0d7b43e4deeb3f 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -40,6 +43,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -57,9 +61,11 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
 
     private static final Logger LOG = LoggerFactory.getLogger(GroupForwarder.class);
     private ListenerRegistration<GroupForwarder> listenerRegistration;
+    private final BundleGroupForwarder bundleGroupForwarder;
 
     public GroupForwarder(final ForwardingRulesManager manager, final DataBroker db) {
         super(manager, db);
+        this.bundleGroupForwarder = new BundleGroupForwarder(manager);
     }
 
     @SuppressWarnings("IllegalCatch")
@@ -99,21 +105,25 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
     @Override
     public void remove(final InstanceIdentifier<Group> identifier, final Group removeDataObj,
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
-
-        final Group group = removeDataObj;
-        final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
-        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-
-        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-        builder.setGroupRef(new GroupRef(identifier));
-        builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-
-        final ListenableFuture<RpcResult<RemoveGroupOutput>> resultFuture =
-                this.provider.getSalGroupService().removeGroup(builder.build());
-        Futures.addCallback(resultFuture,
-                new RemoveGroupCallBack(removeDataObj.getGroupId().getValue(), nodeId),
-                MoreExecutors.directExecutor());
-        JdkFutures.addErrorLogging(resultFuture, LOG, "removeGroup");
+        BundleId bundleId = getActiveBundle(nodeIdent, provider);
+        if (bundleId != null) {
+            bundleGroupForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId);
+        } else {
+            final Group group = removeDataObj;
+            final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
+            final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+
+            builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+            builder.setGroupRef(new GroupRef(identifier));
+            builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
+
+            final ListenableFuture<RpcResult<RemoveGroupOutput>> resultFuture =
+                    this.provider.getSalGroupService().removeGroup(builder.build());
+            Futures.addCallback(resultFuture,
+                    new RemoveGroupCallBack(removeDataObj.getGroupId().getValue(), nodeId),
+                    MoreExecutors.directExecutor());
+            JdkFutures.addErrorLogging(resultFuture, LOG, "removeGroup");
+        }
     }
 
     // TODO: Pull this into ForwardingRulesCommiter and override it here
@@ -133,49 +143,56 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
     @Override
     public void update(final InstanceIdentifier<Group> identifier, final Group original, final Group update,
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
-
-        final Group originalGroup = original;
-        final Group updatedGroup = update;
-        final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
-        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-        builder.setGroupRef(new GroupRef(identifier));
-        builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-        builder.setUpdatedGroup(new UpdatedGroupBuilder(updatedGroup).build());
-        builder.setOriginalGroup(new OriginalGroupBuilder(originalGroup).build());
-        nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
-            UpdateGroupInput updateGroupInput = builder.build();
-            final ListenableFuture<RpcResult<UpdateGroupOutput>> resultFuture;
-            resultFuture = this.provider.getSalGroupService().updateGroup(updateGroupInput);
-            JdkFutures.addErrorLogging(resultFuture, LOG, "updateGroup");
-            Futures.addCallback(resultFuture,
-                    new UpdateGroupCallBack(updateGroupInput.getOriginalGroup().getGroupId().getValue(), nodeId),
-                    MoreExecutors.directExecutor());
-            return resultFuture;
-        });
+        BundleId bundleId = getActiveBundle(nodeIdent, provider);
+        if (bundleId != null) {
+            bundleGroupForwarder.update(identifier, original, update, nodeIdent, bundleId);
+        } else {
+            final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+            nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+                final Group originalGroup = original;
+                final Group updatedGroup = update;
+                final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
+                builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                builder.setGroupRef(new GroupRef(identifier));
+                builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
+                builder.setUpdatedGroup(new UpdatedGroupBuilder(updatedGroup).build());
+                builder.setOriginalGroup(new OriginalGroupBuilder(originalGroup).build());
+                UpdateGroupInput updateGroupInput = builder.build();
+                final ListenableFuture<RpcResult<UpdateGroupOutput>> resultFuture;
+                resultFuture = this.provider.getSalGroupService().updateGroup(updateGroupInput);
+                JdkFutures.addErrorLogging(resultFuture, LOG, "updateGroup");
+                Futures.addCallback(resultFuture,
+                        new UpdateGroupCallBack(updateGroupInput.getOriginalGroup().getGroupId().getValue(), nodeId),
+                        MoreExecutors.directExecutor());
+                return resultFuture;
+            });
+        }
     }
 
     @Override
-    public Future<RpcResult<AddGroupOutput>> add(final InstanceIdentifier<Group> identifier, final Group addDataObj,
+    public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Group> identifier, final Group addDataObj,
             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
-
-        final Group group = addDataObj;
-        final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
-        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-
-        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
-        builder.setGroupRef(new GroupRef(identifier));
-        builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-        AddGroupInput addGroupInput = builder.build();
-        return nodeConfigurator
-                .enqueueJob(nodeId.getValue(), () -> {
-                    final ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
-                    resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
-                    Futures.addCallback(resultFuture,
-                            new AddGroupCallBack(addGroupInput.getGroupId().getValue(), nodeId),
-                            MoreExecutors.directExecutor());
-                    return resultFuture;
-                });
+        BundleId bundleId = getActiveBundle(nodeIdent, provider);
+        if (bundleId != null) {
+            return bundleGroupForwarder.add(identifier, addDataObj, nodeIdent, bundleId);
+        } else {
+            final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+            return nodeConfigurator
+                    .enqueueJob(nodeId.getValue(), () -> {
+                        final Group group = addDataObj;
+                        final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
+                        builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
+                        builder.setGroupRef(new GroupRef(identifier));
+                        builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
+                        AddGroupInput addGroupInput = builder.build();
+                        final ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
+                        resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
+                        Futures.addCallback(resultFuture,
+                                new AddGroupCallBack(addGroupInput.getGroupId().getValue(), nodeId),
+                                MoreExecutors.directExecutor());
+                        return resultFuture;
+                    });
+        }
     }
 
     @Override
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/util/FrmUtil.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/util/FrmUtil.java
new file mode 100644 (file)
index 0000000..cf79da8
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm.util;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.openflowplugin.applications.frm.ActionType;
+import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.ApplyActionsCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class FrmUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(FrmUtil.class);
+    private static final String SEPARATOR = ":";
+    private static final long RPC_RESULT_TIMEOUT = 2500;
+
+    private FrmUtil() {
+        throw new IllegalStateException("This class should not be instantiated.");
+    }
+
+    public static NodeId getNodeIdFromNodeIdentifier(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        return nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId();
+    }
+
+    public static String getFlowId(final FlowRef flowRef) {
+        return flowRef.getValue().firstKeyOf(Flow.class, FlowKey.class).getId().getValue();
+    }
+
+    public static BigInteger getDpnIdFromNodeName(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        String nodeId = nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
+        String dpId = nodeId.substring(nodeId.lastIndexOf(SEPARATOR) + 1);
+        return new BigInteger(dpId);
+    }
+
+    public static Long isFlowDependentOnGroup(final Flow flow) {
+        LOG.debug("Check if flow {} is dependent on group", flow);
+        if (flow.getInstructions() != null) {
+            List<Instruction> instructions = flow.getInstructions().getInstruction();
+            for (Instruction instruction : instructions) {
+                List<Action> actions = Collections.emptyList();
+                if (instruction.getInstruction().getImplementedInterface()
+                        .equals(ActionType.APPLY_ACTION.getActionType())) {
+                    actions = ((ApplyActionsCase) (instruction.getInstruction()))
+                            .getApplyActions().getAction();
+                }
+                for (Action action : actions) {
+                    if (action.getAction().getImplementedInterface()
+                            .equals(ActionType.GROUP_ACTION.getActionType())) {
+                        return ((GroupActionCase) action.getAction()).getGroupAction()
+                                .getGroupId();
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public static InstanceIdentifier<Group> buildGroupInstanceIdentifier(
+            final InstanceIdentifier<FlowCapableNode> nodeIdent, final Long groupId) {
+        NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
+                .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class)
+                .child(Group.class, new GroupKey(new GroupId(groupId))).build();
+        return groupInstanceId;
+    }
+
+    public static BundleId getActiveBundle(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final ForwardingRulesManager provider) {
+        BigInteger dpId = getDpnIdFromNodeName(nodeIdent);
+        final NodeRef nodeRef = new NodeRef(nodeIdent.firstIdentifierOf(Node.class));
+        GetActiveBundleInputBuilder input = new GetActiveBundleInputBuilder().setNodeId(dpId).setNode(nodeRef);
+        RpcResult<GetActiveBundleOutput> result = null;
+        try {
+            result = provider.getArbitratorReconciliationManager()
+                    .getActiveBundle(input.build()).get(RPC_RESULT_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Error while retrieving active bundle present for node {}", dpId , e);
+        }
+        if (!result.isSuccessful()) {
+            LOG.trace("Error while retrieving active bundle present for node {}", dpId);
+        } else {
+            return result.getResult().getResult();
+        }
+        return null;
+    }
+
+    public static boolean isGroupExistsOnDevice(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final Long groupId, final ForwardingRulesManager provider) {
+        NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+        return provider.getDevicesGroupRegistry().isGroupPresent(nodeId, groupId);
+    }
+}
diff --git a/applications/forwardingrules-manager/src/test/java/test/mock/util/ArbitratorReconcileServiceMock.java b/applications/forwardingrules-manager/src/test/java/test/mock/util/ArbitratorReconcileServiceMock.java
new file mode 100644 (file)
index 0000000..e03debd
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package test.mock.util;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+public class ArbitratorReconcileServiceMock implements ArbitratorReconcileService {
+    @Override
+    public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
+            CommitActiveBundleInput input) {
+        return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder().setResult(null).build()).buildFuture();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(
+            GetActiveBundleInput input) {
+        return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
+                .setResult(null).build())).buildFuture();
+    }
+}
index b57858ea0fff862514e037105d7d27c234ace851..a51b3741f2f28f643bfc1927b01e883220e0fb95 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalF
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -52,6 +53,8 @@ public class RpcProviderRegistryMock implements RpcProviderRegistry {
             return (T) new SalTableServiceMock();
         } else if (serviceInterface.equals(SalBundleService.class)) {
             return (T) new SalBundleServiceMock();
+        } else if (serviceInterface.equals(ArbitratorReconcileService.class)) {
+            return (T) new ArbitratorReconcileServiceMock();
         } else {
             return null;
         }
index 64c509a749e7d169603622520871f1c3c5220452..0d0ad58ae37f686eeed1e394747f2cd16518367e 100644 (file)
             <classifier>features</classifier>
             <type>xml</type>
         </dependency>
-
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin.applications</groupId>
+            <artifactId>arbitratorreconciliation-impl</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.openflowplugin.applications</groupId>
             <artifactId>forwardingrules-manager</artifactId>