X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FMdsalLowLevelTestProvider.java;h=e0e8d99d1aab3eac8df847bf7075de6e15b0257e;hb=d97061af6814ad7b085af10797a252aa4aa5cda6;hp=f0569fa07dc4e8842d92504168b117832b99acec;hpb=d5fcf5d66568519595b533cc20651634d66d34fb;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index f0569fa07d..e0e8d99d1a 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -26,8 +26,10 @@ import java.io.StringWriter; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; @@ -85,6 +87,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; @@ -134,17 +137,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final DOMDataTreeChangeService domDataTreeChangeService; private final ActorSystem actorSystem; - private Map, DOMRpcImplementationRegistration> routedRegistrations = + private final Map, DOMRpcImplementationRegistration> routedRegistrations = new HashMap<>(); - private Map> ynlRegistrations = new HashMap<>(); + private final Map> ynlRegistrations = new HashMap<>(); private DOMRpcImplementationRegistration globalGetConstantRegistration = null; private ClusterSingletonServiceRegistration getSingletonConstantRegistration; private FlappingSingletonService flappingSingletonService; private ListenerRegistration dtclReg; private IdIntsListener idIntsListener; - private Map publishNotificationsTasks = new HashMap<>(); + private final Map publishNotificationsTasks = new HashMap<>(); private ListenerRegistration ddtlReg; private IdIntsDOMDataTreeLIstener idIntsDdtl; @@ -249,13 +252,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> writeTransactions(final WriteTransactionsInput input) { LOG.debug("write-transactions, input: {}", input); - - final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input); - - final SettableFuture> settableFuture = SettableFuture.create(); - writeTransactionsHandler.start(settableFuture); - - return settableFuture; + return WriteTransactionsHandler.start(domDataBroker, input); } @Override @@ -264,7 +261,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> removeShardReplica(RemoveShardReplicaInput input) { + public Future> removeShardReplica(final RemoveShardReplicaInput input) { return null; } @@ -336,7 +333,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> registerDefaultConstant(RegisterDefaultConstantInput input) { + public Future> registerDefaultConstant(final RegisterDefaultConstantInput input) { return null; } @@ -378,7 +375,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> addShardReplica(AddShardReplicaInput input) { + public Future> addShardReplica(final AddShardReplicaInput input) { return null; } @@ -462,6 +459,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); } + try { + idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.", + "clustering-it", "clustering-it", e); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + dtclReg.close(); dtclReg = null; @@ -560,14 +567,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> produceTransactions(final ProduceTransactionsInput input) { LOG.debug("producer-transactions, input: {}", input); - - final ProduceTransactionsHandler handler = - new ProduceTransactionsHandler(domDataTreeService, input); - - final SettableFuture> settableFuture = SettableFuture.create(); - handler.start(settableFuture); - - return settableFuture; + return ProduceTransactionsHandler.start(domDataTreeService, input); } @Override @@ -581,6 +581,28 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(rpcError).build()); } + return shutdownShardGracefully(shardName); + } + + @Override + public Future> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) { + LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input); + + final InstanceIdentifier shardPrefix = input.getPrefix(); + + if (shardPrefix == null) { + final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element", + "A valid shard prefix must be specified"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(rpcError).build()); + } + + final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix); + final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath); + + return shutdownShardGracefully(cleanPrefixShardName); + } + + private SettableFuture> shutdownShardGracefully(final String shardName) { final SettableFuture> rpcResult = SettableFuture.create(); final ActorContext context = configDataStore.getActorContext(); @@ -614,7 +636,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } } }, context.getClientDispatcher()); - return rpcResult; } @@ -654,6 +675,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); } + try { + idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.", + "clustering-it", "clustering-it", e); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + ddtlReg.close(); ddtlReg = null;