import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
.setRemoveFlowCaseData(
- new RemoveFlowCaseDataBuilder(new FlowBuilder().setTableId(OFConstants.OFPTT_ALL).build()).build())
+ new RemoveFlowCaseDataBuilder(new FlowBuilder().setTableId(OFConstants.OFPTT_ALL).build()).build())
.build();
private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
.setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
- .setGroupType(GroupTypes.GroupAll)
- .setGroupId(new GroupId(OFConstants.OFPG_ALL))
- .build()).build())
+ .setGroupType(GroupTypes.GroupAll)
+ .setGroupId(new GroupId(OFConstants.OFPG_ALL))
+ .build()).build())
.build();
private final SalBundleService salBundleService;
private final ReconciliationManager reconciliationManager;
- private final RoutedRpcRegistration routedRpcReg;
+ private final RpcProviderService rpcProviderService;
private final UpgradeState upgradeState;
private NotificationRegistration registration;
private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(THREAD_POOL_SIZE));
private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
+ private ObjectRegistration<? extends RpcService> rpcRegistration;
@Inject
- public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
- @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
+ public ArbitratorReconciliationManagerImpl(
+ @Reference ReconciliationManager reconciliationManager, @Reference RpcProviderService rpcProviderService,
+ @Reference final RpcConsumerRegistry rpcRegistry, @Reference UpgradeState upgradeState) {
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
"ReconciliationManager cannot be null!");
this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
- "RPC SalBundlService not found.");
- this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
- this);
+ "RPC SalBundleService not found.");
+ this.rpcProviderService = rpcProviderService;
this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
}
return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
.setResult(null).build())
.withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
- null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
+ null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
}
@Override
KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is registered : {}", path);
- routedRpcReg.registerPath(NodeContext.class, path);
+ rpcRegistration = rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
+ this, ImmutableSet.of(path));
}
private void deregisterRpc(DeviceInfo node) {
KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is unregistered : {}", path);
- routedRpcReg.unregisterPath(NodeContext.class, path);
+ if (rpcRegistration != null) {
+ rpcRegistration.close();
+ }
}
private static class BundleDetails {
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
synchronized (txLock) {
if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
- future = txChainShuttingDown();
+ future = txChainShuttingDown();
Preconditions.checkState(writeTx == null,
"We have some unexpected WriteTransaction.");
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onFailure(final Throwable throwable) {
- if (throwable instanceof TransactionCommitFailedException) {
+ if (throwable instanceof InterruptedException || throwable instanceof ExecutionException) {
LOG.error("Transaction commit failed. ", throwable);
} else {
if (throwable instanceof CancellationException) {
}
}
- public ListenableFuture<?> shuttingDown() {
+ public FluentFuture<?> shuttingDown() {
if (LOG.isDebugEnabled()) {
LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
}