cbba269d1c0b193a0900bec5481235d695e5f360
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
156
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
158
    private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
    // Controller-flavoured CONFIGURATION datastore type. It is spelled out fully qualified because the simple name
    // LogicalDatastoreType is already imported from org.opendaylight.mdsal.common.api.
    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;

    // Collaborators handed in through the constructor.
    private final RpcProviderRegistry rpcRegistry;
    // Registration of this instance as the OdlMdsalLowlevelControlService implementation.
    private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
    private final DistributedShardFactory distributedShardFactory;
    private final DistributedDataStoreInterface configDataStore;
    private final DOMDataTreeService domDataTreeService;
    private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
    private final DOMDataBroker domDataBroker;
    private final NotificationPublishService notificationPublishService;
    private final NotificationService notificationService;
    private final DOMSchemaService schemaService;
    private final ClusterSingletonServiceProvider singletonService;
    private final DOMRpcProviderService domRpcService;
    // Helpers the prefix-shard RPCs (become-prefix-leader, remove-prefix-shard) delegate to.
    private final PrefixLeaderHandler prefixLeaderHandler;
    private final PrefixShardHandler prefixShardHandler;
    // Looked up from the supported extensions of domDataBroker in the constructor.
    private final DOMDataTreeChangeService domDataTreeChangeService;
    // Obtained from the ActorSystemProvider passed to the constructor.
    private final ActorSystem actorSystem;

    // Routed get-constant registrations, keyed by the context they were registered for
    // (filled by register-bound-constant, drained by unregister-bound-constant).
    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
            routedRegistrations = new HashMap<>();

    // YnlListener registrations, keyed by the id given to subscribe-ynl.
    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();

    // Global get-constant registration; null while nothing is registered (see unregister-constant).
    private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
    // Registration created by register-singleton-constant; null while nothing is registered.
    private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
    // Flapping singleton currently tracked; null while none is registered (see unregister-flapping-singleton).
    private FlappingSingletonService flappingSingletonService;
    // Data tree change listener registration created by subscribe-dtcl, together with its listener.
    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
    private IdIntsListener idIntsListener;
    // Notification publishing tasks, keyed by the id given to start-publish-notifications.
    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
    // DOM data tree listener registration created by subscribe-ddtl, together with its listener.
    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
    private IdIntsDOMDataTreeLIstener idIntsDdtl;
193
194
195
196     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197                                      final DOMRpcProviderService domRpcService,
198                                      final ClusterSingletonServiceProvider singletonService,
199                                      final DOMSchemaService schemaService,
200                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201                                      final NotificationPublishService notificationPublishService,
202                                      final NotificationService notificationService,
203                                      final DOMDataBroker domDataBroker,
204                                      final DOMDataTreeService domDataTreeService,
205                                      final DistributedShardFactory distributedShardFactory,
206                                      final DistributedDataStoreInterface configDataStore,
207                                      final ActorSystemProvider actorSystemProvider) {
208         this.rpcRegistry = rpcRegistry;
209         this.domRpcService = domRpcService;
210         this.singletonService = singletonService;
211         this.schemaService = schemaService;
212         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213         this.notificationPublishService = notificationPublishService;
214         this.notificationService = notificationService;
215         this.domDataBroker = domDataBroker;
216         this.domDataTreeService = domDataTreeService;
217         this.distributedShardFactory = distributedShardFactory;
218         this.configDataStore = configDataStore;
219         this.actorSystem = actorSystemProvider.getActorSystem();
220
221         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
222
223         domDataTreeChangeService =
224                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
225
226         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
227
228         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229                 bindingNormalizedNodeSerializer);
230     }
231
232     @Override
233     @SuppressWarnings("checkstyle:IllegalCatch")
234     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235             final UnregisterSingletonConstantInput input) {
236         LOG.debug("unregister-singleton-constant");
237
238         if (getSingletonConstantRegistration == null) {
239             LOG.debug("No get-singleton-constant registration present.");
240             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241                     "No get-singleton-constant rpc registration present.");
242             final RpcResult<UnregisterSingletonConstantOutput> result =
243                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244             return Futures.immediateFuture(result);
245         }
246
247         try {
248             getSingletonConstantRegistration.close();
249             getSingletonConstantRegistration = null;
250
251             return Futures.immediateFuture(RpcResultBuilder.success(
252                 new UnregisterSingletonConstantOutputBuilder().build()).build());
253         } catch (Exception e) {
254             LOG.debug("There was a problem closing the singleton constant service", e);
255             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256                     "There was a problem closing get-singleton-constant");
257             final RpcResult<UnregisterSingletonConstantOutput> result =
258                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259             return Futures.immediateFuture(result);
260         }
261     }
262
263     @Override
264     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265             final StartPublishNotificationsInput input) {
266         LOG.debug("publish-notifications, input: {}", input);
267
268         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269                 input.getSeconds(), input.getNotificationsPerSecond());
270
271         publishNotificationsTasks.put(input.getId(), task);
272
273         task.start();
274
275         return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
276             .build());
277     }
278
279     @Override
280     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
281
282         if (dtclReg != null) {
283             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284                     "There is already dataTreeChangeListener registered on id-ints list.");
285             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
286         }
287
288         idIntsListener = new IdIntsListener();
289
290         dtclReg = domDataTreeChangeService
291                 .registerDataTreeChangeListener(
292                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
294                         idIntsListener);
295
296         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301         LOG.debug("write-transactions, input: {}", input);
302         return WriteTransactionsHandler.start(domDataBroker, input);
303     }
304
305     @Override
306     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
307         return null;
308     }
309
310     @Override
311     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312             final RemoveShardReplicaInput input) {
313         return null;
314     }
315
316     @Override
317     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
318
319         LOG.debug("subscribe-ynl, input: {}", input);
320
321         if (ynlRegistrations.containsKey(input.getId())) {
322             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323                     "There is already ynl listener registered for this id: " + input.getId());
324             return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
325         }
326
327         ynlRegistrations.put(input.getId(),
328                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
329
330         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
331     }
332
333     @Override
334     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335         LOG.debug("remove-prefix-shard, input: {}", input);
336
337         return prefixShardHandler.onRemovePrefixShard(input);
338     }
339
340     @Override
341     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342             final BecomePrefixLeaderInput input) {
343         LOG.debug("become-prefix-leader, input: {}", input);
344
345         return prefixLeaderHandler.makeLeaderLocal(input);
346     }
347
348     @Override
349     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350             final UnregisterBoundConstantInput input) {
351         LOG.debug("unregister-bound-constant, {}", input);
352
353         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354                 routedRegistrations.remove(input.getContext());
355
356         if (rpcRegistration == null) {
357             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359                     "No get-constant rpc registration present.");
360             final RpcResult<UnregisterBoundConstantOutput> result =
361                     RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362             return Futures.immediateFuture(result);
363         }
364
365         rpcRegistration.close();
366         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
367             .build());
368     }
369
370     @Override
371     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372             final RegisterSingletonConstantInput input) {
373
374         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
375
376         if (input.getConstant() == null) {
377             final RpcError error = RpcResultBuilder.newError(
378                     ErrorType.RPC, "Invalid input.", "Constant value is null");
379             return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380                 .withRpcError(error).build());
381         }
382
383         getSingletonConstantRegistration =
384                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
385
386         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
387             .build());
388     }
389
390     @Override
391     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392             final RegisterDefaultConstantInput input) {
393         return null;
394     }
395
396     @Override
397     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398             final UnregisterConstantInput input) {
399
400         if (globalGetConstantRegistration == null) {
401             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402                     "No get-constant rpc registration present.");
403             return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
404                 .build());
405         }
406
407         globalGetConstantRegistration.close();
408         globalGetConstantRegistration = null;
409
410         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
411     }
412
413     @Override
414     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415             final UnregisterFlappingSingletonInput input) {
416         LOG.debug("unregister-flapping-singleton received.");
417
418         if (flappingSingletonService == null) {
419             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420                     "No flapping-singleton registration present.");
421             final RpcResult<UnregisterFlappingSingletonOutput> result =
422                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423             return Futures.immediateFuture(result);
424         }
425
426         final long flapCount = flappingSingletonService.setInactive();
427         flappingSingletonService = null;
428
429         final UnregisterFlappingSingletonOutput output =
430                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
431
432         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
433     }
434
435     @Override
436     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
437         return null;
438     }
439
440     @Override
441     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
442
443         if (ddtlReg != null) {
444             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445                     "There is already dataTreeChangeListener registered on id-ints list.");
446             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
447         }
448
449         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
450
451         try {
452             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454                             ProduceTransactionsHandler.ID_INT_YID)),
455                     true, Collections.emptyList());
456         } catch (DOMDataTreeLoopException e) {
457             LOG.error("Failed to register DOMDataTreeListener.", e);
458         }
459
460         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
461     }
462
463     @Override
464     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465             final RegisterBoundConstantInput input) {
466         LOG.debug("register-bound-constant: {}", input);
467
468         if (input.getContext() == null) {
469             final RpcError error = RpcResultBuilder.newError(
470                     ErrorType.RPC, "Invalid input.", "Context value is null");
471             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
472                 .build());
473         }
474
475         if (input.getConstant() == null) {
476             final RpcError error = RpcResultBuilder.newError(
477                     ErrorType.RPC, "Invalid input.", "Constant value is null");
478             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
479                 .build());
480         }
481
482         if (routedRegistrations.containsKey(input.getContext())) {
483             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484                     "There is already a rpc registered for context: " + input.getContext());
485             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
486                 .build());
487         }
488
489         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491                         input.getConstant(), input.getContext());
492
493         routedRegistrations.put(input.getContext(), rpcRegistration);
494         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
495             .build());
496     }
497
498     @Override
499     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500             final RegisterFlappingSingletonInput input) {
501         LOG.debug("Received register-flapping-singleton.");
502
503         if (flappingSingletonService != null) {
504             final RpcError error = RpcResultBuilder.newError(
505                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506             return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507                 .withRpcError(error).build());
508         }
509
510         flappingSingletonService = new FlappingSingletonService(singletonService);
511
512         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
513             .build());
514     }
515
516     @Override
517     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518         LOG.debug("Received unsubscribe-dtcl");
519
520         if (idIntsListener == null || dtclReg == null) {
521             final RpcError error = RpcResultBuilder.newError(
522                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524                     .withRpcError(error).build());
525         }
526
527         try {
528             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529         } catch (InterruptedException | ExecutionException | TimeoutException e) {
530             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533                     .withRpcError(error).build());
534         }
535
536         dtclReg.close();
537         dtclReg = null;
538
539         if (!idIntsListener.hasTriggered()) {
540             final RpcError error = RpcResultBuilder.newError(ErrorType.APPLICATION, "operation-failed",
541                     "id-ints listener did not receive any notifications.");
542             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
543                     .withRpcError(error).build());
544         }
545
546         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
547         try {
548             final Optional<NormalizedNode<?, ?>> readResult =
549                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
550
551             if (!readResult.isPresent()) {
552                 final RpcError error = RpcResultBuilder.newError(
553                         ErrorType.APPLICATION, "data-missing", "No data read from id-ints list.");
554                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
555                         .withRpcError(error).build());
556             }
557
558             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
559             if (!nodesEqual) {
560                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
561                         idIntsListener.diffWithLocalCopy(readResult.get()));
562             }
563
564             return Futures.immediateFuture(
565                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual)).build());
566
567         } catch (final InterruptedException | ExecutionException e) {
568             final RpcError error = RpcResultBuilder.newError(
569                     ErrorType.APPLICATION, "operation-failed", "Final read from id-ints failed.", null, null, e);
570             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
571                     .withRpcError(error).build());
572
573         }
574     }
575
576     @Override
577     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
578         LOG.debug("create-prefix-shard, input: {}", input);
579
580         return prefixShardHandler.onCreatePrefixShard(input);
581     }
582
583     @Override
584     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
585             final DeconfigureIdIntsShardInput input) {
586         return null;
587     }
588
589     @Override
590     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
591         LOG.debug("Received unsubscribe-ynl, input: {}", input);
592
593         if (!ynlRegistrations.containsKey(input.getId())) {
594             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
595                     "No ynl listener with this id registered.");
596             final RpcResult<UnsubscribeYnlOutput> result =
597                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
598             return Futures.immediateFuture(result);
599         }
600
601         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
602         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
603
604         reg.close();
605
606         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
607     }
608
609     @Override
610     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
611             final CheckPublishNotificationsInput input) {
612
613         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
614
615         if (task == null) {
616             return Futures.immediateFuture(RpcResultBuilder.success(
617                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
618         }
619
620         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
621                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
622
623         if (task.getLastError() != null) {
624             LOG.error("Last error for {}", task, task.getLastError());
625             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
626         }
627
628         final CheckPublishNotificationsOutput output =
629                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
630
631         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
632     }
633
634     @Override
635     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
636             final ProduceTransactionsInput input) {
637         LOG.debug("producer-transactions, input: {}", input);
638         return ProduceTransactionsHandler.start(domDataTreeService, input);
639     }
640
641     @Override
642     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
643             final ShutdownShardReplicaInput input) {
644         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
645
646         final String shardName = input.getShardName();
647         if (Strings.isNullOrEmpty(shardName)) {
648             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
649                     "A valid shard name must be specified");
650             return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
651                 .build());
652         }
653
654         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
655     }
656
657     @Override
658     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
659             final ShutdownPrefixShardReplicaInput input) {
660         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
661
662         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
663
664         if (shardPrefix == null) {
665             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
666                     "A valid shard prefix must be specified");
667             return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
668                 .withRpcError(rpcError).build());
669         }
670
671         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
672         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
673
674         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
675     }
676
677     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
678         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
679         final ActorContext context = configDataStore.getActorContext();
680
681         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
682                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
683         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
684         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
685
686         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
687             @Override
688             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
689                 if (throwable != null) {
690                     shutdownShardAsk.failure(throwable);
691                 } else {
692                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
693                 }
694             }
695         }, context.getClientDispatcher());
696
697         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
698             @Override
699             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
700                 if (throwable != null) {
701                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
702                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
703                     rpcResult.set(failedResult);
704                 } else {
705                     // according to Patterns.gracefulStop API, we don't have to
706                     // check value of gracefulStopResult
707                     rpcResult.set(RpcResultBuilder.success(success).build());
708                 }
709             }
710         }, context.getClientDispatcher());
711         return rpcResult;
712     }
713
714     @Override
715     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
716
717         LOG.debug("Received register-constant rpc, input: {}", input);
718
719         if (input.getConstant() == null) {
720             final RpcError error = RpcResultBuilder.newError(
721                     ErrorType.RPC, "Invalid input.", "Constant value is null");
722             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
723                 .build());
724         }
725
726         if (globalGetConstantRegistration != null) {
727             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
728                     "There is already a get-constant rpc registered.");
729             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
730                 .build());
731         }
732
733         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
734         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
735     }
736
737     @Override
738     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
739             final UnregisterDefaultConstantInput input) {
740         return null;
741     }
742
743     @Override
744     @SuppressWarnings("checkstyle:IllegalCatch")
745     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
746         LOG.debug("Received unsubscribe-ddtl.");
747
748         if (idIntsDdtl == null || ddtlReg == null) {
749             final RpcError error = RpcResultBuilder.newError(
750                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
751             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
752                     .withRpcError(error).build());
753         }
754
755         try {
756             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
757         } catch (InterruptedException | ExecutionException | TimeoutException e) {
758             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
759                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
760             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
761                     .withRpcError(error).build());
762         }
763
764         ddtlReg.close();
765         ddtlReg = null;
766
767         if (!idIntsDdtl.hasTriggered()) {
768             final RpcError error = RpcResultBuilder.newError(
769                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
770                             + "any notifications.");
771             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
772                     .withRpcError(error).build());
773         }
774
775         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
776         LOG.debug("Creating distributed datastore client for shard {}", shardName);
777
778         final ActorContext actorContext = configDataStore.getActorContext();
779         final Props distributedDataStoreClientProps =
780                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
781                         "Shard-" + shardName, actorContext, shardName);
782
783         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
784         final DataStoreClient distributedDataStoreClient;
785         try {
786             distributedDataStoreClient = SimpleDataStoreClientActor
787                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
788         } catch (RuntimeException e) {
789             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
790             clientActor.tell(PoisonPill.getInstance(), noSender());
791             final RpcError error = RpcResultBuilder.newError(
792                     ErrorType.APPLICATION, "Unable to create ds client for read.",
793                     "Unable to create ds client for read.");
794             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
795                     .withRpcError(error).build());
796         }
797
798         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
799         final ClientTransaction tx = localHistory.createTransaction();
800         final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
801                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
802
803         tx.abort();
804         localHistory.close();
805         try {
806             final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
807             if (!optional.isPresent()) {
808                 LOG.warn("Final read from client is empty.");
809                 final RpcError error = RpcResultBuilder.newError(
810                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
811                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
812                         .withRpcError(error).build());
813             }
814
815             return Futures.immediateFuture(
816                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
817                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
818
819         } catch (InterruptedException | ExecutionException e) {
820             LOG.error("Unable to read data to verify ddtl data.", e);
821             final RpcError error = RpcResultBuilder.newError(
822                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
823             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
824                     .withRpcError(error).build());
825         } finally {
826             distributedDataStoreClient.close();
827             clientActor.tell(PoisonPill.getInstance(), noSender());
828         }
829     }
830 }