Refactor reconciliation wiring
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / DeviceMastershipManager.java
index ee063c52c00c03d8b3b0c00470479396b90df58a..3299043337915f4fc01993e54a7e39605ce9e319 100644 (file)
@@ -9,6 +9,11 @@ package org.opendaylight.openflowplugin.applications.frm.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
 import java.util.Set;
@@ -23,7 +28,6 @@ import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeRegistration;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeService;
@@ -33,10 +37,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.Fl
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutputBuilder;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,28 +59,22 @@ public class DeviceMastershipManager implements ClusteredDataTreeChangeListener<
     private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
             .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
 
-    private final ClusterSingletonServiceProvider clusterSingletonService;
-    private final FlowNodeReconciliation reconcliationAgent;
     private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap<>();
     private final Set<InstanceIdentifier<FlowCapableNode>> activeNodes = ConcurrentHashMap.newKeySet();
     private final ReentrantLock lock = new ReentrantLock();
+    private final FlowNodeReconciliation reconcliationAgent;
     private final RpcProviderService rpcProviderService;
-    private final FrmReconciliationService reconcliationService;
 
-    private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
+    private Registration listenerRegistration;
     private MastershipChangeRegistration mastershipChangeServiceRegistration;
 
     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for mocking")
-    public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
-                                   final FlowNodeReconciliation reconcliationAgent,
+    public DeviceMastershipManager(final FlowNodeReconciliation reconcliationAgent,
                                    final DataBroker dataBroker,
                                    final MastershipChangeServiceManager mastershipChangeServiceManager,
-                                   final RpcProviderService rpcProviderService,
-                                   final FrmReconciliationService reconciliationService) {
-        this.clusterSingletonService = clusterSingletonService;
+                                   final RpcProviderService rpcProviderService) {
         this.reconcliationAgent = reconcliationAgent;
         this.rpcProviderService = rpcProviderService;
-        reconcliationService = reconciliationService;
         listenerRegistration = dataBroker.registerDataTreeChangeListener(
             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
                 InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)), this);
@@ -193,14 +197,45 @@ public class DeviceMastershipManager implements ClusteredDataTreeChangeListener<
         final var membership = deviceMasterships.computeIfAbsent(deviceInfo.getNodeId(),
             device -> new DeviceMastership(deviceInfo.getNodeId()));
         membership.reconcile();
-        membership.registerReconciliationRpc(rpcProviderService, reconcliationService);
+        membership.registerReconcileNode(rpcProviderService, this::reconcileNode);
+    }
+
+    private ListenableFuture<RpcResult<ReconcileNodeOutput>> reconcileNode(final ReconcileNodeInput input) {
+        final var nodeId = input.requireNodeId();
+        LOG.debug("Triggering reconciliation for node: {}", nodeId);
+
+        final var nodeDpn = new NodeBuilder().setId(new NodeId("openflow:" + nodeId)).build();
+        final var connectedNode = InstanceIdentifier.builder(Nodes.class)
+                .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
+                .build();
+        final var rpcResult = SettableFuture.<RpcResult<ReconcileNodeOutput>>create();
+        Futures.addCallback(reconcliationAgent.reconcileConfiguration(connectedNode), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                rpcResult.set(result
+                    ? RpcResultBuilder.success(new ReconcileNodeOutputBuilder().setResult(result).build()).build()
+                        : RpcResultBuilder.<ReconcileNodeOutput>failed()
+                            .withError(ErrorType.APPLICATION, "Error while triggering reconciliation")
+                            .build());
+            }
+
+            @Override
+            public void onFailure(final Throwable error) {
+                LOG.error("initReconciliation failed", error);
+                rpcResult.set(RpcResultBuilder.<ReconcileNodeOutput>failed()
+                        .withError(ErrorType.RPC, "Error while calling RPC").build());
+            }
+        }, MoreExecutors.directExecutor());
+
+        LOG.debug("Completing reconciliation for node: {}", nodeId);
+        return rpcResult;
     }
 
     @Override
     public void onLoseOwnership(@NonNull final DeviceInfo deviceInfo) {
         final var mastership = deviceMasterships.remove(deviceInfo.getNodeId());
         if (mastership != null) {
-            mastership.deregisterReconciliationRpc();
+            mastership.deregisterReconcileNode();
             mastership.close();
             LOG.debug("Unregistered deviceMastership for device : {}", deviceInfo.getNodeId());
         }