Bug 7804: Implement agent RPCs for DOMDataTreeListener testing
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.io.PrintWriter;
16 import java.io.StringWriter;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
25 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
26 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
27 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
28 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
29 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
30 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
31 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
32 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
33 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
34 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
38 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
39 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
41 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
42 import org.opendaylight.controller.sal.core.api.model.SchemaService;
43 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
80 import org.opendaylight.yangtools.concepts.ListenerRegistration;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
84 import org.opendaylight.yangtools.yang.common.RpcResult;
85 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
86 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
92
93     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
94
95     private final RpcProviderRegistry rpcRegistry;
96     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
97     private final DistributedShardFactory distributedShardFactory;
98     private final DOMDataTreeService domDataTreeService;
99     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
100     private final DOMDataBroker domDataBroker;
101     private final NotificationPublishService notificationPublishService;
102     private final NotificationService notificationService;
103     private final SchemaService schemaService;
104     private final ClusterSingletonServiceProvider singletonService;
105     private final DOMRpcProviderService domRpcService;
106     private final PrefixShardHandler prefixShardHandler;
107
108     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
109             new HashMap<>();
110
111     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
112
113     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
114     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
115     private FlappingSingletonService flappingSingletonService;
116     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
117     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
118     private IdIntsDOMDataTreeLIstener idIntsDdtl;
119
120     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
121                                      final DOMRpcProviderService domRpcService,
122                                      final ClusterSingletonServiceProvider singletonService,
123                                      final SchemaService schemaService,
124                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
125                                      final NotificationPublishService notificationPublishService,
126                                      final NotificationService notificationService,
127                                      final DOMDataBroker domDataBroker,
128                                      final DOMDataTreeService domDataTreeService,
129                                      final DistributedShardFactory distributedShardFactory) {
130         this.rpcRegistry = rpcRegistry;
131         this.domRpcService = domRpcService;
132         this.singletonService = singletonService;
133         this.schemaService = schemaService;
134         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
135         this.notificationPublishService = notificationPublishService;
136         this.notificationService = notificationService;
137         this.domDataBroker = domDataBroker;
138         this.domDataTreeService = domDataTreeService;
139         this.distributedShardFactory = distributedShardFactory;
140
141         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
142
143         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
144     }
145
146     @Override
147     public Future<RpcResult<Void>> unregisterSingletonConstant() {
148         LOG.debug("unregister-singleton-constant");
149
150         if (getSingletonConstantRegistration == null) {
151             LOG.debug("No get-singleton-constant registration present.");
152             final RpcError rpcError = RpcResultBuilder
153                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
154             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
155             return Futures.immediateFuture(result);
156         }
157
158         try {
159             getSingletonConstantRegistration.close();
160             getSingletonConstantRegistration = null;
161
162             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
163         } catch (final Exception e) {
164             LOG.debug("There was a problem closing the singleton constant service", e);
165             final RpcError rpcError = RpcResultBuilder
166                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
167             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
168             return Futures.immediateFuture(result);
169         }
170     }
171
172     @Override
173     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
174         LOG.debug("publish-notifications, input: {}", input);
175
176         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
177                 input.getSeconds(), input.getNotificationsPerSecond());
178
179         publishNotificationsTasks.put(input.getId(), task);
180
181         task.start();
182
183         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
184     }
185
186     @Override
187     public Future<RpcResult<Void>> subscribeDtcl() {
188         return null;
189     }
190
191     @Override
192     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
193         LOG.debug("write-transactions, input: {}", input);
194
195         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
196
197         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
198         writeTransactionsHandler.start(settableFuture);
199
200         return settableFuture;
201     }
202
203     @Override
204     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
205         return null;
206     }
207
208     @Override
209     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
210         return null;
211     }
212
213     @Override
214     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
215
216         LOG.debug("subscribe-ynl, input: {}", input);
217
218         if (ynlRegistrations.containsKey(input.getId())) {
219             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
220                     "There is already ynl listener registered for this id: " + input.getId());
221             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
222         }
223
224         ynlRegistrations.put(input.getId(),
225                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
226
227         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
228     }
229
230     @Override
231     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
232         LOG.debug("remove-prefix-shard, input: {}", input);
233
234         return prefixShardHandler.onRemovePrefixShard(input);
235     }
236
237     @Override
238     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
239         return null;
240     }
241
242     @Override
243     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
244         LOG.debug("unregister-bound-constant, {}", input);
245
246         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
247                 routedRegistrations.remove(input.getContext());
248
249         if (registration == null) {
250             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
251             final RpcError rpcError = RpcResultBuilder
252                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
253             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
254             return Futures.immediateFuture(result);
255         }
256
257         registration.close();
258         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
259     }
260
261     @Override
262     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
263
264         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
265
266         if (input.getConstant() == null) {
267             final RpcError error = RpcResultBuilder.newError(
268                     ErrorType.RPC, "Invalid input.", "Constant value is null");
269             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
270         }
271
272         getSingletonConstantRegistration =
273                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
274
275         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
276     }
277
278     @Override
279     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
280         return null;
281     }
282
283     @Override
284     public Future<RpcResult<Void>> unregisterConstant() {
285
286         if (globalGetConstantRegistration == null) {
287             final RpcError rpcError = RpcResultBuilder
288                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
289             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
290             return Futures.immediateFuture(result);
291         }
292
293         globalGetConstantRegistration.close();
294         globalGetConstantRegistration = null;
295
296         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
297     }
298
299     @Override
300     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
301         LOG.debug("unregister-flapping-singleton received.");
302
303         if (flappingSingletonService == null) {
304             final RpcError rpcError = RpcResultBuilder
305                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
306             final RpcResult<UnregisterFlappingSingletonOutput> result =
307                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
308             return Futures.immediateFuture(result);
309         }
310
311         final long flapCount = flappingSingletonService.setInactive();
312         flappingSingletonService = null;
313
314         final UnregisterFlappingSingletonOutput output =
315                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
316
317         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
318     }
319
320     @Override
321     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
322         return null;
323     }
324
325     @Override
326     public Future<RpcResult<Void>> subscribeDdtl() {
327
328         if (ddtlReg != null) {
329             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
330                     "There is already dataTreeChangeListener registered on id-ints list.");
331             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
332         }
333
334         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
335
336         try {
337             ddtlReg =
338                     domDataTreeService.registerListener(idIntsDdtl,
339                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
340                                     ProduceTransactionsHandler.ID_INTS_YID))
341                             , true, Collections.emptyList());
342         } catch (DOMDataTreeLoopException e) {
343             LOG.error("Failed to register DOMDataTreeListener.", e);
344
345         }
346
347         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
348     }
349
350     @Override
351     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
352         LOG.debug("register-bound-constant: {}", input);
353
354         if (input.getContext() == null) {
355             final RpcError error = RpcResultBuilder.newError(
356                     ErrorType.RPC, "Invalid input.", "Context value is null");
357             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
358         }
359
360         if (input.getConstant() == null) {
361             final RpcError error = RpcResultBuilder.newError(
362                     ErrorType.RPC, "Invalid input.", "Constant value is null");
363             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
364         }
365
366         if (routedRegistrations.containsKey(input.getContext())) {
367             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
368                     "There is already a rpc registered for context: " + input.getContext());
369             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
370         }
371
372         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
373                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
374                         input.getConstant(), input.getContext());
375
376         routedRegistrations.put(input.getContext(), registration);
377         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
378     }
379
380     @Override
381     public Future<RpcResult<Void>> registerFlappingSingleton() {
382         LOG.debug("Received register-flapping-singleton.");
383
384         if (flappingSingletonService != null) {
385             final RpcError error = RpcResultBuilder.newError(
386                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
387             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
388         }
389
390         flappingSingletonService = new FlappingSingletonService(singletonService);
391
392         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
393     }
394
395     @Override
396     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
397         return null;
398     }
399
400     @Override
401     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
402         LOG.debug("create-prefix-shard, input: {}", input);
403
404         return prefixShardHandler.onCreatePrefixShard(input);
405     }
406
407     @Override
408     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
409         return null;
410     }
411
412     @Override
413     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
414         LOG.debug("Received unsubscribe-ynl, input: {}", input);
415
416         if (!ynlRegistrations.containsKey(input.getId())) {
417             final RpcError rpcError = RpcResultBuilder
418                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
419             final RpcResult<UnsubscribeYnlOutput> result =
420                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
421             return Futures.immediateFuture(result);
422         }
423
424         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
425         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
426
427         registration.close();
428
429         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
430     }
431
432     @Override
433     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
434             final CheckPublishNotificationsInput input) {
435
436         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
437
438         if (task == null) {
439             return Futures.immediateFuture(RpcResultBuilder.success(
440                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
441         }
442
443         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
444                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
445
446         if (task.getLastError() != null) {
447             final StringWriter sw = new StringWriter();
448             final PrintWriter pw = new PrintWriter(sw);
449             task.getLastError().printStackTrace(pw);
450             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
451         }
452
453         final CheckPublishNotificationsOutput output =
454                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
455
456         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
457     }
458
459     @Override
460     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
461         LOG.debug("producer-transactions, input: {}", input);
462
463         final ProduceTransactionsHandler handler =
464                 new ProduceTransactionsHandler(domDataTreeService, input);
465
466         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
467         handler.start(settableFuture);
468
469         return settableFuture;
470     }
471
472     @Override
473     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
474
475         LOG.debug("Received register-constant rpc, input: {}", input);
476
477         if (input.getConstant() == null) {
478             final RpcError error = RpcResultBuilder.newError(
479                     ErrorType.RPC, "Invalid input.", "Constant value is null");
480             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
481         }
482
483         if (globalGetConstantRegistration != null) {
484             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
485                     "There is already a get-constant rpc registered.");
486             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
487         }
488
489         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
490         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
491     }
492
493     @Override
494     public Future<RpcResult<Void>> unregisterDefaultConstant() {
495         return null;
496     }
497
498     @Override
499     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
500         LOG.debug("Received unsubscribe-ddtl.");
501
502         if (idIntsDdtl == null || ddtlReg == null) {
503             final RpcError error = RpcResultBuilder.newError(
504                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
505             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
506         }
507
508         ddtlReg.close();
509         ddtlReg = null;
510
511         final ReadListener readListener = new ReadListener();
512         try {
513             final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
514                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
515                             ProduceTransactionsHandler.ID_INTS_YID))
516                     , true, Collections.emptyList());
517
518             final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
519             registration.close();
520
521             if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
522                 final RpcError error = RpcResultBuilder.newError(
523                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
524                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
525                         .withRpcError(error).build());
526             }
527
528             final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
529
530             return Futures.immediateFuture(
531                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
532                             .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
533
534
535         } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
536             LOG.error("Unable to read data to verify ddtl data.", e);
537             final RpcError error = RpcResultBuilder.newError(
538                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
539             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
540                     .withRpcError(error).build());
541         }
542     }
543
544     private static class ReadListener implements DOMDataTreeListener {
545
546         private Collection<DataTreeCandidate> changes = null;
547         private SettableFuture<DataTreeCandidate> readFuture;
548
549         @Override
550         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
551                                       @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
552             Preconditions.checkArgument(changes.size() == 1);
553
554             if (this.changes == null) {
555                 this.changes = changes;
556
557                 readFuture.set(changes.iterator().next());
558             }
559         }
560
561         @Override
562         public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
563             LOG.error("Read Listener failed. {}", causes);
564         }
565
566         public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
567             if (changes != null) {
568                 return Futures.immediateFuture(changes.iterator().next());
569             }
570
571             readFuture = SettableFuture.create();
572             return readFuture;
573         }
574     }
575 }