Bug 8621 - Add shutdown-shard-replica rpc to MdsalLowLevelTestProvider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
index 15c9d8ea60f6a4bde6d07cca820ee40824c5329d..f0569fa07dc4e8842d92504168b117832b99acec 100644 (file)
@@ -8,23 +8,41 @@
 
 package org.opendaylight.controller.clustering.it.provider;
 
-import com.google.common.base.Preconditions;
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
+import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
+import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
@@ -34,7 +52,11 @@ import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactions
 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
@@ -43,8 +65,6 @@ import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
@@ -65,6 +85,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
@@ -73,6 +94,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
@@ -83,18 +105,22 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
+    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
+            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
+    private final DistributedDataStoreInterface configDataStore;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
@@ -103,7 +129,10 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final SchemaService schemaService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
+    private final PrefixLeaderHandler prefixLeaderHandler;
     private final PrefixShardHandler prefixShardHandler;
+    private final DOMDataTreeChangeService domDataTreeChangeService;
+    private final ActorSystem actorSystem;
 
     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
@@ -113,10 +142,14 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
+    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
+    private IdIntsListener idIntsListener;
     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
+
+
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
                                      final ClusterSingletonServiceProvider singletonService,
@@ -126,7 +159,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
-                                     final DistributedShardFactory distributedShardFactory) {
+                                     final DistributedShardFactory distributedShardFactory,
+                                     final DistributedDataStoreInterface configDataStore,
+                                     final ActorSystemProvider actorSystemProvider) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -137,10 +172,18 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
+        this.configDataStore = configDataStore;
+        this.actorSystem = actorSystemProvider.getActorSystem();
+
+        this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
+
+        domDataTreeChangeService =
+                (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
 
-        prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
+        prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
+                bindingNormalizedNodeSerializer);
     }
 
     @Override
@@ -185,7 +228,22 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
     @Override
     public Future<RpcResult<Void>> subscribeDtcl() {
-        return null;
+
+        if (dtclReg != null) {
+            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
+                    "There is already dataTreeChangeListener registered on id-ints list.");
+            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+        }
+
+        idIntsListener = new IdIntsListener();
+
+        dtclReg = domDataTreeChangeService
+                .registerDataTreeChangeListener(
+                        new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
+                                CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
+                        idIntsListener);
+
+        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
     @Override
@@ -236,7 +294,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
     @Override
     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
-        return null;
+        LOG.debug("become-prefix-leader, input: {}", input);
+
+        return prefixLeaderHandler.makeLeaderLocal(input);
     }
 
     @Override
@@ -337,7 +397,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
             ddtlReg =
                     domDataTreeService.registerListener(idIntsDdtl,
                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                                    ProduceTransactionsHandler.ID_INTS_YID))
+                                    ProduceTransactionsHandler.ID_INT_YID))
                             , true, Collections.emptyList());
         } catch (DOMDataTreeLoopException e) {
             LOG.error("Failed to register DOMDataTreeListener.", e);
@@ -383,7 +443,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
         if (flappingSingletonService != null) {
             final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
+                    ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
         }
 
@@ -394,7 +454,48 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
     @Override
     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
-        return null;
+        LOG.debug("Received unsubscribe-dtcl");
+
+        if (idIntsListener == null || dtclReg == null) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
+        }
+
+        dtclReg.close();
+        dtclReg = null;
+
+        if (!idIntsListener.hasTriggered()) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+                            "any notifications.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+                    .withRpcError(error).build());
+        }
+
+        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+        try {
+            final Optional<NormalizedNode<?, ?>> readResult =
+                    rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
+
+            if (!readResult.isPresent()) {
+                final RpcError error = RpcResultBuilder.newError(
+                        ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
+                return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+                        .withRpcError(error).build());
+            }
+
+            return Futures.immediateFuture(
+                    RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
+                            .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
+
+        } catch (final ReadFailedException e) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+                    .withRpcError(error).build());
+
+        }
     }
 
     @Override
@@ -469,6 +570,54 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         return settableFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
+        LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
+
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+                    "A valid shard name must be specified");
+            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+        }
+
+        final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+        final ActorContext context = configDataStore.getActorContext();
+
+        long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+                .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+        final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+        final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+        context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+                if (throwable != null) {
+                    shutdownShardAsk.failure(throwable);
+                } else {
+                    shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+                }
+            }
+        }, context.getClientDispatcher());
+
+        shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+                if (throwable != null) {
+                    final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+                            .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+                    rpcResult.set(failedResult);
+                } else {
+                    // according to Patterns.gracefulStop API, we don't have to
+                    // check value of gracefulStopResult
+                    rpcResult.set(RpcResultBuilder.<Void>success().build());
+                }
+            }
+        }, context.getClientDispatcher());
+
+        return rpcResult;
+    }
+
     @Override
     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
 
@@ -508,68 +657,68 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         ddtlReg.close();
         ddtlReg = null;
 
-        final ReadListener readListener = new ReadListener();
+        if (!idIntsDdtl.hasTriggered()) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+                            "any notifications.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withRpcError(error).build());
+        }
+
+        final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+        final ActorContext actorContext = configDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient;
         try {
-            final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
-                    Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                            ProduceTransactionsHandler.ID_INTS_YID))
-                    , true, Collections.emptyList());
+            distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "Unable to create ds client for read.",
+                    "Unable to create ds client for read.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withRpcError(error).build());
+        }
 
-            final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
-            registration.close();
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx = localHistory.createTransaction();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+                org.opendaylight.mdsal.common.api.ReadFailedException> read =
+                tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
 
-            if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
+        tx.abort();
+        localHistory.close();
+        try {
+            final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+            if (!optional.isPresent()) {
+                LOG.warn("Final read from client is empty.");
                 final RpcError error = RpcResultBuilder.newError(
-                        ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
+                        ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
                         .withRpcError(error).build());
             }
 
-            final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
-
             return Futures.immediateFuture(
                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
-                            .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
-
+                            .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
 
-        } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
+        } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
             LOG.error("Unable to read data to verify ddtl data.", e);
             final RpcError error = RpcResultBuilder.newError(
                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
                     .withRpcError(error).build());
-        }
-    }
-
-    private static class ReadListener implements DOMDataTreeListener {
-
-        private Collection<DataTreeCandidate> changes = null;
-        private SettableFuture<DataTreeCandidate> readFuture;
-
-        @Override
-        public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
-                                      @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
-            Preconditions.checkArgument(changes.size() == 1);
-
-            if (this.changes == null) {
-                this.changes = changes;
-
-                readFuture.set(changes.iterator().next());
-            }
-        }
-
-        @Override
-        public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
-            LOG.error("Read Listener failed. {}", causes);
-        }
-
-        public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
-            if (changes != null) {
-                return Futures.immediateFuture(changes.iterator().next());
-            }
-
-            readFuture = SettableFuture.create();
-            return readFuture;
+        } finally {
+            distributedDataStoreClient.close();
+            clientActor.tell(PoisonPill.getInstance(), noSender());
         }
     }
 }