import static java.util.Objects.requireNonNull;
import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableClassToInstanceMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessages;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundle;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundle;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundle;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
-import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.binding.Rpc;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.common.Uint64;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
-public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
- ReconciliationNotificationListener, AutoCloseable {
-
+@Component(service = { })
+public final class ArbitratorReconciliationManagerImpl implements ReconciliationNotificationListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
- private static final int THREAD_POOL_SIZE = 4;
private static final AtomicLong BUNDLE_ID = new AtomicLong();
private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
- private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
- .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
private static final String SEPARATOR = ":";
private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
- .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
- .setTableId(OFConstants.OFPTT_ALL)
- .build())
- .build();
+ .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
+ .setTableId(OFConstants.OFPTT_ALL)
+ .build())
+ .build();
private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
- .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
- .setGroupType(GroupTypes.GroupAll)
- .setGroupId(new GroupId(OFConstants.OFPG_ALL))
- .build()).build())
- .build();
-
- private final SalBundleService salBundleService;
- private final ReconciliationManager reconciliationManager;
+ .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
+ .setGroupType(GroupTypes.GroupAll)
+ .setGroupId(new GroupId(OFConstants.OFPG_ALL))
+ .build()).build())
+ .build();
+
+ // FIXME: use CM to control this constant
+ private static final int THREAD_POOL_SIZE = 4;
+ // FIXME: use CM to control this constant
+ private static final int ARBITRATOR_RECONCILIATION_PRIORITY =
+ Integer.getInteger("arbitrator.reconciliation.manager.priority", 0 /*default*/);
+
+ private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+ private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Registration> rpcRegistrations = new ConcurrentHashMap<>();
+
private final RpcProviderService rpcProviderService;
private final UpgradeState upgradeState;
- private NotificationRegistration registration;
- private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
- Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
- private final ConcurrentMap<String,
- ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
+ private final NotificationRegistration registration;
+ private final AddBundleMessages addBundleMessages;
+ private final ControlBundle controlBundle;
@Inject
- public ArbitratorReconciliationManagerImpl(final ReconciliationManager reconciliationManager,
- final RpcProviderService rpcProviderService, final RpcConsumerRegistry rpcRegistry,
- final UpgradeState upgradeState) {
- Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
- this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!");
- salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
- "RPC SalBundleService not found.");
- this.rpcProviderService = rpcProviderService;
- this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!");
- }
-
- @PostConstruct
- public void start() {
+ @Activate
+ public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager,
+ @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcService,
+ @Reference final UpgradeState upgradeState) {
+ this.rpcProviderService = requireNonNull(rpcProviderService);
+ this.upgradeState = requireNonNull(upgradeState);
+ addBundleMessages = requireNonNull(rpcService.getRpc(AddBundleMessages.class));
+ controlBundle = requireNonNull(rpcService.getRpc(ControlBundle.class));
registration = reconciliationManager.registerService(this);
LOG.info("ArbitratorReconciliationManager has started successfully.");
}
- @Override
+ @Deactivate
@PreDestroy
+ @Override
public void close() throws Exception {
executor.shutdown();
- if (registration != null) {
- registration.close();
- registration = null;
- }
+ registration.close();
}
- @Override
- public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
+ private ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
final CommitActiveBundleInput input) {
- Uint64 nodeId = input.getNodeId();
- if (bundleIdMap.containsKey(nodeId)) {
- BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
- if (bundleId != null) {
- final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
- .setNode(input.getNode())
- .setBundleId(bundleId)
- .setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTCOMMITREQUEST)
- .build();
- ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
- .controlBundle(commitBundleInput);
- bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
-
- Futures.addCallback(rpcResult,
- new CommitActiveBundleCallback(nodeId),
- MoreExecutors.directExecutor());
- return Futures.transform(
- rpcResult,
- createRpcResultCondenser("committed active bundle"),
- MoreExecutors.directExecutor());
- }
+ final var nodeId = input.getNodeId();
+ final var details = bundleIdMap.get(nodeId);
+ if (details != null) {
+ final var rpcResult = controlBundle.invoke(new ControlBundleInputBuilder()
+ .setNode(input.getNode())
+ .setBundleId(details.bundleId)
+ .setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCOMMITREQUEST)
+ .build());
+ bundleIdMap.put(nodeId, new BundleDetails(details.bundleId, rpcResult));
+
+ Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor());
+ return Futures.transform(rpcResult, createRpcResultCondenser("committed active bundle"),
+ MoreExecutors.directExecutor());
}
return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
.setResult(null).build())
null, "No active bundle found for the node" + nodeId))).buildFuture();
}
- @Override
- public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
+ private ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
Uint64 nodeId = input.getNodeId();
BundleDetails bundleDetails = bundleIdMap.get(nodeId);
if (bundleDetails != null) {
try {
//This blocking call is used to prevent the applications from pushing flows and groups via the default
// pipeline when the commit bundle is ongoing.
- bundleDetails.getResult().get();
+ bundleDetails.result.get();
return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
- .setResult(bundleDetails.getBundleId())
+ .setResult(bundleDetails.bundleId)
.build())
.buildFuture();
} catch (InterruptedException | ExecutionException e) {
null, e.getMessage()))).buildFuture();
}
}
- return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
- .setResult(null).build()).buildFuture();
+ return RpcResultBuilder.success(new GetActiveBundleOutputBuilder().setResult(null).build()).buildFuture();
}
@Override
return ResultState.DONOTHING;
}
- private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
- LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
- ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
- return executor.submit(upgradeReconTask);
- }
private static Messages createMessages(final NodeRef nodeRef) {
- final List<Message> messages = new ArrayList<>();
- messages.add(new MessageBuilder()
- .setNode(nodeRef)
- .setBundleInnerMessage(DELETE_ALL_FLOW).build());
- messages.add(new MessageBuilder()
- .setNode(nodeRef)
- .setBundleInnerMessage(DELETE_ALL_GROUP).build());
+ final var messages = List.of(
+ new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build(),
+ new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build());
LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
return new MessagesBuilder().setMessage(messages).build();
}
- private class ArbitratorReconciliationTask implements Callable<Boolean> {
- final DeviceInfo deviceInfo;
+ private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
+ LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
+ return Futures.submit(new ArbitratorReconciliationTask(node), executor);
+ }
+
+ private final class ArbitratorReconciliationTask implements Callable<Boolean> {
+ private final DeviceInfo deviceInfo;
ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
- this.deviceInfo = deviceInfo;
+ this.deviceInfo = requireNonNull(deviceInfo);
}
@Override
InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
.augmentation(FlowCapableNode.class);
String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
- BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
+ final var bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
- final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
+ final var openBundleMessagesFuture = Futures.transformAsync(
+ controlBundle.invoke(new ControlBundleInputBuilder()
.setNode(nodeRef)
.setBundleId(bundleIdValue)
.setFlags(BUNDLE_FLAGS)
.setType(BundleControlType.ONFBCTCLOSEREQUEST)
- .build();
-
- final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
+ .build()),
+ rpcResult -> controlBundle.invoke(new ControlBundleInputBuilder()
.setNode(nodeRef)
.setBundleId(bundleIdValue)
.setFlags(BUNDLE_FLAGS)
.setType(BundleControlType.ONFBCTOPENREQUEST)
- .build();
+ .build()), MoreExecutors.directExecutor());
- final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
- .setNode(nodeRef)
- .setBundleId(bundleIdValue)
- .setFlags(BUNDLE_FLAGS)
- .setMessages(createMessages(nodeRef))
- .build();
-
- ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
- .controlBundle(closeBundleInput);
-
- ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
- .transformAsync(closeBundle, rpcResult -> salBundleService
- .controlBundle(openBundleInput), MoreExecutors.directExecutor());
-
- ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
- .transformAsync(openBundleMessagesFuture, rpcResult -> {
- if (rpcResult.isSuccessful()) {
- return salBundleService
- .addBundleMessages(addBundleMessagesInput);
- }
- return FluentFutures.immediateNullFluentFuture();
- }, MoreExecutors.directExecutor());
- Uint64 nodeId = getDpnIdFromNodeName(node);
+ final var addBundleMessagesFuture = Futures.transformAsync(openBundleMessagesFuture,
+ rpcResult -> rpcResult.isSuccessful()
+ ? addBundleMessages.invoke(new AddBundleMessagesInputBuilder()
+ .setNode(nodeRef)
+ .setBundleId(bundleIdValue)
+ .setFlags(BUNDLE_FLAGS)
+ .setMessages(createMessages(nodeRef))
+ .build())
+ : FluentFutures.immediateNullFluentFuture(), MoreExecutors.directExecutor());
+ final var nodeId = getDpnIdFromNodeName(node);
try {
if (addBundleMessagesFuture.get().isSuccessful()) {
- bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
+ bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
+ FluentFutures.immediateNullFluentFuture()));
LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
return true;
} else {
}
}
- private static <D> Function<RpcResult<D>,
- RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
+ private static <D> Function<RpcResult<D>, RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(
+ final String action) {
return input -> {
final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
if (input != null) {
- List<RpcError> errors = new ArrayList<>();
+ final var errors = new ArrayList<RpcError>();
if (!input.isSuccessful()) {
errors.addAll(input.getErrors());
resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
}
private void registerRpc(final DeviceInfo node) {
- KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(node.getNodeId()));
+ final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is registered : {}", path);
- ObjectRegistration<? extends RpcService> rpcRegistration =
- rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class, this, Set.of(path));
- rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
+ rpcRegistrations.put(node.getNodeId().getValue(), rpcProviderService.registerRpcImplementations(
+ ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
+ .put(GetActiveBundle.class, this::getActiveBundle)
+ .put(CommitActiveBundle.class, this::commitActiveBundle)
+ .build(), Set.of(path)));
}
private void deregisterRpc(final DeviceInfo node) {
- KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(node.getNodeId()));
+ final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is unregistered : {}", path);
- ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
- if (rpcRegistration != null) {
- rpcRegistration.close();
- rpcRegistrations.remove(node.getNodeId().getValue());
+ final var reg = rpcRegistrations.remove(node.getNodeId().getValue());
+ if (reg != null) {
+ reg.close();
}
}
- private static class BundleDetails {
- private final BundleId bundleId;
- private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
-
- BundleDetails(final BundleId bundleId, final ListenableFuture<RpcResult<ControlBundleOutput>> result) {
- this.bundleId = bundleId;
- this.result = result;
- }
-
- public BundleId getBundleId() {
- return bundleId;
- }
-
- public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
- return result;
+ @NonNullByDefault
+ record BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
+ BundleDetails {
+ requireNonNull(bundleId);
+ requireNonNull(result);
}
}