import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.lang.management.ManagementFactory;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.Reconcile;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.common.Uint64;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// FIXME: this is not just a CLI component, it should live somewhere else
-public final class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class);
+@Singleton
+@Component(service = ReconcileService.class, immediate = true)
+// FIXME: this should probably live in FRM, but how does it integrate with its functionality?
+public final class DefaultReconcileService implements Reconcile, ReconcileService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultReconcileService.class);
private static final ObjectName ALARM_NAME;
static {
private final DataBroker broker;
private ExecutorService executor = Executors.newWorkStealingPool(10);
- private boolean unregister;
+ private boolean unregister = false;
- public ReconciliationServiceImpl(final DataBroker broker, final ForwardingRulesManager frm,
- final DpnTracker dpnTracker, final FlowGroupCacheManager flowGroupCacheManager) {
+ @Inject
+ @Activate
+ public DefaultReconcileService(@Reference final DataBroker broker, @Reference final ForwardingRulesManager frm,
+ @Reference final DpnTracker dpnTracker, @Reference final FlowGroupCacheManager flowGroupCacheManager) {
this.broker = requireNonNull(broker);
flowNodeReconciliation = frm.getFlowNodeReconciliation();
this.dpnTracker = requireNonNull(dpnTracker);
reconciliationStates = flowGroupCacheManager.getReconciliationStates();
- unregister = false;
final var mbs = ManagementFactory.getPlatformMBeanServer();
if (!mbs.isRegistered(ALARM_NAME)) {
try {
}
}
+ @PreDestroy
+ @Deactivate
@Override
public void close() {
if (unregister) {
}
@Override
- public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(final ReconcileInput input) {
- boolean reconcileAllNodes = input.getReconcileAllNodes();
- Set<Uint64> inputNodes = input.getNodes();
- if (inputNodes == null) {
- inputNodes = Set.of();
+ public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(final Set<Uint64> nodes) {
+ if (nodes == null || nodes.isEmpty()) {
+ return buildErrorResponse("Error executing command reconcile. No Node information was specified.");
}
- if (reconcileAllNodes && inputNodes.size() > 0) {
+
+ final var allNodes = getAllNodes();
+ final var unresolvedNodes = nodes.stream().filter(node -> !allNodes.contains(node))
+ .collect(Collectors.toList());
+ if (!unresolvedNodes.isEmpty()) {
return buildErrorResponse("Error executing command reconcile. "
- + "If 'all' option is enabled, no Node must be specified as input parameter.");
+ + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
}
- if (!reconcileAllNodes && inputNodes.size() == 0) {
- return buildErrorResponse("Error executing command reconcile. No Node information was specified.");
+ return doReconcile(nodes.stream().map(Uint64::longValue).collect(Collectors.toList()));
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<ReconcileOutput>> reconcileAll() {
+ return doReconcile(getAllNodes());
+ }
+
+ private @NonNull ListenableFuture<RpcResult<ReconcileOutput>> doReconcile(final List<Long> nodes) {
+ if (!nodes.isEmpty()) {
+ return buildErrorResponse(
+ "Error executing command reconcile. No node information is found for reconciliation");
}
- SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
- List<Long> nodeList = getAllNodes();
- List<Long> nodesToReconcile = reconcileAllNodes ? nodeList :
- inputNodes.stream().distinct().map(Uint64::longValue).collect(Collectors.toList());
- if (nodesToReconcile.size() > 0) {
- List<Long> unresolvedNodes =
- nodesToReconcile.stream().filter(node -> !nodeList.contains(node)).collect(Collectors.toList());
- if (!unresolvedNodes.isEmpty()) {
- return buildErrorResponse("Error executing command reconcile. "
- + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
+ final var inprogressNodes = ImmutableSet.<Uint64>builder();
+ nodes.parallelStream().forEach(nodeId -> {
+ ReconciliationState state = getReconciliationState(nodeId);
+ if (state != null && state.getState().equals(STARTED)) {
+ inprogressNodes.add(Uint64.valueOf(nodeId));
+ } else {
+ final var alarmText = getAlarmText(nodeId, " started reconciliation");
+ final var source = getSourceText(nodeId);
+ LOG.debug("Raising NodeReconciliationOperationOngoing alarm, alarmText {} source {}", alarmText,
+ source);
+ alarm.raiseAlarm("NodeReconciliationOperationOngoing", alarmText, source);
+ LOG.info("Executing reconciliation for node {} with state ", nodeId);
+ NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
+ executor.execute(new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey));
}
- ImmutableSet.Builder<Uint64> inprogressNodes = ImmutableSet.builder();
- nodesToReconcile.parallelStream().forEach(nodeId -> {
- ReconciliationState state = getReconciliationState(nodeId);
- if (state != null && state.getState().equals(STARTED)) {
- inprogressNodes.add(Uint64.valueOf(nodeId));
- } else {
- final var alarmText = getAlarmText(nodeId, " started reconciliation");
- final var source = getSourceText(nodeId);
- LOG.debug("Raising NodeReconciliationOperationOngoing alarm, alarmText {} source {}", alarmText,
- source);
- alarm.raiseAlarm("NodeReconciliationOperationOngoing", alarmText, source);
- LOG.info("Executing reconciliation for node {} with state ", nodeId);
- NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
- executor.execute(new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey));
- }
- });
- ReconcileOutput reconcilingInProgress = new ReconcileOutputBuilder()
- .setInprogressNodes(inprogressNodes.build())
- .build();
- result.set(RpcResultBuilder.success(reconcilingInProgress).build());
- return result;
- } else {
- return buildErrorResponse("Error executing command reconcile. "
- + "No node information is found for reconciliation");
+ });
+ return RpcResultBuilder.success(new ReconcileOutputBuilder()
+ .setInprogressNodes(inprogressNodes.build())
+ .build())
+ .buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<ReconcileOutput>> invoke(final ReconcileInput input) {
+ final var reconcileAllNodes = input.getReconcileAllNodes();
+ final var nodes = input.getNodes();
+ if (reconcileAllNodes != null && reconcileAllNodes) {
+ if (nodes != null && nodes.size() > 0) {
+ return buildErrorResponse("Error executing command reconcile. If 'all' option is enabled, no Node "
+ + "must be specified as input parameter.");
+ }
+ return reconcileAll();
}
+ return reconcile(nodes == null ? Set.of() : nodes);
}
private ReconciliationState getReconciliationState(final Long nodeId) {
return reconciliationStates.get(nodeId.toString());
}
- private static ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(final String msg) {
+ private static @NonNull ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(final String msg) {
LOG.error("Error {}", msg);
return RpcResultBuilder.<ReconcileOutput>failed()
.withError(ErrorType.PROTOCOL, new ErrorTag("reconcile"), msg)
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.apache.karaf.shell.console.OsgiCommandSupport;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInputBuilder;
+import org.opendaylight.openflowplugin.applications.southboundcli.ReconcileService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.Uint64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Reconciliation extends OsgiCommandSupport {
private static final Logger LOG = LoggerFactory.getLogger(Reconciliation.class);
- private ReconciliationService reconciliationService = null;
+ private ReconcileService reconciliationService = null;
- public void setReconciliationService(final ReconciliationService reconciliationService) {
+ public void setReconciliationService(final ReconcileService reconciliationService) {
this.reconciliationService = reconciliationService;
}
@SuppressWarnings("checkstyle:RegexpSinglelineJava")
@Override
protected Object doExecute() throws Exception {
- Set<Uint64> nodes = nodeIds == null
- ? Set.of()
- : nodeIds.stream().distinct().map(Uint64::valueOf).collect(Collectors.toUnmodifiableSet());
+ final var nodes = nodeIds == null ? Set.<Uint64>of()
+ : nodeIds.stream().map(Uint64::valueOf).collect(Collectors.toSet());
+ final var rpcOutput = reconcileAllNodes ? reconciliationService.reconcileAll()
+ : reconciliationService.reconcile(nodes);
LOG.debug("Triggering reconciliation for nodes {}", nodes);
- ReconcileInput rpcInput = new ReconcileInputBuilder().setNodes(nodes)
- .setReconcileAllNodes(reconcileAllNodes).build();
- Future<RpcResult<ReconcileOutput>> rpcOutput = reconciliationService.reconcile(rpcInput);
try {
- RpcResult<ReconcileOutput> rpcResult = rpcOutput.get();
+ final var rpcResult = rpcOutput.get();
if (rpcResult.isSuccessful()) {
System.out.println("Reconciliation triggered for the node(s)");
printInProgressNodes(rpcResult.getResult());
@SuppressWarnings("checkstyle:RegexpSinglelineJava")
private static void printInProgressNodes(final ReconcileOutput reconcileOutput) {
- Set<Uint64> inprogressNodes = reconcileOutput.getInprogressNodes();
+ final var inprogressNodes = reconcileOutput.getInprogressNodes();
if (inprogressNodes.size() > 0) {
- StringBuilder stringBuilder = new StringBuilder();
- final Formatter formatter = new Formatter(stringBuilder);
- System.out.println(getReconcileHeaderOutput());
- System.out.println("----------------------------------------------------");
- for (Uint64 node : inprogressNodes) {
- System.out.println(formatter.format("%-15s %n",node).toString());
- stringBuilder.setLength(0);
+ final var stringBuilder = new StringBuilder();
+ try (var formatter = new Formatter(stringBuilder)) {
+ System.out.println(getReconcileHeaderOutput());
+ System.out.println("----------------------------------------------------");
+ for (Uint64 node : inprogressNodes) {
+ System.out.println(formatter.format("%-15s %n",node).toString());
+ stringBuilder.setLength(0);
+ }
}
}
}
private static String getReconcileHeaderOutput() {
- final Formatter formatter = new Formatter();
- String header = formatter.format("%-15s %n", "Reconciliation already InProgress for below node(s)").toString();
- formatter.close();
- return header;
+ try (var formatter = new Formatter()) {
+ return formatter.format("%-15s %n", "Reconciliation already InProgress for below node(s)").toString();
+ }
}
}