X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FMdsalLowLevelTestProvider.java;h=6b94983b6f360129beead2ebb82c6ad8f1bdfe2c;hb=350d31783d2e7ebcaaa9cc48b572a4d1f2974650;hp=f67e465efbe3892904944bfdd8426705fb5dd450;hpb=19c001c51f162264e2b06baad7bef7650e7648f7;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index f67e465efb..6b94983b6f 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -9,13 +9,22 @@ package org.opendaylight.controller.clustering.it.provider; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; +import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; -import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; -import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodecFactory; +import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler; +import org.opendaylight.controller.clustering.it.provider.impl.YnlListener; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; @@ -23,34 +32,38 @@ import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.PublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +74,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final RpcProviderRegistry rpcRegistry; private final BindingAwareBroker.RpcRegistration registration; private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; + private final DOMDataBroker domDataBroker; + private final NotificationPublishService notificationPublishService; + private final NotificationService notificationService; private final SchemaService schemaService; private final ClusterSingletonServiceProvider singletonService; private final DOMRpcProviderService domRpcService; @@ -68,30 +84,71 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private Map, DOMRpcImplementationRegistration> routedRegistrations = new HashMap<>(); + private Map> ynlRegistrations = new HashMap<>(); + private DOMRpcImplementationRegistration globalGetConstantRegistration = null; + private ClusterSingletonServiceRegistration getSingletonConstantRegistration; + private FlappingSingletonService flappingSingletonService; + private Map publishNotificationsTasks = new HashMap<>(); public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry, final DOMRpcProviderService domRpcService, final ClusterSingletonServiceProvider singletonService, final SchemaService schemaService, - final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) { + final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer, + final NotificationPublishService notificationPublishService, + final NotificationService notificationService, + final DOMDataBroker domDataBroker) { this.rpcRegistry = rpcRegistry; this.domRpcService = domRpcService; this.singletonService = singletonService; this.schemaService = schemaService; this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer; + this.notificationPublishService = notificationPublishService; + this.notificationService = notificationService; + this.domDataBroker = domDataBroker; registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); } @Override public Future> unregisterSingletonConstant() { - return null; + LOG.debug("unregister-singleton-constant"); + + if (getSingletonConstantRegistration == null) { + LOG.debug("No get-singleton-constant registration present."); + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present."); + final RpcResult result = RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + try { + getSingletonConstantRegistration.close(); + getSingletonConstantRegistration = null; + + return Futures.immediateFuture(RpcResultBuilder.success().build()); + } catch (final Exception e) { + LOG.debug("There was a problem closing the singleton constant service", e); + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant"); + final RpcResult result = RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } } @Override - public Future> publishNotifications(PublishNotificationsInput input) { - return null; + public Future> startPublishNotifications(final StartPublishNotificationsInput input) { + LOG.debug("publish-notifications, input: {}", input); + + final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(), + input.getSeconds(), input.getNotificationsPerSecond()); + + publishNotificationsTasks.put(input.getId(), task); + + task.start(); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -100,8 +157,15 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> writeTransactions(WriteTransactionsInput input) { - return null; + public Future> writeTransactions(final WriteTransactionsInput input) { + LOG.debug("write-transactions, input: {}", input); + + final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + writeTransactionsHandler.start(settableFuture); + + return settableFuture; } @Override @@ -120,8 +184,20 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> subscribeYnl(SubscribeYnlInput input) { - return null; + public Future> subscribeYnl(final SubscribeYnlInput input) { + + LOG.debug("subscribe-ynl, input: {}", input); + + if (ynlRegistrations.containsKey(input.getId())) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already ynl listener registered for this id: " + input.getId()); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + ynlRegistrations.put(input.getId(), + notificationService.registerNotificationListener(new YnlListener(input.getId()))); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -149,8 +225,20 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> registerSingletonConstant(RegisterSingletonConstantInput input) { - return null; + public Future> registerSingletonConstant(final RegisterSingletonConstantInput input) { + + LOG.debug("Received register-singleton-constant rpc, input: {}", input); + + if (input.getConstant() == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Invalid input.", "Constant value is null"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + getSingletonConstantRegistration = + SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant()); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -176,7 +264,23 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> unregisterFlappingSingleton() { - return null; + LOG.debug("unregister-flapping-singleton received."); + + if (flappingSingletonService == null) { + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present."); + final RpcResult result = + RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + final long flapCount = flappingSingletonService.setInactive(); + flappingSingletonService = null; + + final UnregisterFlappingSingletonOutput output = + new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); } @Override @@ -221,7 +325,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> registerFlappingSingleton() { - return null; + LOG.debug("Received register-flapping-singleton."); + + if (flappingSingletonService != null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Registration present.", "flappin-singleton already registered"); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + flappingSingletonService = new FlappingSingletonService(singletonService); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -235,8 +349,50 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> unsubscribeYnl(UnsubscribeYnlInput input) { - return null; + public Future> unsubscribeYnl(final UnsubscribeYnlInput input) { + LOG.debug("Received unsubscribe-ynl, input: {}", input); + + if (!ynlRegistrations.containsKey(input.getId())) { + final RpcError rpcError = RpcResultBuilder + .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered."); + final RpcResult result = + RpcResultBuilder.failed().withRpcError(rpcError).build(); + return Futures.immediateFuture(result); + } + + final ListenerRegistration registration = ynlRegistrations.remove(input.getId()); + final UnsubscribeYnlOutput output = registration.getInstance().getOutput(); + + registration.close(); + + return Futures.immediateFuture(RpcResultBuilder.success().withResult(output).build()); + } + + @Override + public Future> checkPublishNotifications( + final CheckPublishNotificationsInput input) { + + final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId()); + + if (task == null) { + return Futures.immediateFuture(RpcResultBuilder.success( + new CheckPublishNotificationsOutputBuilder().setActive(false)).build()); + } + + final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder = + new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished()); + + if (task.getLastError() != null) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + task.getLastError().printStackTrace(pw); + checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString()); + } + + final CheckPublishNotificationsOutput output = + checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); } @Override