import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.Set;
import org.opendaylight.mdsal.binding.api.DataTreeModification;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeRegistration;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutputBuilder;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
.builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
- private final ClusterSingletonServiceProvider clusterSingletonService;
- private final FlowNodeReconciliation reconcliationAgent;
private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap<>();
private final Set<InstanceIdentifier<FlowCapableNode>> activeNodes = ConcurrentHashMap.newKeySet();
private final ReentrantLock lock = new ReentrantLock();
+ private final FlowNodeReconciliation reconcliationAgent;
private final RpcProviderService rpcProviderService;
- private final FrmReconciliationService reconcliationService;
- private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
+ private Registration listenerRegistration;
private MastershipChangeRegistration mastershipChangeServiceRegistration;
@SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for mocking")
- public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
- final FlowNodeReconciliation reconcliationAgent,
+ public DeviceMastershipManager(final FlowNodeReconciliation reconcliationAgent,
final DataBroker dataBroker,
final MastershipChangeServiceManager mastershipChangeServiceManager,
- final RpcProviderService rpcProviderService,
- final FrmReconciliationService reconciliationService) {
- this.clusterSingletonService = clusterSingletonService;
+ final RpcProviderService rpcProviderService) {
this.reconcliationAgent = reconcliationAgent;
this.rpcProviderService = rpcProviderService;
- reconcliationService = reconciliationService;
listenerRegistration = dataBroker.registerDataTreeChangeListener(
DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)), this);
final var membership = deviceMasterships.computeIfAbsent(deviceInfo.getNodeId(),
device -> new DeviceMastership(deviceInfo.getNodeId()));
membership.reconcile();
- membership.registerReconciliationRpc(rpcProviderService, reconcliationService);
+ membership.registerReconcileNode(rpcProviderService, this::reconcileNode);
+ }
+
+ private ListenableFuture<RpcResult<ReconcileNodeOutput>> reconcileNode(final ReconcileNodeInput input) {
+ final var nodeId = input.requireNodeId();
+ LOG.debug("Triggering reconciliation for node: {}", nodeId);
+
+ final var nodeDpn = new NodeBuilder().setId(new NodeId("openflow:" + nodeId)).build();
+ final var connectedNode = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
+ .build();
+ final var rpcResult = SettableFuture.<RpcResult<ReconcileNodeOutput>>create();
+ Futures.addCallback(reconcliationAgent.reconcileConfiguration(connectedNode), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Boolean result) {
+ rpcResult.set(result
+ ? RpcResultBuilder.success(new ReconcileNodeOutputBuilder().setResult(result).build()).build()
+ : RpcResultBuilder.<ReconcileNodeOutput>failed()
+ .withError(ErrorType.APPLICATION, "Error while triggering reconciliation")
+ .build());
+ }
+
+ @Override
+ public void onFailure(final Throwable error) {
+ LOG.error("initReconciliation failed", error);
+ rpcResult.set(RpcResultBuilder.<ReconcileNodeOutput>failed()
+ .withError(ErrorType.RPC, "Error while calling RPC").build());
+ }
+ }, MoreExecutors.directExecutor());
+
+ LOG.debug("Completing reconciliation for node: {}", nodeId);
+ return rpcResult;
}
@Override
public void onLoseOwnership(@NonNull final DeviceInfo deviceInfo) {
final var mastership = deviceMasterships.remove(deviceInfo.getNodeId());
if (mastership != null) {
- mastership.deregisterReconciliationRpc();
+ mastership.deregisterReconcileNode();
mastership.close();
LOG.debug("Unregistered deviceMastership for device : {}", deviceInfo.getNodeId());
}