Nest id-ints list inside a container
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.io.PrintWriter;
17 import java.io.StringWriter;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
26 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
27 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
28 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
29 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
30 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
31 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
32 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
33 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
34 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
36 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
37 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
38 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
39 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
45 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
46 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
47 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
48 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
49 import org.opendaylight.controller.sal.core.api.model.SchemaService;
50 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
51 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
57 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
58 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
88 import org.opendaylight.yangtools.concepts.ListenerRegistration;
89 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
90 import org.opendaylight.yangtools.yang.common.RpcError;
91 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
92 import org.opendaylight.yangtools.yang.common.RpcResult;
93 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
98
99 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
100
101     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
102     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
103             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
104
105     private final RpcProviderRegistry rpcRegistry;
106     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
107     private final DistributedShardFactory distributedShardFactory;
108     private final DOMDataTreeService domDataTreeService;
109     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
110     private final DOMDataBroker domDataBroker;
111     private final NotificationPublishService notificationPublishService;
112     private final NotificationService notificationService;
113     private final SchemaService schemaService;
114     private final ClusterSingletonServiceProvider singletonService;
115     private final DOMRpcProviderService domRpcService;
116     private final PrefixLeaderHandler prefixLeaderHandler;
117     private final PrefixShardHandler prefixShardHandler;
118     private final DOMDataTreeChangeService domDataTreeChangeService;
119
120     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
121             new HashMap<>();
122
123     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
124
125     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
126     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
127     private FlappingSingletonService flappingSingletonService;
128     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
129     private IdIntsListener idIntsListener;
130     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
131     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
132     private IdIntsDOMDataTreeLIstener idIntsDdtl;
133
134
135
136     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
137                                      final DOMRpcProviderService domRpcService,
138                                      final ClusterSingletonServiceProvider singletonService,
139                                      final SchemaService schemaService,
140                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
141                                      final NotificationPublishService notificationPublishService,
142                                      final NotificationService notificationService,
143                                      final DOMDataBroker domDataBroker,
144                                      final DOMDataTreeService domDataTreeService,
145                                      final DistributedShardFactory distributedShardFactory) {
146         this.rpcRegistry = rpcRegistry;
147         this.domRpcService = domRpcService;
148         this.singletonService = singletonService;
149         this.schemaService = schemaService;
150         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
151         this.notificationPublishService = notificationPublishService;
152         this.notificationService = notificationService;
153         this.domDataBroker = domDataBroker;
154         this.domDataTreeService = domDataTreeService;
155         this.distributedShardFactory = distributedShardFactory;
156         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
157
158         domDataTreeChangeService =
159                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
160
161         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
162
163         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
164     }
165
166     @Override
167     public Future<RpcResult<Void>> unregisterSingletonConstant() {
168         LOG.debug("unregister-singleton-constant");
169
170         if (getSingletonConstantRegistration == null) {
171             LOG.debug("No get-singleton-constant registration present.");
172             final RpcError rpcError = RpcResultBuilder
173                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
174             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
175             return Futures.immediateFuture(result);
176         }
177
178         try {
179             getSingletonConstantRegistration.close();
180             getSingletonConstantRegistration = null;
181
182             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
183         } catch (final Exception e) {
184             LOG.debug("There was a problem closing the singleton constant service", e);
185             final RpcError rpcError = RpcResultBuilder
186                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
187             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
188             return Futures.immediateFuture(result);
189         }
190     }
191
192     @Override
193     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
194         LOG.debug("publish-notifications, input: {}", input);
195
196         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
197                 input.getSeconds(), input.getNotificationsPerSecond());
198
199         publishNotificationsTasks.put(input.getId(), task);
200
201         task.start();
202
203         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
204     }
205
206     @Override
207     public Future<RpcResult<Void>> subscribeDtcl() {
208
209         if (dtclReg != null) {
210             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
211                     "There is already dataTreeChangeListener registered on id-ints list.");
212             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
213         }
214
215         idIntsListener = new IdIntsListener();
216
217         dtclReg = domDataTreeChangeService
218                 .registerDataTreeChangeListener(
219                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
220                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
221                         idIntsListener);
222
223         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
224     }
225
226     @Override
227     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
228         LOG.debug("write-transactions, input: {}", input);
229
230         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
231
232         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
233         writeTransactionsHandler.start(settableFuture);
234
235         return settableFuture;
236     }
237
238     @Override
239     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
240         return null;
241     }
242
243     @Override
244     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
245         return null;
246     }
247
248     @Override
249     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
250
251         LOG.debug("subscribe-ynl, input: {}", input);
252
253         if (ynlRegistrations.containsKey(input.getId())) {
254             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
255                     "There is already ynl listener registered for this id: " + input.getId());
256             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
257         }
258
259         ynlRegistrations.put(input.getId(),
260                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
261
262         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
263     }
264
265     @Override
266     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
267         LOG.debug("remove-prefix-shard, input: {}", input);
268
269         return prefixShardHandler.onRemovePrefixShard(input);
270     }
271
272     @Override
273     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
274         LOG.debug("become-prefix-leader, input: {}", input);
275
276         return prefixLeaderHandler.makeLeaderLocal(input);
277     }
278
279     @Override
280     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
281         LOG.debug("unregister-bound-constant, {}", input);
282
283         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
284                 routedRegistrations.remove(input.getContext());
285
286         if (registration == null) {
287             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
288             final RpcError rpcError = RpcResultBuilder
289                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
290             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
291             return Futures.immediateFuture(result);
292         }
293
294         registration.close();
295         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
296     }
297
298     @Override
299     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
300
301         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
302
303         if (input.getConstant() == null) {
304             final RpcError error = RpcResultBuilder.newError(
305                     ErrorType.RPC, "Invalid input.", "Constant value is null");
306             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
307         }
308
309         getSingletonConstantRegistration =
310                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
311
312         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
313     }
314
315     @Override
316     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
317         return null;
318     }
319
320     @Override
321     public Future<RpcResult<Void>> unregisterConstant() {
322
323         if (globalGetConstantRegistration == null) {
324             final RpcError rpcError = RpcResultBuilder
325                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
326             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
327             return Futures.immediateFuture(result);
328         }
329
330         globalGetConstantRegistration.close();
331         globalGetConstantRegistration = null;
332
333         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
334     }
335
336     @Override
337     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
338         LOG.debug("unregister-flapping-singleton received.");
339
340         if (flappingSingletonService == null) {
341             final RpcError rpcError = RpcResultBuilder
342                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
343             final RpcResult<UnregisterFlappingSingletonOutput> result =
344                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
345             return Futures.immediateFuture(result);
346         }
347
348         final long flapCount = flappingSingletonService.setInactive();
349         flappingSingletonService = null;
350
351         final UnregisterFlappingSingletonOutput output =
352                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
353
354         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
355     }
356
357     @Override
358     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
359         return null;
360     }
361
362     @Override
363     public Future<RpcResult<Void>> subscribeDdtl() {
364
365         if (ddtlReg != null) {
366             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
367                     "There is already dataTreeChangeListener registered on id-ints list.");
368             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
369         }
370
371         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
372
373         try {
374             ddtlReg =
375                     domDataTreeService.registerListener(idIntsDdtl,
376                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
377                                     ProduceTransactionsHandler.ID_INT_YID))
378                             , true, Collections.emptyList());
379         } catch (DOMDataTreeLoopException e) {
380             LOG.error("Failed to register DOMDataTreeListener.", e);
381
382         }
383
384         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
385     }
386
387     @Override
388     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
389         LOG.debug("register-bound-constant: {}", input);
390
391         if (input.getContext() == null) {
392             final RpcError error = RpcResultBuilder.newError(
393                     ErrorType.RPC, "Invalid input.", "Context value is null");
394             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
395         }
396
397         if (input.getConstant() == null) {
398             final RpcError error = RpcResultBuilder.newError(
399                     ErrorType.RPC, "Invalid input.", "Constant value is null");
400             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
401         }
402
403         if (routedRegistrations.containsKey(input.getContext())) {
404             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
405                     "There is already a rpc registered for context: " + input.getContext());
406             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
407         }
408
409         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
410                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
411                         input.getConstant(), input.getContext());
412
413         routedRegistrations.put(input.getContext(), registration);
414         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
415     }
416
417     @Override
418     public Future<RpcResult<Void>> registerFlappingSingleton() {
419         LOG.debug("Received register-flapping-singleton.");
420
421         if (flappingSingletonService != null) {
422             final RpcError error = RpcResultBuilder.newError(
423                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
424             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
425         }
426
427         flappingSingletonService = new FlappingSingletonService(singletonService);
428
429         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
430     }
431
432     @Override
433     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
434         LOG.debug("Received unsubscribe-dtcl");
435
436         if (idIntsListener == null || dtclReg == null) {
437             final RpcError error = RpcResultBuilder.newError(
438                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
439             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
440         }
441
442         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
443         try {
444             if (dtclReg != null) {
445                 dtclReg.close();
446                 dtclReg = null;
447             }
448
449             final Optional<NormalizedNode<?, ?>> readResult =
450                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
451
452             if (!readResult.isPresent()) {
453                 final RpcError error = RpcResultBuilder.newError(
454                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
455                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
456                         .withRpcError(error).build());
457             }
458
459             return Futures.immediateFuture(
460                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
461                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
462
463         } catch (final ReadFailedException e) {
464             final RpcError error = RpcResultBuilder.newError(
465                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
466             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
467                     .withRpcError(error).build());
468
469         }
470     }
471
472     @Override
473     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
474         LOG.debug("create-prefix-shard, input: {}", input);
475
476         return prefixShardHandler.onCreatePrefixShard(input);
477     }
478
479     @Override
480     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
481         return null;
482     }
483
484     @Override
485     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
486         LOG.debug("Received unsubscribe-ynl, input: {}", input);
487
488         if (!ynlRegistrations.containsKey(input.getId())) {
489             final RpcError rpcError = RpcResultBuilder
490                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
491             final RpcResult<UnsubscribeYnlOutput> result =
492                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
493             return Futures.immediateFuture(result);
494         }
495
496         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
497         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
498
499         registration.close();
500
501         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
502     }
503
504     @Override
505     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
506             final CheckPublishNotificationsInput input) {
507
508         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
509
510         if (task == null) {
511             return Futures.immediateFuture(RpcResultBuilder.success(
512                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
513         }
514
515         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
516                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
517
518         if (task.getLastError() != null) {
519             final StringWriter sw = new StringWriter();
520             final PrintWriter pw = new PrintWriter(sw);
521             task.getLastError().printStackTrace(pw);
522             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
523         }
524
525         final CheckPublishNotificationsOutput output =
526                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
527
528         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
529     }
530
531     @Override
532     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
533         LOG.debug("producer-transactions, input: {}", input);
534
535         final ProduceTransactionsHandler handler =
536                 new ProduceTransactionsHandler(domDataTreeService, input);
537
538         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
539         handler.start(settableFuture);
540
541         return settableFuture;
542     }
543
544     @Override
545     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
546
547         LOG.debug("Received register-constant rpc, input: {}", input);
548
549         if (input.getConstant() == null) {
550             final RpcError error = RpcResultBuilder.newError(
551                     ErrorType.RPC, "Invalid input.", "Constant value is null");
552             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
553         }
554
555         if (globalGetConstantRegistration != null) {
556             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
557                     "There is already a get-constant rpc registered.");
558             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
559         }
560
561         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
562         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
563     }
564
565     @Override
566     public Future<RpcResult<Void>> unregisterDefaultConstant() {
567         return null;
568     }
569
570     @Override
571     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
572         LOG.debug("Received unsubscribe-ddtl.");
573
574         if (idIntsDdtl == null || ddtlReg == null) {
575             final RpcError error = RpcResultBuilder.newError(
576                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
577             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
578         }
579
580         ddtlReg.close();
581         ddtlReg = null;
582
583         final ReadListener readListener = new ReadListener();
584         try {
585             final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
586                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
587                             ProduceTransactionsHandler.ID_INT_YID))
588                     , true, Collections.emptyList());
589
590             final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
591             registration.close();
592
593             if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
594                 final RpcError error = RpcResultBuilder.newError(
595                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
596                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
597                         .withRpcError(error).build());
598             }
599
600             final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
601
602             return Futures.immediateFuture(
603                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
604                             .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
605
606         } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
607             LOG.error("Unable to read data to verify ddtl data.", e);
608             final RpcError error = RpcResultBuilder.newError(
609                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
610             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
611                     .withRpcError(error).build());
612         }
613     }
614
615     private static class ReadListener implements DOMDataTreeListener {
616
617         private Collection<DataTreeCandidate> changes = null;
618         private SettableFuture<DataTreeCandidate> readFuture;
619
620         @Override
621         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
622                                       @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
623             Preconditions.checkArgument(changes.size() == 1);
624
625             if (this.changes == null) {
626                 this.changes = changes;
627
628                 readFuture.set(changes.iterator().next());
629             }
630         }
631
632         @Override
633         public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
634             LOG.error("Read Listener failed. {}", causes);
635         }
636
637         public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
638             if (changes != null) {
639                 return Futures.immediateFuture(changes.iterator().next());
640             }
641
642             readFuture = SettableFuture.create();
643             return readFuture;
644         }
645     }
646 }