Adjust for Binding RPC codegen changes
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
145 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
146 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
147 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
148 import org.opendaylight.yangtools.concepts.ListenerRegistration;
149 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
150 import org.opendaylight.yangtools.yang.common.RpcError;
151 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
152 import org.opendaylight.yangtools.yang.common.RpcResult;
153 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
154 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
155 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
156 import org.slf4j.Logger;
157 import org.slf4j.LoggerFactory;
158 import scala.concurrent.duration.FiniteDuration;
159
160 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
161
162     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
163     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
164             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
165
166     private final RpcProviderRegistry rpcRegistry;
167     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
168     private final DistributedShardFactory distributedShardFactory;
169     private final DistributedDataStoreInterface configDataStore;
170     private final DOMDataTreeService domDataTreeService;
171     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
172     private final DOMDataBroker domDataBroker;
173     private final NotificationPublishService notificationPublishService;
174     private final NotificationService notificationService;
175     private final SchemaService schemaService;
176     private final ClusterSingletonServiceProvider singletonService;
177     private final DOMRpcProviderService domRpcService;
178     private final PrefixLeaderHandler prefixLeaderHandler;
179     private final PrefixShardHandler prefixShardHandler;
180     private final DOMDataTreeChangeService domDataTreeChangeService;
181     private final ActorSystem actorSystem;
182
183     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
184             routedRegistrations = new HashMap<>();
185
186     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
187
188     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
189     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
190     private FlappingSingletonService flappingSingletonService;
191     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
192     private IdIntsListener idIntsListener;
193     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
194     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
195     private IdIntsDOMDataTreeLIstener idIntsDdtl;
196
197
198
199     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
200                                      final DOMRpcProviderService domRpcService,
201                                      final ClusterSingletonServiceProvider singletonService,
202                                      final SchemaService schemaService,
203                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
204                                      final NotificationPublishService notificationPublishService,
205                                      final NotificationService notificationService,
206                                      final DOMDataBroker domDataBroker,
207                                      final DOMDataTreeService domDataTreeService,
208                                      final DistributedShardFactory distributedShardFactory,
209                                      final DistributedDataStoreInterface configDataStore,
210                                      final ActorSystemProvider actorSystemProvider) {
211         this.rpcRegistry = rpcRegistry;
212         this.domRpcService = domRpcService;
213         this.singletonService = singletonService;
214         this.schemaService = schemaService;
215         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
216         this.notificationPublishService = notificationPublishService;
217         this.notificationService = notificationService;
218         this.domDataBroker = domDataBroker;
219         this.domDataTreeService = domDataTreeService;
220         this.distributedShardFactory = distributedShardFactory;
221         this.configDataStore = configDataStore;
222         this.actorSystem = actorSystemProvider.getActorSystem();
223
224         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
225
226         domDataTreeChangeService =
227                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
228
229         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
230
231         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
232                 bindingNormalizedNodeSerializer);
233     }
234
235     @Override
236     @SuppressWarnings("checkstyle:IllegalCatch")
237     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
238             final UnregisterSingletonConstantInput input) {
239         LOG.debug("unregister-singleton-constant");
240
241         if (getSingletonConstantRegistration == null) {
242             LOG.debug("No get-singleton-constant registration present.");
243             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
244                     "No get-singleton-constant rpc registration present.");
245             final RpcResult<UnregisterSingletonConstantOutput> result =
246                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
247             return Futures.immediateFuture(result);
248         }
249
250         try {
251             getSingletonConstantRegistration.close();
252             getSingletonConstantRegistration = null;
253
254             return Futures.immediateFuture(RpcResultBuilder.success(
255                 new UnregisterSingletonConstantOutputBuilder().build()).build());
256         } catch (Exception e) {
257             LOG.debug("There was a problem closing the singleton constant service", e);
258             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
259                     "There was a problem closing get-singleton-constant");
260             final RpcResult<UnregisterSingletonConstantOutput> result =
261                     RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
262             return Futures.immediateFuture(result);
263         }
264     }
265
266     @Override
267     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
268             final StartPublishNotificationsInput input) {
269         LOG.debug("publish-notifications, input: {}", input);
270
271         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
272                 input.getSeconds(), input.getNotificationsPerSecond());
273
274         publishNotificationsTasks.put(input.getId(), task);
275
276         task.start();
277
278         return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
279             .build());
280     }
281
282     @Override
283     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
284
285         if (dtclReg != null) {
286             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
287                     "There is already dataTreeChangeListener registered on id-ints list.");
288             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
289         }
290
291         idIntsListener = new IdIntsListener();
292
293         dtclReg = domDataTreeChangeService
294                 .registerDataTreeChangeListener(
295                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
296                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
297                         idIntsListener);
298
299         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
300     }
301
302     @Override
303     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
304         LOG.debug("write-transactions, input: {}", input);
305         return WriteTransactionsHandler.start(domDataBroker, input);
306     }
307
308     @Override
309     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
310         return null;
311     }
312
313     @Override
314     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
315             final RemoveShardReplicaInput input) {
316         return null;
317     }
318
319     @Override
320     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
321
322         LOG.debug("subscribe-ynl, input: {}", input);
323
324         if (ynlRegistrations.containsKey(input.getId())) {
325             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
326                     "There is already ynl listener registered for this id: " + input.getId());
327             return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
328         }
329
330         ynlRegistrations.put(input.getId(),
331                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
332
333         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
334     }
335
336     @Override
337     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
338         LOG.debug("remove-prefix-shard, input: {}", input);
339
340         return prefixShardHandler.onRemovePrefixShard(input);
341     }
342
343     @Override
344     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
345             final BecomePrefixLeaderInput input) {
346         LOG.debug("become-prefix-leader, input: {}", input);
347
348         return prefixLeaderHandler.makeLeaderLocal(input);
349     }
350
351     @Override
352     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
353             final UnregisterBoundConstantInput input) {
354         LOG.debug("unregister-bound-constant, {}", input);
355
356         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
357                 routedRegistrations.remove(input.getContext());
358
359         if (rpcRegistration == null) {
360             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
361             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
362                     "No get-constant rpc registration present.");
363             final RpcResult<UnregisterBoundConstantOutput> result =
364                     RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
365             return Futures.immediateFuture(result);
366         }
367
368         rpcRegistration.close();
369         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
370             .build());
371     }
372
373     @Override
374     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
375             final RegisterSingletonConstantInput input) {
376
377         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
378
379         if (input.getConstant() == null) {
380             final RpcError error = RpcResultBuilder.newError(
381                     ErrorType.RPC, "Invalid input.", "Constant value is null");
382             return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
383                 .withRpcError(error).build());
384         }
385
386         getSingletonConstantRegistration =
387                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
388
389         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
390             .build());
391     }
392
393     @Override
394     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
395             final RegisterDefaultConstantInput input) {
396         return null;
397     }
398
399     @Override
400     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
401             final UnregisterConstantInput input) {
402
403         if (globalGetConstantRegistration == null) {
404             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
405                     "No get-constant rpc registration present.");
406             return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
407                 .build());
408         }
409
410         globalGetConstantRegistration.close();
411         globalGetConstantRegistration = null;
412
413         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
414     }
415
416     @Override
417     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
418             final UnregisterFlappingSingletonInput input) {
419         LOG.debug("unregister-flapping-singleton received.");
420
421         if (flappingSingletonService == null) {
422             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
423                     "No flapping-singleton registration present.");
424             final RpcResult<UnregisterFlappingSingletonOutput> result =
425                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
426             return Futures.immediateFuture(result);
427         }
428
429         final long flapCount = flappingSingletonService.setInactive();
430         flappingSingletonService = null;
431
432         final UnregisterFlappingSingletonOutput output =
433                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
434
435         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
436     }
437
438     @Override
439     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
440         return null;
441     }
442
443     @Override
444     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
445
446         if (ddtlReg != null) {
447             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
448                     "There is already dataTreeChangeListener registered on id-ints list.");
449             return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
450         }
451
452         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
453
454         try {
455             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
456                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
457                             ProduceTransactionsHandler.ID_INT_YID)),
458                     true, Collections.emptyList());
459         } catch (DOMDataTreeLoopException e) {
460             LOG.error("Failed to register DOMDataTreeListener.", e);
461         }
462
463         return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
464     }
465
466     @Override
467     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
468             final RegisterBoundConstantInput input) {
469         LOG.debug("register-bound-constant: {}", input);
470
471         if (input.getContext() == null) {
472             final RpcError error = RpcResultBuilder.newError(
473                     ErrorType.RPC, "Invalid input.", "Context value is null");
474             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
475                 .build());
476         }
477
478         if (input.getConstant() == null) {
479             final RpcError error = RpcResultBuilder.newError(
480                     ErrorType.RPC, "Invalid input.", "Constant value is null");
481             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
482                 .build());
483         }
484
485         if (routedRegistrations.containsKey(input.getContext())) {
486             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
487                     "There is already a rpc registered for context: " + input.getContext());
488             return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
489                 .build());
490         }
491
492         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
493                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
494                         input.getConstant(), input.getContext());
495
496         routedRegistrations.put(input.getContext(), rpcRegistration);
497         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
498             .build());
499     }
500
501     @Override
502     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
503             final RegisterFlappingSingletonInput input) {
504         LOG.debug("Received register-flapping-singleton.");
505
506         if (flappingSingletonService != null) {
507             final RpcError error = RpcResultBuilder.newError(
508                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
509             return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
510                 .withRpcError(error).build());
511         }
512
513         flappingSingletonService = new FlappingSingletonService(singletonService);
514
515         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
516             .build());
517     }
518
519     @Override
520     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
521         LOG.debug("Received unsubscribe-dtcl");
522
523         if (idIntsListener == null || dtclReg == null) {
524             final RpcError error = RpcResultBuilder.newError(
525                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
526             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
527                     .withRpcError(error).build());
528         }
529
530         try {
531             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
532         } catch (InterruptedException | ExecutionException | TimeoutException e) {
533             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
534                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
535             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
536                     .withRpcError(error).build());
537         }
538
539         dtclReg.close();
540         dtclReg = null;
541
542         if (!idIntsListener.hasTriggered()) {
543             final RpcError error = RpcResultBuilder.newError(
544                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
545                             + "any notifications.");
546             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
547                     .withRpcError(error).build());
548         }
549
550         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
551         try {
552             final Optional<NormalizedNode<?, ?>> readResult =
553                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
554
555             if (!readResult.isPresent()) {
556                 final RpcError error = RpcResultBuilder.newError(
557                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
558                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
559                         .withRpcError(error).build());
560             }
561
562             return Futures.immediateFuture(
563                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
564                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
565
566         } catch (final ReadFailedException e) {
567             final RpcError error = RpcResultBuilder.newError(
568                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
569             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
570                     .withRpcError(error).build());
571
572         }
573     }
574
575     @Override
576     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
577         LOG.debug("create-prefix-shard, input: {}", input);
578
579         return prefixShardHandler.onCreatePrefixShard(input);
580     }
581
582     @Override
583     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
584             final DeconfigureIdIntsShardInput input) {
585         return null;
586     }
587
588     @Override
589     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
590         LOG.debug("Received unsubscribe-ynl, input: {}", input);
591
592         if (!ynlRegistrations.containsKey(input.getId())) {
593             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
594                     "No ynl listener with this id registered.");
595             final RpcResult<UnsubscribeYnlOutput> result =
596                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
597             return Futures.immediateFuture(result);
598         }
599
600         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
601         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
602
603         reg.close();
604
605         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
606     }
607
608     @Override
609     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
610             final CheckPublishNotificationsInput input) {
611
612         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
613
614         if (task == null) {
615             return Futures.immediateFuture(RpcResultBuilder.success(
616                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
617         }
618
619         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
620                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
621
622         if (task.getLastError() != null) {
623             LOG.error("Last error for {}", task, task.getLastError());
624             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
625         }
626
627         final CheckPublishNotificationsOutput output =
628                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
629
630         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
631     }
632
633     @Override
634     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
635             final ProduceTransactionsInput input) {
636         LOG.debug("producer-transactions, input: {}", input);
637         return ProduceTransactionsHandler.start(domDataTreeService, input);
638     }
639
640     @Override
641     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
642             final ShutdownShardReplicaInput input) {
643         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
644
645         final String shardName = input.getShardName();
646         if (Strings.isNullOrEmpty(shardName)) {
647             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
648                     "A valid shard name must be specified");
649             return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
650                 .build());
651         }
652
653         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
654     }
655
656     @Override
657     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
658             final ShutdownPrefixShardReplicaInput input) {
659         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
660
661         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
662
663         if (shardPrefix == null) {
664             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
665                     "A valid shard prefix must be specified");
666             return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
667                 .withRpcError(rpcError).build());
668         }
669
670         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
671         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
672
673         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
674     }
675
676     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
677         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
678         final ActorContext context = configDataStore.getActorContext();
679
680         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
681                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
682         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
683         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
684
685         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
686             @Override
687             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
688                 if (throwable != null) {
689                     shutdownShardAsk.failure(throwable);
690                 } else {
691                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
692                 }
693             }
694         }, context.getClientDispatcher());
695
696         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
697             @Override
698             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
699                 if (throwable != null) {
700                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
701                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
702                     rpcResult.set(failedResult);
703                 } else {
704                     // according to Patterns.gracefulStop API, we don't have to
705                     // check value of gracefulStopResult
706                     rpcResult.set(RpcResultBuilder.success(success).build());
707                 }
708             }
709         }, context.getClientDispatcher());
710         return rpcResult;
711     }
712
713     @Override
714     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
715
716         LOG.debug("Received register-constant rpc, input: {}", input);
717
718         if (input.getConstant() == null) {
719             final RpcError error = RpcResultBuilder.newError(
720                     ErrorType.RPC, "Invalid input.", "Constant value is null");
721             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
722                 .build());
723         }
724
725         if (globalGetConstantRegistration != null) {
726             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
727                     "There is already a get-constant rpc registered.");
728             return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
729                 .build());
730         }
731
732         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
733         return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
734     }
735
736     @Override
737     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
738             final UnregisterDefaultConstantInput input) {
739         return null;
740     }
741
742     @Override
743     @SuppressWarnings("checkstyle:IllegalCatch")
744     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
745         LOG.debug("Received unsubscribe-ddtl.");
746
747         if (idIntsDdtl == null || ddtlReg == null) {
748             final RpcError error = RpcResultBuilder.newError(
749                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
750             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
751                     .withRpcError(error).build());
752         }
753
754         try {
755             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
756         } catch (InterruptedException | ExecutionException | TimeoutException e) {
757             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
758                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
759             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
760                     .withRpcError(error).build());
761         }
762
763         ddtlReg.close();
764         ddtlReg = null;
765
766         if (!idIntsDdtl.hasTriggered()) {
767             final RpcError error = RpcResultBuilder.newError(
768                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
769                             + "any notifications.");
770             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
771                     .withRpcError(error).build());
772         }
773
774         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
775         LOG.debug("Creating distributed datastore client for shard {}", shardName);
776
777         final ActorContext actorContext = configDataStore.getActorContext();
778         final Props distributedDataStoreClientProps =
779                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
780                         "Shard-" + shardName, actorContext, shardName);
781
782         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
783         final DataStoreClient distributedDataStoreClient;
784         try {
785             distributedDataStoreClient = SimpleDataStoreClientActor
786                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
787         } catch (RuntimeException e) {
788             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
789             clientActor.tell(PoisonPill.getInstance(), noSender());
790             final RpcError error = RpcResultBuilder.newError(
791                     ErrorType.APPLICATION, "Unable to create ds client for read.",
792                     "Unable to create ds client for read.");
793             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
794                     .withRpcError(error).build());
795         }
796
797         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
798         final ClientTransaction tx = localHistory.createTransaction();
799         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
800                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
801                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
802
803         tx.abort();
804         localHistory.close();
805         try {
806             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
807             if (!optional.isPresent()) {
808                 LOG.warn("Final read from client is empty.");
809                 final RpcError error = RpcResultBuilder.newError(
810                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
811                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
812                         .withRpcError(error).build());
813             }
814
815             return Futures.immediateFuture(
816                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
817                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
818
819         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
820             LOG.error("Unable to read data to verify ddtl data.", e);
821             final RpcError error = RpcResultBuilder.newError(
822                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
823             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
824                     .withRpcError(error).build());
825         } finally {
826             distributedDataStoreClient.close();
827             clientActor.tell(PoisonPill.getInstance(), noSender());
828         }
829     }
830 }