Adjust to mdsal DOM read/exists FluentFuture change
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
156
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
158
159     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
160     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
161             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
162
163     private final RpcProviderRegistry rpcRegistry;
164     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
165     private final DistributedShardFactory distributedShardFactory;
166     private final DistributedDataStoreInterface configDataStore;
167     private final DOMDataTreeService domDataTreeService;
168     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
169     private final DOMDataBroker domDataBroker;
170     private final NotificationPublishService notificationPublishService;
171     private final NotificationService notificationService;
172     private final DOMSchemaService schemaService;
173     private final ClusterSingletonServiceProvider singletonService;
174     private final DOMRpcProviderService domRpcService;
175     private final PrefixLeaderHandler prefixLeaderHandler;
176     private final PrefixShardHandler prefixShardHandler;
177     private final DOMDataTreeChangeService domDataTreeChangeService;
178     private final ActorSystem actorSystem;
179
180     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
181             routedRegistrations = new HashMap<>();
182
183     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
184
185     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
186     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
187     private FlappingSingletonService flappingSingletonService;
188     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
189     private IdIntsListener idIntsListener;
190     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
191     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
192     private IdIntsDOMDataTreeLIstener idIntsDdtl;
193
194
195
196     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197                                      final DOMRpcProviderService domRpcService,
198                                      final ClusterSingletonServiceProvider singletonService,
199                                      final DOMSchemaService schemaService,
200                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201                                      final NotificationPublishService notificationPublishService,
202                                      final NotificationService notificationService,
203                                      final DOMDataBroker domDataBroker,
204                                      final DOMDataTreeService domDataTreeService,
205                                      final DistributedShardFactory distributedShardFactory,
206                                      final DistributedDataStoreInterface configDataStore,
207                                      final ActorSystemProvider actorSystemProvider) {
208         this.rpcRegistry = rpcRegistry;
209         this.domRpcService = domRpcService;
210         this.singletonService = singletonService;
211         this.schemaService = schemaService;
212         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213         this.notificationPublishService = notificationPublishService;
214         this.notificationService = notificationService;
215         this.domDataBroker = domDataBroker;
216         this.domDataTreeService = domDataTreeService;
217         this.distributedShardFactory = distributedShardFactory;
218         this.configDataStore = configDataStore;
219         this.actorSystem = actorSystemProvider.getActorSystem();
220
221         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
222
223         domDataTreeChangeService =
224                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
225
226         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
227
228         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229                 bindingNormalizedNodeSerializer);
230     }
231
232     @Override
233     @SuppressWarnings("checkstyle:IllegalCatch")
234     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235             final UnregisterSingletonConstantInput input) {
236         LOG.debug("unregister-singleton-constant");
237
238         if (getSingletonConstantRegistration == null) {
239             LOG.debug("No get-singleton-constant registration present.");
240             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241                     "No get-singleton-constant rpc registration present.");
242             final RpcResult<UnregisterSingletonConstantOutput> result =
243                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244             return Futures.immediateFuture(result);
245         }
246
247         try {
248             getSingletonConstantRegistration.close();
249             getSingletonConstantRegistration = null;
250
251             return Futures.immediateFuture(RpcResultBuilder.success(
252                 new UnregisterSingletonConstantOutputBuilder().build()).build());
253         } catch (Exception e) {
254             LOG.debug("There was a problem closing the singleton constant service", e);
255             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256                     "There was a problem closing get-singleton-constant");
257             final RpcResult<UnregisterSingletonConstantOutput> result =
258                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259             return Futures.immediateFuture(result);
260         }
261     }
262
263     @Override
264     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265             final StartPublishNotificationsInput input) {
266         LOG.debug("publish-notifications, input: {}", input);
267
268         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269                 input.getSeconds(), input.getNotificationsPerSecond());
270
271         publishNotificationsTasks.put(input.getId(), task);
272
273         task.start();
274
275         return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
276             .build());
277     }
278
279     @Override
280     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
281
282         if (dtclReg != null) {
283             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284                     "There is already dataTreeChangeListener registered on id-ints list.");
285             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
286         }
287
288         idIntsListener = new IdIntsListener();
289
290         dtclReg = domDataTreeChangeService
291                 .registerDataTreeChangeListener(
292                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
294                         idIntsListener);
295
296         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301         LOG.debug("write-transactions, input: {}", input);
302         return WriteTransactionsHandler.start(domDataBroker, input);
303     }
304
305     @Override
306     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
307         return null;
308     }
309
310     @Override
311     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312             final RemoveShardReplicaInput input) {
313         return null;
314     }
315
316     @Override
317     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
318
319         LOG.debug("subscribe-ynl, input: {}", input);
320
321         if (ynlRegistrations.containsKey(input.getId())) {
322             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323                     "There is already ynl listener registered for this id: " + input.getId());
324             return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
325         }
326
327         ynlRegistrations.put(input.getId(),
328                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
329
330         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
331     }
332
333     @Override
334     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335         LOG.debug("remove-prefix-shard, input: {}", input);
336
337         return prefixShardHandler.onRemovePrefixShard(input);
338     }
339
340     @Override
341     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342             final BecomePrefixLeaderInput input) {
343         LOG.debug("become-prefix-leader, input: {}", input);
344
345         return prefixLeaderHandler.makeLeaderLocal(input);
346     }
347
348     @Override
349     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350             final UnregisterBoundConstantInput input) {
351         LOG.debug("unregister-bound-constant, {}", input);
352
353         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354                 routedRegistrations.remove(input.getContext());
355
356         if (rpcRegistration == null) {
357             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359                     "No get-constant rpc registration present.");
360             final RpcResult<UnregisterBoundConstantOutput> result =
361                     RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362             return Futures.immediateFuture(result);
363         }
364
365         rpcRegistration.close();
366         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
367             .build());
368     }
369
370     @Override
371     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372             final RegisterSingletonConstantInput input) {
373
374         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
375
376         if (input.getConstant() == null) {
377             final RpcError error = RpcResultBuilder.newError(
378                     ErrorType.RPC, "Invalid input.", "Constant value is null");
379             return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380                 .withRpcError(error).build());
381         }
382
383         getSingletonConstantRegistration =
384                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
385
386         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
387             .build());
388     }
389
390     @Override
391     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392             final RegisterDefaultConstantInput input) {
393         return null;
394     }
395
396     @Override
397     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398             final UnregisterConstantInput input) {
399
400         if (globalGetConstantRegistration == null) {
401             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402                     "No get-constant rpc registration present.");
403             return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
404                 .build());
405         }
406
407         globalGetConstantRegistration.close();
408         globalGetConstantRegistration = null;
409
410         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
411     }
412
413     @Override
414     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415             final UnregisterFlappingSingletonInput input) {
416         LOG.debug("unregister-flapping-singleton received.");
417
418         if (flappingSingletonService == null) {
419             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420                     "No flapping-singleton registration present.");
421             final RpcResult<UnregisterFlappingSingletonOutput> result =
422                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423             return Futures.immediateFuture(result);
424         }
425
426         final long flapCount = flappingSingletonService.setInactive();
427         flappingSingletonService = null;
428
429         final UnregisterFlappingSingletonOutput output =
430                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
431
432         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
433     }
434
435     @Override
436     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
437         return null;
438     }
439
440     @Override
441     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
442
443         if (ddtlReg != null) {
444             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445                     "There is already dataTreeChangeListener registered on id-ints list.");
446             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
447         }
448
449         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
450
451         try {
452             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454                             ProduceTransactionsHandler.ID_INT_YID)),
455                     true, Collections.emptyList());
456         } catch (DOMDataTreeLoopException e) {
457             LOG.error("Failed to register DOMDataTreeListener.", e);
458         }
459
460         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
461     }
462
463     @Override
464     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465             final RegisterBoundConstantInput input) {
466         LOG.debug("register-bound-constant: {}", input);
467
468         if (input.getContext() == null) {
469             final RpcError error = RpcResultBuilder.newError(
470                     ErrorType.RPC, "Invalid input.", "Context value is null");
471             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
472                 .build());
473         }
474
475         if (input.getConstant() == null) {
476             final RpcError error = RpcResultBuilder.newError(
477                     ErrorType.RPC, "Invalid input.", "Constant value is null");
478             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
479                 .build());
480         }
481
482         if (routedRegistrations.containsKey(input.getContext())) {
483             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484                     "There is already a rpc registered for context: " + input.getContext());
485             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
486                 .build());
487         }
488
489         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491                         input.getConstant(), input.getContext());
492
493         routedRegistrations.put(input.getContext(), rpcRegistration);
494         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
495             .build());
496     }
497
498     @Override
499     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500             final RegisterFlappingSingletonInput input) {
501         LOG.debug("Received register-flapping-singleton.");
502
503         if (flappingSingletonService != null) {
504             final RpcError error = RpcResultBuilder.newError(
505                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506             return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507                 .withRpcError(error).build());
508         }
509
510         flappingSingletonService = new FlappingSingletonService(singletonService);
511
512         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
513             .build());
514     }
515
516     @Override
517     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518         LOG.debug("Received unsubscribe-dtcl");
519
520         if (idIntsListener == null || dtclReg == null) {
521             final RpcError error = RpcResultBuilder.newError(
522                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524                     .withRpcError(error).build());
525         }
526
527         try {
528             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529         } catch (InterruptedException | ExecutionException | TimeoutException e) {
530             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533                     .withRpcError(error).build());
534         }
535
536         dtclReg.close();
537         dtclReg = null;
538
539         if (!idIntsListener.hasTriggered()) {
540             final RpcError error = RpcResultBuilder.newError(
541                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
542                             + "any notifications.");
543             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
544                     .withRpcError(error).build());
545         }
546
547         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
548         try {
549             final Optional<NormalizedNode<?, ?>> readResult =
550                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
551
552             if (!readResult.isPresent()) {
553                 final RpcError error = RpcResultBuilder.newError(
554                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
555                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
556                         .withRpcError(error).build());
557             }
558
559             return Futures.immediateFuture(
560                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
561                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
562
563         } catch (final InterruptedException | ExecutionException e) {
564             final RpcError error = RpcResultBuilder.newError(
565                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
566             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
567                     .withRpcError(error).build());
568
569         }
570     }
571
572     @Override
573     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
574         LOG.debug("create-prefix-shard, input: {}", input);
575
576         return prefixShardHandler.onCreatePrefixShard(input);
577     }
578
579     @Override
580     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
581             final DeconfigureIdIntsShardInput input) {
582         return null;
583     }
584
585     @Override
586     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
587         LOG.debug("Received unsubscribe-ynl, input: {}", input);
588
589         if (!ynlRegistrations.containsKey(input.getId())) {
590             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
591                     "No ynl listener with this id registered.");
592             final RpcResult<UnsubscribeYnlOutput> result =
593                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
594             return Futures.immediateFuture(result);
595         }
596
597         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
598         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
599
600         reg.close();
601
602         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
603     }
604
605     @Override
606     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
607             final CheckPublishNotificationsInput input) {
608
609         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
610
611         if (task == null) {
612             return Futures.immediateFuture(RpcResultBuilder.success(
613                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
614         }
615
616         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
617                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
618
619         if (task.getLastError() != null) {
620             LOG.error("Last error for {}", task, task.getLastError());
621             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
622         }
623
624         final CheckPublishNotificationsOutput output =
625                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
626
627         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
628     }
629
630     @Override
631     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
632             final ProduceTransactionsInput input) {
633         LOG.debug("producer-transactions, input: {}", input);
634         return ProduceTransactionsHandler.start(domDataTreeService, input);
635     }
636
637     @Override
638     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
639             final ShutdownShardReplicaInput input) {
640         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
641
642         final String shardName = input.getShardName();
643         if (Strings.isNullOrEmpty(shardName)) {
644             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
645                     "A valid shard name must be specified");
646             return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
647                 .build());
648         }
649
650         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
651     }
652
653     @Override
654     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
655             final ShutdownPrefixShardReplicaInput input) {
656         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
657
658         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
659
660         if (shardPrefix == null) {
661             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
662                     "A valid shard prefix must be specified");
663             return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
664                 .withRpcError(rpcError).build());
665         }
666
667         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
668         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
669
670         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
671     }
672
673     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
674         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
675         final ActorContext context = configDataStore.getActorContext();
676
677         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
678                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
679         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
680         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
681
682         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
683             @Override
684             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
685                 if (throwable != null) {
686                     shutdownShardAsk.failure(throwable);
687                 } else {
688                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
689                 }
690             }
691         }, context.getClientDispatcher());
692
693         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
694             @Override
695             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
696                 if (throwable != null) {
697                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
698                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
699                     rpcResult.set(failedResult);
700                 } else {
701                     // according to Patterns.gracefulStop API, we don't have to
702                     // check value of gracefulStopResult
703                     rpcResult.set(RpcResultBuilder.success(success).build());
704                 }
705             }
706         }, context.getClientDispatcher());
707         return rpcResult;
708     }
709
710     @Override
711     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
712
713         LOG.debug("Received register-constant rpc, input: {}", input);
714
715         if (input.getConstant() == null) {
716             final RpcError error = RpcResultBuilder.newError(
717                     ErrorType.RPC, "Invalid input.", "Constant value is null");
718             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
719                 .build());
720         }
721
722         if (globalGetConstantRegistration != null) {
723             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
724                     "There is already a get-constant rpc registered.");
725             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
726                 .build());
727         }
728
729         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
730         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
731     }
732
733     @Override
734     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
735             final UnregisterDefaultConstantInput input) {
736         return null;
737     }
738
739     @Override
740     @SuppressWarnings("checkstyle:IllegalCatch")
741     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
742         LOG.debug("Received unsubscribe-ddtl.");
743
744         if (idIntsDdtl == null || ddtlReg == null) {
745             final RpcError error = RpcResultBuilder.newError(
746                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
747             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
748                     .withRpcError(error).build());
749         }
750
751         try {
752             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
753         } catch (InterruptedException | ExecutionException | TimeoutException e) {
754             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
755                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
756             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
757                     .withRpcError(error).build());
758         }
759
760         ddtlReg.close();
761         ddtlReg = null;
762
763         if (!idIntsDdtl.hasTriggered()) {
764             final RpcError error = RpcResultBuilder.newError(
765                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
766                             + "any notifications.");
767             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
768                     .withRpcError(error).build());
769         }
770
771         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
772         LOG.debug("Creating distributed datastore client for shard {}", shardName);
773
774         final ActorContext actorContext = configDataStore.getActorContext();
775         final Props distributedDataStoreClientProps =
776                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
777                         "Shard-" + shardName, actorContext, shardName);
778
779         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
780         final DataStoreClient distributedDataStoreClient;
781         try {
782             distributedDataStoreClient = SimpleDataStoreClientActor
783                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
784         } catch (RuntimeException e) {
785             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
786             clientActor.tell(PoisonPill.getInstance(), noSender());
787             final RpcError error = RpcResultBuilder.newError(
788                     ErrorType.APPLICATION, "Unable to create ds client for read.",
789                     "Unable to create ds client for read.");
790             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
791                     .withRpcError(error).build());
792         }
793
794         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
795         final ClientTransaction tx = localHistory.createTransaction();
796         final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
797                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
798
799         tx.abort();
800         localHistory.close();
801         try {
802             final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
803             if (!optional.isPresent()) {
804                 LOG.warn("Final read from client is empty.");
805                 final RpcError error = RpcResultBuilder.newError(
806                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
807                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
808                         .withRpcError(error).build());
809             }
810
811             return Futures.immediateFuture(
812                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
813                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
814
815         } catch (InterruptedException | ExecutionException e) {
816             LOG.error("Unable to read data to verify ddtl data.", e);
817             final RpcError error = RpcResultBuilder.newError(
818                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
819             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
820                     .withRpcError(error).build());
821         } finally {
822             distributedDataStoreClient.close();
823             clientActor.tell(PoisonPill.getInstance(), noSender());
824         }
825     }
826 }