Remove deprecated controller.sal.core.api interfaces
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.opendaylight.controller.cluster.ActorSystemProvider;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
35 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
38 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
39 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
40 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
41 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
48 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
51 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
59 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
60 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
61 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
62 import org.opendaylight.controller.sal.core.api.model.SchemaService;
63 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
64 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
66 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
69 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
145 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
146 import org.opendaylight.yangtools.concepts.ListenerRegistration;
147 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
148 import org.opendaylight.yangtools.yang.common.RpcError;
149 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
150 import org.opendaylight.yangtools.yang.common.RpcResult;
151 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
152 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
153 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
154 import org.slf4j.Logger;
155 import org.slf4j.LoggerFactory;
156 import scala.concurrent.duration.FiniteDuration;
157
158 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
159
160     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
161     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
162             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
163
164     private final RpcProviderRegistry rpcRegistry;
165     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
166     private final DistributedShardFactory distributedShardFactory;
167     private final DistributedDataStoreInterface configDataStore;
168     private final DOMDataTreeService domDataTreeService;
169     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
170     private final DOMDataBroker domDataBroker;
171     private final NotificationPublishService notificationPublishService;
172     private final NotificationService notificationService;
173     private final SchemaService schemaService;
174     private final ClusterSingletonServiceProvider singletonService;
175     private final DOMRpcProviderService domRpcService;
176     private final PrefixLeaderHandler prefixLeaderHandler;
177     private final PrefixShardHandler prefixShardHandler;
178     private final DOMDataTreeChangeService domDataTreeChangeService;
179     private final ActorSystem actorSystem;
180
181     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
182             routedRegistrations = new HashMap<>();
183
184     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
185
186     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
187     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
188     private FlappingSingletonService flappingSingletonService;
189     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
190     private IdIntsListener idIntsListener;
191     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
192     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
193     private IdIntsDOMDataTreeLIstener idIntsDdtl;
194
195
196
197     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
198                                      final DOMRpcProviderService domRpcService,
199                                      final ClusterSingletonServiceProvider singletonService,
200                                      final SchemaService schemaService,
201                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
202                                      final NotificationPublishService notificationPublishService,
203                                      final NotificationService notificationService,
204                                      final DOMDataBroker domDataBroker,
205                                      final DOMDataTreeService domDataTreeService,
206                                      final DistributedShardFactory distributedShardFactory,
207                                      final DistributedDataStoreInterface configDataStore,
208                                      final ActorSystemProvider actorSystemProvider) {
209         this.rpcRegistry = rpcRegistry;
210         this.domRpcService = domRpcService;
211         this.singletonService = singletonService;
212         this.schemaService = schemaService;
213         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
214         this.notificationPublishService = notificationPublishService;
215         this.notificationService = notificationService;
216         this.domDataBroker = domDataBroker;
217         this.domDataTreeService = domDataTreeService;
218         this.distributedShardFactory = distributedShardFactory;
219         this.configDataStore = configDataStore;
220         this.actorSystem = actorSystemProvider.getActorSystem();
221
222         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
223
224         domDataTreeChangeService =
225                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
226
227         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
228
229         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
230                 bindingNormalizedNodeSerializer);
231     }
232
233     @Override
234     @SuppressWarnings("checkstyle:IllegalCatch")
235     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
236             final UnregisterSingletonConstantInput input) {
237         LOG.debug("unregister-singleton-constant");
238
239         if (getSingletonConstantRegistration == null) {
240             LOG.debug("No get-singleton-constant registration present.");
241             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
242                     "No get-singleton-constant rpc registration present.");
243             final RpcResult<UnregisterSingletonConstantOutput> result =
244                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
245             return Futures.immediateFuture(result);
246         }
247
248         try {
249             getSingletonConstantRegistration.close();
250             getSingletonConstantRegistration = null;
251
252             return Futures.immediateFuture(RpcResultBuilder.success(
253                 new UnregisterSingletonConstantOutputBuilder().build()).build());
254         } catch (Exception e) {
255             LOG.debug("There was a problem closing the singleton constant service", e);
256             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
257                     "There was a problem closing get-singleton-constant");
258             final RpcResult<UnregisterSingletonConstantOutput> result =
259                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
260             return Futures.immediateFuture(result);
261         }
262     }
263
264     @Override
265     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
266             final StartPublishNotificationsInput input) {
267         LOG.debug("publish-notifications, input: {}", input);
268
269         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
270                 input.getSeconds(), input.getNotificationsPerSecond());
271
272         publishNotificationsTasks.put(input.getId(), task);
273
274         task.start();
275
276         return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
277             .build());
278     }
279
280     @Override
281     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
282
283         if (dtclReg != null) {
284             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
285                     "There is already dataTreeChangeListener registered on id-ints list.");
286             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
287         }
288
289         idIntsListener = new IdIntsListener();
290
291         dtclReg = domDataTreeChangeService
292                 .registerDataTreeChangeListener(
293                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
294                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
295                         idIntsListener);
296
297         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
298     }
299
300     @Override
301     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
302         LOG.debug("write-transactions, input: {}", input);
303         return WriteTransactionsHandler.start(domDataBroker, input);
304     }
305
306     @Override
307     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
308         return null;
309     }
310
311     @Override
312     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
313             final RemoveShardReplicaInput input) {
314         return null;
315     }
316
317     @Override
318     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
319
320         LOG.debug("subscribe-ynl, input: {}", input);
321
322         if (ynlRegistrations.containsKey(input.getId())) {
323             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
324                     "There is already ynl listener registered for this id: " + input.getId());
325             return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
326         }
327
328         ynlRegistrations.put(input.getId(),
329                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
330
331         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
332     }
333
334     @Override
335     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
336         LOG.debug("remove-prefix-shard, input: {}", input);
337
338         return prefixShardHandler.onRemovePrefixShard(input);
339     }
340
341     @Override
342     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
343             final BecomePrefixLeaderInput input) {
344         LOG.debug("become-prefix-leader, input: {}", input);
345
346         return prefixLeaderHandler.makeLeaderLocal(input);
347     }
348
349     @Override
350     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
351             final UnregisterBoundConstantInput input) {
352         LOG.debug("unregister-bound-constant, {}", input);
353
354         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
355                 routedRegistrations.remove(input.getContext());
356
357         if (rpcRegistration == null) {
358             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
359             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
360                     "No get-constant rpc registration present.");
361             final RpcResult<UnregisterBoundConstantOutput> result =
362                     RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
363             return Futures.immediateFuture(result);
364         }
365
366         rpcRegistration.close();
367         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
368             .build());
369     }
370
371     @Override
372     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
373             final RegisterSingletonConstantInput input) {
374
375         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
376
377         if (input.getConstant() == null) {
378             final RpcError error = RpcResultBuilder.newError(
379                     ErrorType.RPC, "Invalid input.", "Constant value is null");
380             return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
381                 .withRpcError(error).build());
382         }
383
384         getSingletonConstantRegistration =
385                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
386
387         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
388             .build());
389     }
390
391     @Override
392     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
393             final RegisterDefaultConstantInput input) {
394         return null;
395     }
396
397     @Override
398     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
399             final UnregisterConstantInput input) {
400
401         if (globalGetConstantRegistration == null) {
402             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
403                     "No get-constant rpc registration present.");
404             return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
405                 .build());
406         }
407
408         globalGetConstantRegistration.close();
409         globalGetConstantRegistration = null;
410
411         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
412     }
413
414     @Override
415     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
416             final UnregisterFlappingSingletonInput input) {
417         LOG.debug("unregister-flapping-singleton received.");
418
419         if (flappingSingletonService == null) {
420             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
421                     "No flapping-singleton registration present.");
422             final RpcResult<UnregisterFlappingSingletonOutput> result =
423                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
424             return Futures.immediateFuture(result);
425         }
426
427         final long flapCount = flappingSingletonService.setInactive();
428         flappingSingletonService = null;
429
430         final UnregisterFlappingSingletonOutput output =
431                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
432
433         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
434     }
435
436     @Override
437     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
438         return null;
439     }
440
441     @Override
442     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
443
444         if (ddtlReg != null) {
445             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
446                     "There is already dataTreeChangeListener registered on id-ints list.");
447             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
448         }
449
450         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
451
452         try {
453             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
454                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
455                             ProduceTransactionsHandler.ID_INT_YID)),
456                     true, Collections.emptyList());
457         } catch (DOMDataTreeLoopException e) {
458             LOG.error("Failed to register DOMDataTreeListener.", e);
459         }
460
461         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
462     }
463
464     @Override
465     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
466             final RegisterBoundConstantInput input) {
467         LOG.debug("register-bound-constant: {}", input);
468
469         if (input.getContext() == null) {
470             final RpcError error = RpcResultBuilder.newError(
471                     ErrorType.RPC, "Invalid input.", "Context value is null");
472             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
473                 .build());
474         }
475
476         if (input.getConstant() == null) {
477             final RpcError error = RpcResultBuilder.newError(
478                     ErrorType.RPC, "Invalid input.", "Constant value is null");
479             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
480                 .build());
481         }
482
483         if (routedRegistrations.containsKey(input.getContext())) {
484             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
485                     "There is already a rpc registered for context: " + input.getContext());
486             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
487                 .build());
488         }
489
490         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
491                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
492                         input.getConstant(), input.getContext());
493
494         routedRegistrations.put(input.getContext(), rpcRegistration);
495         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
496             .build());
497     }
498
499     @Override
500     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
501             final RegisterFlappingSingletonInput input) {
502         LOG.debug("Received register-flapping-singleton.");
503
504         if (flappingSingletonService != null) {
505             final RpcError error = RpcResultBuilder.newError(
506                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
507             return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
508                 .withRpcError(error).build());
509         }
510
511         flappingSingletonService = new FlappingSingletonService(singletonService);
512
513         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
514             .build());
515     }
516
517     @Override
518     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
519         LOG.debug("Received unsubscribe-dtcl");
520
521         if (idIntsListener == null || dtclReg == null) {
522             final RpcError error = RpcResultBuilder.newError(
523                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
524             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
525                     .withRpcError(error).build());
526         }
527
528         try {
529             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
530         } catch (InterruptedException | ExecutionException | TimeoutException e) {
531             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
532                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
533             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
534                     .withRpcError(error).build());
535         }
536
537         dtclReg.close();
538         dtclReg = null;
539
540         if (!idIntsListener.hasTriggered()) {
541             final RpcError error = RpcResultBuilder.newError(
542                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
543                             + "any notifications.");
544             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
545                     .withRpcError(error).build());
546         }
547
548         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
549         try {
550             final Optional<NormalizedNode<?, ?>> readResult =
551                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
552
553             if (!readResult.isPresent()) {
554                 final RpcError error = RpcResultBuilder.newError(
555                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
556                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
557                         .withRpcError(error).build());
558             }
559
560             return Futures.immediateFuture(
561                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
562                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
563
564         } catch (final InterruptedException | ExecutionException e) {
565             final RpcError error = RpcResultBuilder.newError(
566                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
567             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
568                     .withRpcError(error).build());
569
570         }
571     }
572
573     @Override
574     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
575         LOG.debug("create-prefix-shard, input: {}", input);
576
577         return prefixShardHandler.onCreatePrefixShard(input);
578     }
579
580     @Override
581     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
582             final DeconfigureIdIntsShardInput input) {
583         return null;
584     }
585
586     @Override
587     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
588         LOG.debug("Received unsubscribe-ynl, input: {}", input);
589
590         if (!ynlRegistrations.containsKey(input.getId())) {
591             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
592                     "No ynl listener with this id registered.");
593             final RpcResult<UnsubscribeYnlOutput> result =
594                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
595             return Futures.immediateFuture(result);
596         }
597
598         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
599         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
600
601         reg.close();
602
603         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
604     }
605
606     @Override
607     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
608             final CheckPublishNotificationsInput input) {
609
610         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
611
612         if (task == null) {
613             return Futures.immediateFuture(RpcResultBuilder.success(
614                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
615         }
616
617         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
618                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
619
620         if (task.getLastError() != null) {
621             LOG.error("Last error for {}", task, task.getLastError());
622             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
623         }
624
625         final CheckPublishNotificationsOutput output =
626                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
627
628         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
629     }
630
631     @Override
632     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
633             final ProduceTransactionsInput input) {
634         LOG.debug("producer-transactions, input: {}", input);
635         return ProduceTransactionsHandler.start(domDataTreeService, input);
636     }
637
638     @Override
639     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
640             final ShutdownShardReplicaInput input) {
641         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
642
643         final String shardName = input.getShardName();
644         if (Strings.isNullOrEmpty(shardName)) {
645             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
646                     "A valid shard name must be specified");
647             return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
648                 .build());
649         }
650
651         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
652     }
653
654     @Override
655     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
656             final ShutdownPrefixShardReplicaInput input) {
657         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
658
659         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
660
661         if (shardPrefix == null) {
662             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
663                     "A valid shard prefix must be specified");
664             return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
665                 .withRpcError(rpcError).build());
666         }
667
668         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
669         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
670
671         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
672     }
673
674     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
675         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
676         final ActorContext context = configDataStore.getActorContext();
677
678         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
679                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
680         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
681         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
682
683         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
684             @Override
685             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
686                 if (throwable != null) {
687                     shutdownShardAsk.failure(throwable);
688                 } else {
689                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
690                 }
691             }
692         }, context.getClientDispatcher());
693
694         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
695             @Override
696             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
697                 if (throwable != null) {
698                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
699                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
700                     rpcResult.set(failedResult);
701                 } else {
702                     // according to Patterns.gracefulStop API, we don't have to
703                     // check value of gracefulStopResult
704                     rpcResult.set(RpcResultBuilder.success(success).build());
705                 }
706             }
707         }, context.getClientDispatcher());
708         return rpcResult;
709     }
710
711     @Override
712     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
713
714         LOG.debug("Received register-constant rpc, input: {}", input);
715
716         if (input.getConstant() == null) {
717             final RpcError error = RpcResultBuilder.newError(
718                     ErrorType.RPC, "Invalid input.", "Constant value is null");
719             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
720                 .build());
721         }
722
723         if (globalGetConstantRegistration != null) {
724             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
725                     "There is already a get-constant rpc registered.");
726             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
727                 .build());
728         }
729
730         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
731         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
732     }
733
734     @Override
735     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
736             final UnregisterDefaultConstantInput input) {
737         return null;
738     }
739
740     @Override
741     @SuppressWarnings("checkstyle:IllegalCatch")
742     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
743         LOG.debug("Received unsubscribe-ddtl.");
744
745         if (idIntsDdtl == null || ddtlReg == null) {
746             final RpcError error = RpcResultBuilder.newError(
747                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
748             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
749                     .withRpcError(error).build());
750         }
751
752         try {
753             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
754         } catch (InterruptedException | ExecutionException | TimeoutException e) {
755             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
756                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
757             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
758                     .withRpcError(error).build());
759         }
760
761         ddtlReg.close();
762         ddtlReg = null;
763
764         if (!idIntsDdtl.hasTriggered()) {
765             final RpcError error = RpcResultBuilder.newError(
766                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
767                             + "any notifications.");
768             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
769                     .withRpcError(error).build());
770         }
771
772         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
773         LOG.debug("Creating distributed datastore client for shard {}", shardName);
774
775         final ActorContext actorContext = configDataStore.getActorContext();
776         final Props distributedDataStoreClientProps =
777                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
778                         "Shard-" + shardName, actorContext, shardName);
779
780         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
781         final DataStoreClient distributedDataStoreClient;
782         try {
783             distributedDataStoreClient = SimpleDataStoreClientActor
784                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
785         } catch (RuntimeException e) {
786             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
787             clientActor.tell(PoisonPill.getInstance(), noSender());
788             final RpcError error = RpcResultBuilder.newError(
789                     ErrorType.APPLICATION, "Unable to create ds client for read.",
790                     "Unable to create ds client for read.");
791             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
792                     .withRpcError(error).build());
793         }
794
795         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
796         final ClientTransaction tx = localHistory.createTransaction();
797         final ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
798                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
799
800         tx.abort();
801         localHistory.close();
802         try {
803             final Optional<NormalizedNode<?, ?>> optional = read.get();
804             if (!optional.isPresent()) {
805                 LOG.warn("Final read from client is empty.");
806                 final RpcError error = RpcResultBuilder.newError(
807                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
808                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
809                         .withRpcError(error).build());
810             }
811
812             return Futures.immediateFuture(
813                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
814                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
815
816         } catch (InterruptedException | ExecutionException e) {
817             LOG.error("Unable to read data to verify ddtl data.", e);
818             final RpcError error = RpcResultBuilder.newError(
819                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
820             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
821                     .withRpcError(error).build());
822         } finally {
823             distributedDataStoreClient.close();
824             clientActor.tell(PoisonPill.getInstance(), noSender());
825         }
826     }
827 }