BUG 8462: Switch to using cds-client in usubscribe-ddtl 67/57567/1
authorTomas Cere <tcere@cisco.com>
Thu, 18 May 2017 08:34:25 +0000 (10:34 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 18:17:09 +0000 (20:17 +0200)
The initial notification seemed iffy when the leader was moving,
so switch the final data consitency check to cds-clients read
which also makes this more consistent with unsubscribe-dtcl.

Change-Id: Ia23da11a5bda33925ee6ba911d2794f666a17a94
Signed-off-by: Tomas Cere <tcere@cisco.com>
(cherry picked from commit 5daa19c25730a83fa5d0eb510b47ff159fe734fb)

opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml

index 8edbec2..ccb42d9 100644 (file)
                 org.opendaylight.controller.cluster.datastore.persisted;
                 org.opendaylight.controller.cluster.datastore.utils;
                 org.opendaylight.controller.cluster.sharding;
+                org.opendaylight.controller.cluster.databroker.actors.dds;
             </Export-Package>
             <Import-Package>
                 !*snappy;
index 8f67f13..bea3bcd 100644 (file)
@@ -8,20 +8,31 @@
 
 package org.opendaylight.controller.clustering.it.provider;
 
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
@@ -50,8 +61,6 @@ import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
@@ -91,8 +100,8 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +114,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
+    private final DistributedDataStoreInterface configDataStore;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
@@ -116,6 +126,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final PrefixLeaderHandler prefixLeaderHandler;
     private final PrefixShardHandler prefixShardHandler;
     private final DOMDataTreeChangeService domDataTreeChangeService;
+    private final ActorSystem actorSystem;
 
     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
@@ -142,7 +153,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
-                                     final DistributedShardFactory distributedShardFactory) {
+                                     final DistributedShardFactory distributedShardFactory,
+                                     final DistributedDataStoreInterface configDataStore,
+                                     final ActorSystemProvider actorSystemProvider) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -153,6 +166,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
+        this.configDataStore = configDataStore;
+        this.actorSystem = actorSystemProvider.getActorSystem();
+
         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
 
         domDataTreeChangeService =
@@ -447,6 +463,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                 dtclReg = null;
             }
 
+
+
             final Optional<NormalizedNode<?, ?>> readResult =
                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
 
@@ -581,67 +599,60 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         ddtlReg.close();
         ddtlReg = null;
 
-        final ReadListener readListener = new ReadListener();
+        final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+        final ActorContext actorContext = configDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient;
         try {
-            final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
-                    Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                            ProduceTransactionsHandler.ID_INT_YID))
-                    , true, Collections.emptyList());
+            distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "Unable to create ds client for read.",
+                    "Unable to create ds client for read.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withRpcError(error).build());
+        }
 
-            final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
-            registration.close();
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx = localHistory.createTransaction();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+                org.opendaylight.mdsal.common.api.ReadFailedException> read =
+                tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
 
-            if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
+        tx.abort();
+        localHistory.close();
+        try {
+            final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+            if (!optional.isPresent()) {
+                LOG.warn("Final read from client is empty.");
                 final RpcError error = RpcResultBuilder.newError(
-                        ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
+                        ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
                         .withRpcError(error).build());
             }
 
-            final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
-
             return Futures.immediateFuture(
                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
-                            .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
+                            .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
 
-        } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
+        } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
             LOG.error("Unable to read data to verify ddtl data.", e);
             final RpcError error = RpcResultBuilder.newError(
                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
                     .withRpcError(error).build());
-        }
-    }
-
-    private static class ReadListener implements DOMDataTreeListener {
-
-        private Collection<DataTreeCandidate> changes = null;
-        private SettableFuture<DataTreeCandidate> readFuture;
-
-        @Override
-        public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
-                                      @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
-            Preconditions.checkArgument(changes.size() == 1);
-
-            if (this.changes == null) {
-                this.changes = changes;
-
-                readFuture.set(changes.iterator().next());
-            }
-        }
-
-        @Override
-        public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
-            LOG.error("Read Listener failed. {}", causes);
-        }
-
-        public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
-            if (changes != null) {
-                return Futures.immediateFuture(changes.iterator().next());
-            }
-
-            readFuture = SettableFuture.create();
-            return readFuture;
+        } finally {
+            distributedDataStoreClient.close();
+            clientActor.tell(PoisonPill.getInstance(), noSender());
         }
     }
 }
index 7ce2b8d..875f179 100644 (file)
@@ -57,7 +57,7 @@ public class ProduceTransactionsHandler implements Runnable {
 
     static final QName ID_INTS =
             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
-    static final QName ID_INT =
+    public static final QName ID_INT =
             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
     static final QName ID =
             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
index 43a31df..6e14774 100644 (file)
   <reference id="notificationListenerService" interface="org.opendaylight.controller.md.sal.binding.api.NotificationService" />
   <reference id="domDataTreeService" interface="org.opendaylight.mdsal.dom.api.DOMDataTreeService"/>
   <reference id="distributedShardFactory" interface="org.opendaylight.controller.cluster.sharding.DistributedShardFactory"/>
+  <reference id="configDatastore" interface="org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface"
+             odl:type="distributed-config"/>
+  <reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider"/>
+
 
   <bean id="purchaseCarProvider" class="org.opendaylight.controller.clustering.it.provider.PurchaseCarProvider" >
     <property name="notificationProvider" ref="notificationService"/>
@@ -61,6 +65,8 @@
     <argument ref="domDataBroker"/>
     <argument ref="domDataTreeService"/>
     <argument ref="distributedShardFactory"/>
+    <argument ref="configDatastore"/>
+    <argument ref="actorSystemProvider"/>
   </bean>
 
 </blueprint>

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.