X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FMdsalLowLevelTestProvider.java;h=f0569fa07dc4e8842d92504168b117832b99acec;hp=81f0220b1a81560a59c14350d60e91609ea13ef5;hb=45fb790ec883b0ca0cf51469c4a87382bfc4c565;hpb=271e66acd1391895c006806dfe76ae6d09017a83 diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 81f0220b1a..f0569fa07d 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -8,80 +8,254 @@ package org.opendaylight.controller.clustering.it.provider; +import static akka.actor.ActorRef.noSender; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.ActorSystemProvider; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; +import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; +import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener; +import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener; +import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler; +import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler; +import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; +import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; +import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler; +import org.opendaylight.controller.clustering.it.provider.impl.YnlListener; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.PublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService { private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class); + private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG = + org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; private final RpcProviderRegistry rpcRegistry; private final BindingAwareBroker.RpcRegistration registration; + private final DistributedShardFactory distributedShardFactory; + private final DistributedDataStoreInterface configDataStore; + private final DOMDataTreeService domDataTreeService; + private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; + private final DOMDataBroker domDataBroker; + private final NotificationPublishService notificationPublishService; + private final NotificationService notificationService; + private final SchemaService schemaService; private final ClusterSingletonServiceProvider singletonService; private final DOMRpcProviderService domRpcService; + private final PrefixLeaderHandler prefixLeaderHandler; + private final PrefixShardHandler prefixShardHandler; + private final DOMDataTreeChangeService domDataTreeChangeService; + private final ActorSystem actorSystem; + + private Map, DOMRpcImplementationRegistration> routedRegistrations = + new HashMap<>(); + + private Map> ynlRegistrations = new HashMap<>(); private DOMRpcImplementationRegistration globalGetConstantRegistration = null; + private ClusterSingletonServiceRegistration getSingletonConstantRegistration; + private FlappingSingletonService flappingSingletonService; + private ListenerRegistration dtclReg; + private IdIntsListener idIntsListener; + private Map publishNotificationsTasks = new HashMap<>(); + private ListenerRegistration ddtlReg; + private IdIntsDOMDataTreeLIstener idIntsDdtl; + + public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry, final DOMRpcProviderService domRpcService, - final ClusterSingletonServiceProvider singletonService) { + final ClusterSingletonServiceProvider singletonService, + final SchemaService schemaService, + final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer, + final NotificationPublishService notificationPublishService, + final NotificationService notificationService, + final DOMDataBroker domDataBroker, + final DOMDataTreeService domDataTreeService, + final DistributedShardFactory distributedShardFactory, + final DistributedDataStoreInterface configDataStore, + final ActorSystemProvider actorSystemProvider) { this.rpcRegistry = rpcRegistry; this.domRpcService = domRpcService; this.singletonService = singletonService; + this.schemaService = schemaService; + this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer; + this.notificationPublishService = notificationPublishService; + this.notificationService = notificationService; + this.domDataBroker = domDataBroker; + this.domDataTreeService = domDataTreeService; + this.distributedShardFactory = distributedShardFactory; + this.configDataStore = configDataStore; + this.actorSystem = actorSystemProvider.getActorSystem(); + + this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer); + + domDataTreeChangeService = + (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class); registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); + + prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService, + bindingNormalizedNodeSerializer); } @Override public Future> unregisterSingletonConstant() { - return null; + LOG.debug("unregister-singleton-constant"); + + if (getSingletonConstantRegistration == null) { + LOG.debug("No get-singleton-constant registration present."); + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present."); + final RpcResult result = RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + try { + getSingletonConstantRegistration.close(); + getSingletonConstantRegistration = null; + + return Futures.immediateFuture(RpcResultBuilder.success().build()); + } catch (final Exception e) { + LOG.debug("There was a problem closing the singleton constant service", e); + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant"); + final RpcResult result = RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } } @Override - public Future> publishNotifications(PublishNotificationsInput input) { - return null; + public Future> startPublishNotifications(final StartPublishNotificationsInput input) { + LOG.debug("publish-notifications, input: {}", input); + + final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(), + input.getSeconds(), input.getNotificationsPerSecond()); + + publishNotificationsTasks.put(input.getId(), task); + + task.start(); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override public Future> subscribeDtcl() { - return null; + + if (dtclReg != null) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already dataTreeChangeListener registered on id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + idIntsListener = new IdIntsListener(); + + dtclReg = domDataTreeChangeService + .registerDataTreeChangeListener( + new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier( + CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID), + idIntsListener); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override - public Future> writeTransactions(WriteTransactionsInput input) { - return null; + public Future> writeTransactions(final WriteTransactionsInput input) { + LOG.debug("write-transactions, input: {}", input); + + final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + writeTransactionsHandler.start(settableFuture); + + return settableFuture; } @Override @@ -90,33 +264,75 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> becomeModuleLeader(BecomeModuleLeaderInput input) { + public Future> removeShardReplica(RemoveShardReplicaInput input) { return null; } @Override - public Future> removeShardReplica(RemoveShardReplicaInput input) { - return null; + public Future> subscribeYnl(final SubscribeYnlInput input) { + + LOG.debug("subscribe-ynl, input: {}", input); + + if (ynlRegistrations.containsKey(input.getId())) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already ynl listener registered for this id: " + input.getId()); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + ynlRegistrations.put(input.getId(), + notificationService.registerNotificationListener(new YnlListener(input.getId()))); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override - public Future> subscribeYnl(SubscribeYnlInput input) { - return null; + public Future> removePrefixShard(final RemovePrefixShardInput input) { + LOG.debug("remove-prefix-shard, input: {}", input); + + return prefixShardHandler.onRemovePrefixShard(input); } @Override - public Future> becomePrefixLeader(BecomePrefixLeaderInput input) { - return null; + public Future> becomePrefixLeader(final BecomePrefixLeaderInput input) { + LOG.debug("become-prefix-leader, input: {}", input); + + return prefixLeaderHandler.makeLeaderLocal(input); } @Override - public Future> unregisterBoundConstant(UnregisterBoundConstantInput input) { - return null; + public Future> unregisterBoundConstant(final UnregisterBoundConstantInput input) { + LOG.debug("unregister-bound-constant, {}", input); + + final DOMRpcImplementationRegistration registration = + routedRegistrations.remove(input.getContext()); + + if (registration == null) { + LOG.debug("No get-contexted-constant registration for context: {}", input.getContext()); + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present."); + final RpcResult result = RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + registration.close(); + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override - public Future> registerSingletonConstant(RegisterSingletonConstantInput input) { - return null; + public Future> registerSingletonConstant(final RegisterSingletonConstantInput input) { + + LOG.debug("Received register-singleton-constant rpc, input: {}", input); + + if (input.getConstant() == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Invalid input.", "Constant value is null"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + getSingletonConstantRegistration = + SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant()); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -142,7 +358,23 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> unregisterFlappingSingleton() { - return null; + LOG.debug("unregister-flapping-singleton received."); + + if (flappingSingletonService == null) { + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present."); + final RpcResult result = + RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + final long flapCount = flappingSingletonService.setInactive(); + flappingSingletonService = null; + + final UnregisterFlappingSingletonOutput output = + new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); } @Override @@ -152,22 +384,125 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> subscribeDdtl() { - return null; + + if (ddtlReg != null) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already dataTreeChangeListener registered on id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + idIntsDdtl = new IdIntsDOMDataTreeLIstener(); + + try { + ddtlReg = + domDataTreeService.registerListener(idIntsDdtl, + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + ProduceTransactionsHandler.ID_INT_YID)) + , true, Collections.emptyList()); + } catch (DOMDataTreeLoopException e) { + LOG.error("Failed to register DOMDataTreeListener.", e); + + } + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override - public Future> registerBoundConstant(RegisterBoundConstantInput input) { - return null; + public Future> registerBoundConstant(final RegisterBoundConstantInput input) { + LOG.debug("register-bound-constant: {}", input); + + if (input.getContext() == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Invalid input.", "Context value is null"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + if (input.getConstant() == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Invalid input.", "Constant value is null"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + if (routedRegistrations.containsKey(input.getContext())) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already a rpc registered for context: " + input.getContext()); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + final DOMRpcImplementationRegistration registration = + RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService, + input.getConstant(), input.getContext()); + + routedRegistrations.put(input.getContext(), registration); + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override public Future> registerFlappingSingleton() { - return null; + LOG.debug("Received register-flapping-singleton."); + + if (flappingSingletonService != null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Registration present.", "flapping-singleton already registered"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + flappingSingletonService = new FlappingSingletonService(singletonService); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override public Future> unsubscribeDtcl() { - return null; + LOG.debug("Received unsubscribe-dtcl"); + + if (idIntsListener == null || dtclReg == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + dtclReg.close(); + dtclReg = null; + + if (!idIntsListener.hasTriggered()) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" + + "any notifications."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction(); + try { + final Optional> readResult = + rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet(); + + if (!readResult.isPresent()) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + return Futures.immediateFuture( + RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder() + .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build()); + + } catch (final ReadFailedException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + + } + } + + @Override + public Future> createPrefixShard(final CreatePrefixShardInput input) { + LOG.debug("create-prefix-shard, input: {}", input); + + return prefixShardHandler.onCreatePrefixShard(input); } @Override @@ -176,13 +511,111 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> unsubscribeYnl(UnsubscribeYnlInput input) { - return null; + public Future> unsubscribeYnl(final UnsubscribeYnlInput input) { + LOG.debug("Received unsubscribe-ynl, input: {}", input); + + if (!ynlRegistrations.containsKey(input.getId())) { + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered."); + final RpcResult result = + RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + final ListenerRegistration registration = ynlRegistrations.remove(input.getId()); + final UnsubscribeYnlOutput output = registration.getInstance().getOutput(); + + registration.close(); + + return Futures.immediateFuture(RpcResultBuilder.success().withResult(output).build()); } @Override - public Future> produceTransactions(ProduceTransactionsInput input) { - return null; + public Future> checkPublishNotifications( + final CheckPublishNotificationsInput input) { + + final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId()); + + if (task == null) { + return Futures.immediateFuture(RpcResultBuilder.success( + new CheckPublishNotificationsOutputBuilder().setActive(false)).build()); + } + + final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder = + new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished()); + + if (task.getLastError() != null) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + task.getLastError().printStackTrace(pw); + checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString()); + } + + final CheckPublishNotificationsOutput output = + checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); + } + + @Override + public Future> produceTransactions(final ProduceTransactionsInput input) { + LOG.debug("producer-transactions, input: {}", input); + + final ProduceTransactionsHandler handler = + new ProduceTransactionsHandler(domDataTreeService, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + handler.start(settableFuture); + + return settableFuture; + } + + @Override + public Future> shutdownShardReplica(final ShutdownShardReplicaInput input) { + LOG.debug("Received shutdown-shard-replica rpc, input: {}", input); + + final String shardName = input.getShardName(); + if (Strings.isNullOrEmpty(shardName)) { + final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element", + "A valid shard name must be specified"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(rpcError).build()); + } + + final SettableFuture> rpcResult = SettableFuture.create(); + final ActorContext context = configDataStore.getActorContext(); + + long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().$times(3).toMillis(), 10000); + final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS); + final scala.concurrent.Promise shutdownShardAsk = akka.dispatch.Futures.promise(); + + context.findLocalShardAsync(shardName).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable { + if (throwable != null) { + shutdownShardAsk.failure(throwable); + } else { + shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE)); + } + } + }, context.getClientDispatcher()); + + shutdownShardAsk.future().onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable { + if (throwable != null) { + final RpcResult failedResult = RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build(); + rpcResult.set(failedResult); + } else { + // according to Patterns.gracefulStop API, we don't have to + // check value of gracefulStopResult + rpcResult.set(RpcResultBuilder.success().build()); + } + } + }, context.getClientDispatcher()); + + return rpcResult; } @Override @@ -213,6 +646,79 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> unsubscribeDdtl() { - return null; + LOG.debug("Received unsubscribe-ddtl."); + + if (idIntsDdtl == null || ddtlReg == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + ddtlReg.close(); + ddtlReg = null; + + if (!idIntsDdtl.hasTriggered()) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" + + "any notifications."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID); + LOG.debug("Creating distributed datastore client for shard {}", shardName); + + final ActorContext actorContext = configDataStore.getActorContext(); + final Props distributedDataStoreClientProps = + SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(), + "Shard-" + shardName, actorContext, shardName); + + final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); + final DataStoreClient distributedDataStoreClient; + try { + distributedDataStoreClient = SimpleDataStoreClientActor + .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); + } catch (final Exception e) { + LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); + clientActor.tell(PoisonPill.getInstance(), noSender()); + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Unable to create ds client for read.", + "Unable to create ds client for read."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory(); + final ClientTransaction tx = localHistory.createTransaction(); + final CheckedFuture>, + org.opendaylight.mdsal.common.api.ReadFailedException> read = + tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT)); + + tx.abort(); + localHistory.close(); + try { + final Optional> optional = read.checkedGet(); + if (!optional.isPresent()) { + LOG.warn("Final read from client is empty."); + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + return Futures.immediateFuture( + RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder() + .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build()); + + } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) { + LOG.error("Unable to read data to verify ddtl data.", e); + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } finally { + distributedDataStoreClient.close(); + clientActor.tell(PoisonPill.getInstance(), noSender()); + } } }