92bacd1f89aacfdd67b12dfd299dce3fdaed64ab
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
156
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
158
159     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
160     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
161             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
162
163     private final RpcProviderRegistry rpcRegistry;
164     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
165     private final DistributedShardFactory distributedShardFactory;
166     private final DistributedDataStoreInterface configDataStore;
167     private final DOMDataTreeService domDataTreeService;
168     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
169     private final DOMDataBroker domDataBroker;
170     private final NotificationPublishService notificationPublishService;
171     private final NotificationService notificationService;
172     private final DOMSchemaService schemaService;
173     private final ClusterSingletonServiceProvider singletonService;
174     private final DOMRpcProviderService domRpcService;
175     private final PrefixLeaderHandler prefixLeaderHandler;
176     private final PrefixShardHandler prefixShardHandler;
177     private final DOMDataTreeChangeService domDataTreeChangeService;
178     private final ActorSystem actorSystem;
179
180     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
181             routedRegistrations = new HashMap<>();
182
183     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
184
185     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
186     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
187     private FlappingSingletonService flappingSingletonService;
188     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
189     private IdIntsListener idIntsListener;
190     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
191     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
192     private IdIntsDOMDataTreeLIstener idIntsDdtl;
193
194
195
196     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197                                      final DOMRpcProviderService domRpcService,
198                                      final ClusterSingletonServiceProvider singletonService,
199                                      final DOMSchemaService schemaService,
200                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201                                      final NotificationPublishService notificationPublishService,
202                                      final NotificationService notificationService,
203                                      final DOMDataBroker domDataBroker,
204                                      final DOMDataTreeService domDataTreeService,
205                                      final DistributedShardFactory distributedShardFactory,
206                                      final DistributedDataStoreInterface configDataStore,
207                                      final ActorSystemProvider actorSystemProvider) {
208         this.rpcRegistry = rpcRegistry;
209         this.domRpcService = domRpcService;
210         this.singletonService = singletonService;
211         this.schemaService = schemaService;
212         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213         this.notificationPublishService = notificationPublishService;
214         this.notificationService = notificationService;
215         this.domDataBroker = domDataBroker;
216         this.domDataTreeService = domDataTreeService;
217         this.distributedShardFactory = distributedShardFactory;
218         this.configDataStore = configDataStore;
219         this.actorSystem = actorSystemProvider.getActorSystem();
220
221         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
222
223         domDataTreeChangeService =
224                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
225
226         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
227
228         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229                 bindingNormalizedNodeSerializer);
230     }
231
232     @Override
233     @SuppressWarnings("checkstyle:IllegalCatch")
234     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235             final UnregisterSingletonConstantInput input) {
236         LOG.debug("unregister-singleton-constant");
237
238         if (getSingletonConstantRegistration == null) {
239             LOG.debug("No get-singleton-constant registration present.");
240             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241                     "No get-singleton-constant rpc registration present.");
242             final RpcResult<UnregisterSingletonConstantOutput> result =
243                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244             return Futures.immediateFuture(result);
245         }
246
247         try {
248             getSingletonConstantRegistration.close();
249             getSingletonConstantRegistration = null;
250
251             return Futures.immediateFuture(RpcResultBuilder.success(
252                 new UnregisterSingletonConstantOutputBuilder().build()).build());
253         } catch (Exception e) {
254             LOG.debug("There was a problem closing the singleton constant service", e);
255             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256                     "There was a problem closing get-singleton-constant");
257             final RpcResult<UnregisterSingletonConstantOutput> result =
258                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259             return Futures.immediateFuture(result);
260         }
261     }
262
263     @Override
264     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265             final StartPublishNotificationsInput input) {
266         LOG.debug("publish-notifications, input: {}", input);
267
268         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269                 input.getSeconds(), input.getNotificationsPerSecond());
270
271         publishNotificationsTasks.put(input.getId(), task);
272
273         task.start();
274
275         return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
276             .build());
277     }
278
279     @Override
280     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
281
282         if (dtclReg != null) {
283             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284                     "There is already dataTreeChangeListener registered on id-ints list.");
285             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
286         }
287
288         idIntsListener = new IdIntsListener();
289
290         dtclReg = domDataTreeChangeService
291                 .registerDataTreeChangeListener(
292                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
294                         idIntsListener);
295
296         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301         LOG.debug("write-transactions, input: {}", input);
302         return WriteTransactionsHandler.start(domDataBroker, input);
303     }
304
305     @Override
306     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
307         return null;
308     }
309
310     @Override
311     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312             final RemoveShardReplicaInput input) {
313         return null;
314     }
315
316     @Override
317     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
318
319         LOG.debug("subscribe-ynl, input: {}", input);
320
321         if (ynlRegistrations.containsKey(input.getId())) {
322             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323                     "There is already ynl listener registered for this id: " + input.getId());
324             return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
325         }
326
327         ynlRegistrations.put(input.getId(),
328                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
329
330         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
331     }
332
333     @Override
334     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335         LOG.debug("remove-prefix-shard, input: {}", input);
336
337         return prefixShardHandler.onRemovePrefixShard(input);
338     }
339
340     @Override
341     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342             final BecomePrefixLeaderInput input) {
343         LOG.debug("become-prefix-leader, input: {}", input);
344
345         return prefixLeaderHandler.makeLeaderLocal(input);
346     }
347
348     @Override
349     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350             final UnregisterBoundConstantInput input) {
351         LOG.debug("unregister-bound-constant, {}", input);
352
353         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354                 routedRegistrations.remove(input.getContext());
355
356         if (rpcRegistration == null) {
357             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359                     "No get-constant rpc registration present.");
360             final RpcResult<UnregisterBoundConstantOutput> result =
361                     RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362             return Futures.immediateFuture(result);
363         }
364
365         rpcRegistration.close();
366         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
367             .build());
368     }
369
370     @Override
371     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372             final RegisterSingletonConstantInput input) {
373
374         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
375
376         if (input.getConstant() == null) {
377             final RpcError error = RpcResultBuilder.newError(
378                     ErrorType.RPC, "Invalid input.", "Constant value is null");
379             return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380                 .withRpcError(error).build());
381         }
382
383         getSingletonConstantRegistration =
384                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
385
386         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
387             .build());
388     }
389
390     @Override
391     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392             final RegisterDefaultConstantInput input) {
393         return null;
394     }
395
396     @Override
397     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398             final UnregisterConstantInput input) {
399
400         if (globalGetConstantRegistration == null) {
401             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402                     "No get-constant rpc registration present.");
403             return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
404                 .build());
405         }
406
407         globalGetConstantRegistration.close();
408         globalGetConstantRegistration = null;
409
410         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
411     }
412
413     @Override
414     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415             final UnregisterFlappingSingletonInput input) {
416         LOG.debug("unregister-flapping-singleton received.");
417
418         if (flappingSingletonService == null) {
419             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420                     "No flapping-singleton registration present.");
421             final RpcResult<UnregisterFlappingSingletonOutput> result =
422                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423             return Futures.immediateFuture(result);
424         }
425
426         final long flapCount = flappingSingletonService.setInactive();
427         flappingSingletonService = null;
428
429         final UnregisterFlappingSingletonOutput output =
430                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
431
432         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
433     }
434
435     @Override
436     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
437         return null;
438     }
439
440     @Override
441     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
442
443         if (ddtlReg != null) {
444             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445                     "There is already dataTreeChangeListener registered on id-ints list.");
446             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
447         }
448
449         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
450
451         try {
452             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454                             ProduceTransactionsHandler.ID_INT_YID)),
455                     true, Collections.emptyList());
456         } catch (DOMDataTreeLoopException e) {
457             LOG.error("Failed to register DOMDataTreeListener.", e);
458         }
459
460         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
461     }
462
463     @Override
464     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465             final RegisterBoundConstantInput input) {
466         LOG.debug("register-bound-constant: {}", input);
467
468         if (input.getContext() == null) {
469             final RpcError error = RpcResultBuilder.newError(
470                     ErrorType.RPC, "Invalid input.", "Context value is null");
471             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
472                 .build());
473         }
474
475         if (input.getConstant() == null) {
476             final RpcError error = RpcResultBuilder.newError(
477                     ErrorType.RPC, "Invalid input.", "Constant value is null");
478             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
479                 .build());
480         }
481
482         if (routedRegistrations.containsKey(input.getContext())) {
483             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484                     "There is already a rpc registered for context: " + input.getContext());
485             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
486                 .build());
487         }
488
489         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491                         input.getConstant(), input.getContext());
492
493         routedRegistrations.put(input.getContext(), rpcRegistration);
494         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
495             .build());
496     }
497
498     @Override
499     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500             final RegisterFlappingSingletonInput input) {
501         LOG.debug("Received register-flapping-singleton.");
502
503         if (flappingSingletonService != null) {
504             final RpcError error = RpcResultBuilder.newError(
505                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506             return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507                 .withRpcError(error).build());
508         }
509
510         flappingSingletonService = new FlappingSingletonService(singletonService);
511
512         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
513             .build());
514     }
515
516     @Override
517     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518         LOG.debug("Received unsubscribe-dtcl");
519
520         if (idIntsListener == null || dtclReg == null) {
521             final RpcError error = RpcResultBuilder.newError(
522                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524                     .withRpcError(error).build());
525         }
526
527         try {
528             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529         } catch (InterruptedException | ExecutionException | TimeoutException e) {
530             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533                     .withRpcError(error).build());
534         }
535
536         dtclReg.close();
537         dtclReg = null;
538
539         if (!idIntsListener.hasTriggered()) {
540             final RpcError error = RpcResultBuilder.newError(
541                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
542                             + "any notifications.");
543             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
544                     .withRpcError(error).build());
545         }
546
547         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
548         try {
549             final Optional<NormalizedNode<?, ?>> readResult =
550                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
551
552             if (!readResult.isPresent()) {
553                 final RpcError error = RpcResultBuilder.newError(
554                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
555                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
556                         .withRpcError(error).build());
557             }
558
559             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
560             if (!nodesEqual) {
561                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
562                         idIntsListener.diffWithLocalCopy(readResult.get()));
563             }
564
565             return Futures.immediateFuture(
566                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual)).build());
567
568         } catch (final InterruptedException | ExecutionException e) {
569             final RpcError error = RpcResultBuilder.newError(
570                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
571             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
572                     .withRpcError(error).build());
573
574         }
575     }
576
577     @Override
578     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
579         LOG.debug("create-prefix-shard, input: {}", input);
580
581         return prefixShardHandler.onCreatePrefixShard(input);
582     }
583
584     @Override
585     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
586             final DeconfigureIdIntsShardInput input) {
587         return null;
588     }
589
590     @Override
591     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
592         LOG.debug("Received unsubscribe-ynl, input: {}", input);
593
594         if (!ynlRegistrations.containsKey(input.getId())) {
595             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
596                     "No ynl listener with this id registered.");
597             final RpcResult<UnsubscribeYnlOutput> result =
598                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
599             return Futures.immediateFuture(result);
600         }
601
602         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
603         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
604
605         reg.close();
606
607         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
608     }
609
610     @Override
611     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
612             final CheckPublishNotificationsInput input) {
613
614         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
615
616         if (task == null) {
617             return Futures.immediateFuture(RpcResultBuilder.success(
618                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
619         }
620
621         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
622                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
623
624         if (task.getLastError() != null) {
625             LOG.error("Last error for {}", task, task.getLastError());
626             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
627         }
628
629         final CheckPublishNotificationsOutput output =
630                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
631
632         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
633     }
634
635     @Override
636     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
637             final ProduceTransactionsInput input) {
638         LOG.debug("producer-transactions, input: {}", input);
639         return ProduceTransactionsHandler.start(domDataTreeService, input);
640     }
641
642     @Override
643     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
644             final ShutdownShardReplicaInput input) {
645         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
646
647         final String shardName = input.getShardName();
648         if (Strings.isNullOrEmpty(shardName)) {
649             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
650                     "A valid shard name must be specified");
651             return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
652                 .build());
653         }
654
655         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
656     }
657
658     @Override
659     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
660             final ShutdownPrefixShardReplicaInput input) {
661         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
662
663         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
664
665         if (shardPrefix == null) {
666             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
667                     "A valid shard prefix must be specified");
668             return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
669                 .withRpcError(rpcError).build());
670         }
671
672         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
673         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
674
675         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
676     }
677
678     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
679         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
680         final ActorContext context = configDataStore.getActorContext();
681
682         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
683                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
684         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
685         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
686
687         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
688             @Override
689             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
690                 if (throwable != null) {
691                     shutdownShardAsk.failure(throwable);
692                 } else {
693                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
694                 }
695             }
696         }, context.getClientDispatcher());
697
698         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
699             @Override
700             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
701                 if (throwable != null) {
702                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
703                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
704                     rpcResult.set(failedResult);
705                 } else {
706                     // according to Patterns.gracefulStop API, we don't have to
707                     // check value of gracefulStopResult
708                     rpcResult.set(RpcResultBuilder.success(success).build());
709                 }
710             }
711         }, context.getClientDispatcher());
712         return rpcResult;
713     }
714
715     @Override
716     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
717
718         LOG.debug("Received register-constant rpc, input: {}", input);
719
720         if (input.getConstant() == null) {
721             final RpcError error = RpcResultBuilder.newError(
722                     ErrorType.RPC, "Invalid input.", "Constant value is null");
723             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
724                 .build());
725         }
726
727         if (globalGetConstantRegistration != null) {
728             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
729                     "There is already a get-constant rpc registered.");
730             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
731                 .build());
732         }
733
734         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
735         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
736     }
737
738     @Override
739     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
740             final UnregisterDefaultConstantInput input) {
741         return null;
742     }
743
744     @Override
745     @SuppressWarnings("checkstyle:IllegalCatch")
746     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
747         LOG.debug("Received unsubscribe-ddtl.");
748
749         if (idIntsDdtl == null || ddtlReg == null) {
750             final RpcError error = RpcResultBuilder.newError(
751                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
752             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
753                     .withRpcError(error).build());
754         }
755
756         try {
757             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
758         } catch (InterruptedException | ExecutionException | TimeoutException e) {
759             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
760                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
761             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
762                     .withRpcError(error).build());
763         }
764
765         ddtlReg.close();
766         ddtlReg = null;
767
768         if (!idIntsDdtl.hasTriggered()) {
769             final RpcError error = RpcResultBuilder.newError(
770                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
771                             + "any notifications.");
772             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
773                     .withRpcError(error).build());
774         }
775
776         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
777         LOG.debug("Creating distributed datastore client for shard {}", shardName);
778
779         final ActorContext actorContext = configDataStore.getActorContext();
780         final Props distributedDataStoreClientProps =
781                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
782                         "Shard-" + shardName, actorContext, shardName);
783
784         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
785         final DataStoreClient distributedDataStoreClient;
786         try {
787             distributedDataStoreClient = SimpleDataStoreClientActor
788                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
789         } catch (RuntimeException e) {
790             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
791             clientActor.tell(PoisonPill.getInstance(), noSender());
792             final RpcError error = RpcResultBuilder.newError(
793                     ErrorType.APPLICATION, "Unable to create ds client for read.",
794                     "Unable to create ds client for read.");
795             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
796                     .withRpcError(error).build());
797         }
798
799         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
800         final ClientTransaction tx = localHistory.createTransaction();
801         final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
802                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
803
804         tx.abort();
805         localHistory.close();
806         try {
807             final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
808             if (!optional.isPresent()) {
809                 LOG.warn("Final read from client is empty.");
810                 final RpcError error = RpcResultBuilder.newError(
811                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
812                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
813                         .withRpcError(error).build());
814             }
815
816             return Futures.immediateFuture(
817                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
818                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
819
820         } catch (InterruptedException | ExecutionException e) {
821             LOG.error("Unable to read data to verify ddtl data.", e);
822             final RpcError error = RpcResultBuilder.newError(
823                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
824             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
825                     .withRpcError(error).build());
826         } finally {
827             distributedDataStoreClient.close();
828             clientActor.tell(PoisonPill.getInstance(), noSender());
829         }
830     }
831 }