X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FMdsalLowLevelTestProvider.java;h=98ae842a505863fec0e7199a56ab18d1ebd75f05;hp=4ff42c14a1c6c76e1a5446811fca0e7d2fdc1758;hb=841d4041df1926cd2cab034533ad081cd4351ac7;hpb=bb61cf2bfc27e04d157f08fac3198fda532cebd6 diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 4ff42c14a1..98ae842a50 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -8,13 +8,28 @@ package org.opendaylight.controller.clustering.it.provider; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener; +import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener; +import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler; +import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler; +import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; @@ -22,33 +37,50 @@ import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactions import org.opendaylight.controller.clustering.it.provider.impl.YnlListener; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.PublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; @@ -59,15 +91,21 @@ import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService { private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class); + private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG = + org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; private final RpcProviderRegistry rpcRegistry; private final BindingAwareBroker.RpcRegistration registration; + private final DistributedShardFactory distributedShardFactory; + private final DOMDataTreeService domDataTreeService; private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; private final DOMDataBroker domDataBroker; private final NotificationPublishService notificationPublishService; @@ -75,6 +113,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final SchemaService schemaService; private final ClusterSingletonServiceProvider singletonService; private final DOMRpcProviderService domRpcService; + private final PrefixLeaderHandler prefixLeaderHandler; + private final PrefixShardHandler prefixShardHandler; + private final DOMDataTreeChangeService domDataTreeChangeService; private Map, DOMRpcImplementationRegistration> routedRegistrations = new HashMap<>(); @@ -84,6 +125,13 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private DOMRpcImplementationRegistration globalGetConstantRegistration = null; private ClusterSingletonServiceRegistration getSingletonConstantRegistration; private FlappingSingletonService flappingSingletonService; + private ListenerRegistration dtclReg; + private IdIntsListener idIntsListener; + private Map publishNotificationsTasks = new HashMap<>(); + private ListenerRegistration ddtlReg; + private IdIntsDOMDataTreeLIstener idIntsDdtl; + + public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry, final DOMRpcProviderService domRpcService, @@ -92,7 +140,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer, final NotificationPublishService notificationPublishService, final NotificationService notificationService, - final DOMDataBroker domDataBroker) { + final DOMDataBroker domDataBroker, + final DOMDataTreeService domDataTreeService, + final DistributedShardFactory distributedShardFactory) { this.rpcRegistry = rpcRegistry; this.domRpcService = domRpcService; this.singletonService = singletonService; @@ -101,8 +151,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService this.notificationPublishService = notificationPublishService; this.notificationService = notificationService; this.domDataBroker = domDataBroker; + this.domDataTreeService = domDataTreeService; + this.distributedShardFactory = distributedShardFactory; + this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer); + + domDataTreeChangeService = + (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class); registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); + + prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer); } @Override @@ -132,21 +190,37 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> publishNotifications(final PublishNotificationsInput input) { + public Future> startPublishNotifications(final StartPublishNotificationsInput input) { LOG.debug("publish-notifications, input: {}", input); final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(), input.getSeconds(), input.getNotificationsPerSecond()); - final SettableFuture> settableFuture = SettableFuture.create(); - task.start(settableFuture); + publishNotificationsTasks.put(input.getId(), task); - return settableFuture; + task.start(); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override public Future> subscribeDtcl() { - return null; + + if (dtclReg != null) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already dataTreeChangeListener registered on id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + idIntsListener = new IdIntsListener(); + + dtclReg = domDataTreeChangeService + .registerDataTreeChangeListener( + new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier( + CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID), + idIntsListener); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -166,11 +240,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return null; } - @Override - public Future> becomeModuleLeader(BecomeModuleLeaderInput input) { - return null; - } - @Override public Future> removeShardReplica(RemoveShardReplicaInput input) { return null; @@ -194,8 +263,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> becomePrefixLeader(BecomePrefixLeaderInput input) { - return null; + public Future> removePrefixShard(final RemovePrefixShardInput input) { + LOG.debug("remove-prefix-shard, input: {}", input); + + return prefixShardHandler.onRemovePrefixShard(input); + } + + @Override + public Future> becomePrefixLeader(final BecomePrefixLeaderInput input) { + LOG.debug("become-prefix-leader, input: {}", input); + + return prefixLeaderHandler.makeLeaderLocal(input); } @Override @@ -283,7 +361,27 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> subscribeDdtl() { - return null; + + if (ddtlReg != null) { + final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.", + "There is already dataTreeChangeListener registered on id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + idIntsDdtl = new IdIntsDOMDataTreeLIstener(); + + try { + ddtlReg = + domDataTreeService.registerListener(idIntsDdtl, + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + ProduceTransactionsHandler.ID_INTS_YID)) + , true, Collections.emptyList()); + } catch (DOMDataTreeLoopException e) { + LOG.error("Failed to register DOMDataTreeListener.", e); + + } + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -333,7 +431,49 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> unsubscribeDtcl() { - return null; + LOG.debug("Received unsubscribe-dtcl"); + + if (idIntsListener == null || dtclReg == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction(); + try { + if (dtclReg != null) { + dtclReg.close(); + dtclReg = null; + } + + final Optional> readResult = + rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID).checkedGet(); + + if (!readResult.isPresent()) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + return Futures.immediateFuture( + RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder() + .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build()); + + } catch (final ReadFailedException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + + } + } + + @Override + public Future> createPrefixShard(final CreatePrefixShardInput input) { + LOG.debug("create-prefix-shard, input: {}", input); + + return prefixShardHandler.onCreatePrefixShard(input); } @Override @@ -362,8 +502,43 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> produceTransactions(ProduceTransactionsInput input) { - return null; + public Future> checkPublishNotifications( + final CheckPublishNotificationsInput input) { + + final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId()); + + if (task == null) { + return Futures.immediateFuture(RpcResultBuilder.success( + new CheckPublishNotificationsOutputBuilder().setActive(false)).build()); + } + + final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder = + new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished()); + + if (task.getLastError() != null) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + task.getLastError().printStackTrace(pw); + checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString()); + } + + final CheckPublishNotificationsOutput output = + checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); + } + + @Override + public Future> produceTransactions(final ProduceTransactionsInput input) { + LOG.debug("producer-transactions, input: {}", input); + + final ProduceTransactionsHandler handler = + new ProduceTransactionsHandler(domDataTreeService, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + handler.start(settableFuture); + + return settableFuture; } @Override @@ -394,6 +569,79 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> unsubscribeDdtl() { - return null; + LOG.debug("Received unsubscribe-ddtl."); + + if (idIntsDdtl == null || ddtlReg == null) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + ddtlReg.close(); + ddtlReg = null; + + final ReadListener readListener = new ReadListener(); + try { + final ListenerRegistration registration = domDataTreeService.registerListener(readListener, + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + ProduceTransactionsHandler.ID_INTS_YID)) + , true, Collections.emptyList()); + + final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get(); + registration.close(); + + if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + + final NormalizedNode lastRead = dataTreeCandidate.getRootNode().getDataAfter().get(); + + return Futures.immediateFuture( + RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder() + .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build()); + + + } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) { + LOG.error("Unable to read data to verify ddtl data.", e); + final RpcError error = RpcResultBuilder.newError( + ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + } + + private static class ReadListener implements DOMDataTreeListener { + + private Collection changes = null; + private SettableFuture readFuture; + + @Override + public synchronized void onDataTreeChanged(@Nonnull final Collection changes, + @Nonnull final Map> subtrees) { + Preconditions.checkArgument(changes.size() == 1); + + if (this.changes == null) { + this.changes = changes; + + readFuture.set(changes.iterator().next()); + } + } + + @Override + public void onDataTreeFailed(@Nonnull final Collection causes) { + LOG.error("Read Listener failed. {}", causes); + } + + public synchronized ListenableFuture getFirstNotif() { + if (changes != null) { + return Futures.immediateFuture(changes.iterator().next()); + } + + readFuture = SettableFuture.create(); + return readFuture; + } } }