Move initial list creation to create-prefix-shard.
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.io.PrintWriter;
17 import java.io.StringWriter;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
26 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
27 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
28 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
29 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
30 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
31 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
32 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
33 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
34 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
36 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
37 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
38 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
39 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
45 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
46 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
47 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
48 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
49 import org.opendaylight.controller.sal.core.api.model.SchemaService;
50 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
51 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
57 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
58 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
88 import org.opendaylight.yangtools.concepts.ListenerRegistration;
89 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
90 import org.opendaylight.yangtools.yang.common.RpcError;
91 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
92 import org.opendaylight.yangtools.yang.common.RpcResult;
93 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
98
99 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
100
101     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
102     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
103             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
104
105     private final RpcProviderRegistry rpcRegistry;
106     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
107     private final DistributedShardFactory distributedShardFactory;
108     private final DOMDataTreeService domDataTreeService;
109     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
110     private final DOMDataBroker domDataBroker;
111     private final NotificationPublishService notificationPublishService;
112     private final NotificationService notificationService;
113     private final SchemaService schemaService;
114     private final ClusterSingletonServiceProvider singletonService;
115     private final DOMRpcProviderService domRpcService;
116     private final PrefixLeaderHandler prefixLeaderHandler;
117     private final PrefixShardHandler prefixShardHandler;
118     private final DOMDataTreeChangeService domDataTreeChangeService;
119
120     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
121             new HashMap<>();
122
123     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
124
125     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
126     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
127     private FlappingSingletonService flappingSingletonService;
128     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
129     private IdIntsListener idIntsListener;
130     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
131     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
132     private IdIntsDOMDataTreeLIstener idIntsDdtl;
133
134
135
136     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
137                                      final DOMRpcProviderService domRpcService,
138                                      final ClusterSingletonServiceProvider singletonService,
139                                      final SchemaService schemaService,
140                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
141                                      final NotificationPublishService notificationPublishService,
142                                      final NotificationService notificationService,
143                                      final DOMDataBroker domDataBroker,
144                                      final DOMDataTreeService domDataTreeService,
145                                      final DistributedShardFactory distributedShardFactory) {
146         this.rpcRegistry = rpcRegistry;
147         this.domRpcService = domRpcService;
148         this.singletonService = singletonService;
149         this.schemaService = schemaService;
150         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
151         this.notificationPublishService = notificationPublishService;
152         this.notificationService = notificationService;
153         this.domDataBroker = domDataBroker;
154         this.domDataTreeService = domDataTreeService;
155         this.distributedShardFactory = distributedShardFactory;
156         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
157
158         domDataTreeChangeService =
159                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
160
161         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
162
163         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
164                 bindingNormalizedNodeSerializer);
165     }
166
167     @Override
168     public Future<RpcResult<Void>> unregisterSingletonConstant() {
169         LOG.debug("unregister-singleton-constant");
170
171         if (getSingletonConstantRegistration == null) {
172             LOG.debug("No get-singleton-constant registration present.");
173             final RpcError rpcError = RpcResultBuilder
174                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
175             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
176             return Futures.immediateFuture(result);
177         }
178
179         try {
180             getSingletonConstantRegistration.close();
181             getSingletonConstantRegistration = null;
182
183             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
184         } catch (final Exception e) {
185             LOG.debug("There was a problem closing the singleton constant service", e);
186             final RpcError rpcError = RpcResultBuilder
187                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
188             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
189             return Futures.immediateFuture(result);
190         }
191     }
192
193     @Override
194     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
195         LOG.debug("publish-notifications, input: {}", input);
196
197         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
198                 input.getSeconds(), input.getNotificationsPerSecond());
199
200         publishNotificationsTasks.put(input.getId(), task);
201
202         task.start();
203
204         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
205     }
206
207     @Override
208     public Future<RpcResult<Void>> subscribeDtcl() {
209
210         if (dtclReg != null) {
211             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
212                     "There is already dataTreeChangeListener registered on id-ints list.");
213             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
214         }
215
216         idIntsListener = new IdIntsListener();
217
218         dtclReg = domDataTreeChangeService
219                 .registerDataTreeChangeListener(
220                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
221                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
222                         idIntsListener);
223
224         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
225     }
226
227     @Override
228     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
229         LOG.debug("write-transactions, input: {}", input);
230
231         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
232
233         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
234         writeTransactionsHandler.start(settableFuture);
235
236         return settableFuture;
237     }
238
239     @Override
240     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
241         return null;
242     }
243
244     @Override
245     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
246         return null;
247     }
248
249     @Override
250     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
251
252         LOG.debug("subscribe-ynl, input: {}", input);
253
254         if (ynlRegistrations.containsKey(input.getId())) {
255             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
256                     "There is already ynl listener registered for this id: " + input.getId());
257             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
258         }
259
260         ynlRegistrations.put(input.getId(),
261                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
262
263         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
264     }
265
266     @Override
267     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
268         LOG.debug("remove-prefix-shard, input: {}", input);
269
270         return prefixShardHandler.onRemovePrefixShard(input);
271     }
272
273     @Override
274     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
275         LOG.debug("become-prefix-leader, input: {}", input);
276
277         return prefixLeaderHandler.makeLeaderLocal(input);
278     }
279
280     @Override
281     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
282         LOG.debug("unregister-bound-constant, {}", input);
283
284         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
285                 routedRegistrations.remove(input.getContext());
286
287         if (registration == null) {
288             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
289             final RpcError rpcError = RpcResultBuilder
290                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
291             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
292             return Futures.immediateFuture(result);
293         }
294
295         registration.close();
296         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
297     }
298
299     @Override
300     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
301
302         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
303
304         if (input.getConstant() == null) {
305             final RpcError error = RpcResultBuilder.newError(
306                     ErrorType.RPC, "Invalid input.", "Constant value is null");
307             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
308         }
309
310         getSingletonConstantRegistration =
311                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
312
313         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
314     }
315
316     @Override
317     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
318         return null;
319     }
320
321     @Override
322     public Future<RpcResult<Void>> unregisterConstant() {
323
324         if (globalGetConstantRegistration == null) {
325             final RpcError rpcError = RpcResultBuilder
326                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
327             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
328             return Futures.immediateFuture(result);
329         }
330
331         globalGetConstantRegistration.close();
332         globalGetConstantRegistration = null;
333
334         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
335     }
336
337     @Override
338     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
339         LOG.debug("unregister-flapping-singleton received.");
340
341         if (flappingSingletonService == null) {
342             final RpcError rpcError = RpcResultBuilder
343                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
344             final RpcResult<UnregisterFlappingSingletonOutput> result =
345                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
346             return Futures.immediateFuture(result);
347         }
348
349         final long flapCount = flappingSingletonService.setInactive();
350         flappingSingletonService = null;
351
352         final UnregisterFlappingSingletonOutput output =
353                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
354
355         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
356     }
357
358     @Override
359     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
360         return null;
361     }
362
363     @Override
364     public Future<RpcResult<Void>> subscribeDdtl() {
365
366         if (ddtlReg != null) {
367             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
368                     "There is already dataTreeChangeListener registered on id-ints list.");
369             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
370         }
371
372         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
373
374         try {
375             ddtlReg =
376                     domDataTreeService.registerListener(idIntsDdtl,
377                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
378                                     ProduceTransactionsHandler.ID_INT_YID))
379                             , true, Collections.emptyList());
380         } catch (DOMDataTreeLoopException e) {
381             LOG.error("Failed to register DOMDataTreeListener.", e);
382
383         }
384
385         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
386     }
387
388     @Override
389     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
390         LOG.debug("register-bound-constant: {}", input);
391
392         if (input.getContext() == null) {
393             final RpcError error = RpcResultBuilder.newError(
394                     ErrorType.RPC, "Invalid input.", "Context value is null");
395             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
396         }
397
398         if (input.getConstant() == null) {
399             final RpcError error = RpcResultBuilder.newError(
400                     ErrorType.RPC, "Invalid input.", "Constant value is null");
401             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
402         }
403
404         if (routedRegistrations.containsKey(input.getContext())) {
405             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
406                     "There is already a rpc registered for context: " + input.getContext());
407             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
408         }
409
410         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
411                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
412                         input.getConstant(), input.getContext());
413
414         routedRegistrations.put(input.getContext(), registration);
415         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
416     }
417
418     @Override
419     public Future<RpcResult<Void>> registerFlappingSingleton() {
420         LOG.debug("Received register-flapping-singleton.");
421
422         if (flappingSingletonService != null) {
423             final RpcError error = RpcResultBuilder.newError(
424                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
425             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
426         }
427
428         flappingSingletonService = new FlappingSingletonService(singletonService);
429
430         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
431     }
432
433     @Override
434     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
435         LOG.debug("Received unsubscribe-dtcl");
436
437         if (idIntsListener == null || dtclReg == null) {
438             final RpcError error = RpcResultBuilder.newError(
439                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
440             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
441         }
442
443         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
444         try {
445             if (dtclReg != null) {
446                 dtclReg.close();
447                 dtclReg = null;
448             }
449
450             final Optional<NormalizedNode<?, ?>> readResult =
451                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
452
453             if (!readResult.isPresent()) {
454                 final RpcError error = RpcResultBuilder.newError(
455                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
456                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
457                         .withRpcError(error).build());
458             }
459
460             return Futures.immediateFuture(
461                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
462                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
463
464         } catch (final ReadFailedException e) {
465             final RpcError error = RpcResultBuilder.newError(
466                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
467             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
468                     .withRpcError(error).build());
469
470         }
471     }
472
473     @Override
474     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
475         LOG.debug("create-prefix-shard, input: {}", input);
476
477         return prefixShardHandler.onCreatePrefixShard(input);
478     }
479
480     @Override
481     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
482         return null;
483     }
484
485     @Override
486     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
487         LOG.debug("Received unsubscribe-ynl, input: {}", input);
488
489         if (!ynlRegistrations.containsKey(input.getId())) {
490             final RpcError rpcError = RpcResultBuilder
491                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
492             final RpcResult<UnsubscribeYnlOutput> result =
493                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
494             return Futures.immediateFuture(result);
495         }
496
497         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
498         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
499
500         registration.close();
501
502         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
503     }
504
505     @Override
506     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
507             final CheckPublishNotificationsInput input) {
508
509         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
510
511         if (task == null) {
512             return Futures.immediateFuture(RpcResultBuilder.success(
513                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
514         }
515
516         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
517                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
518
519         if (task.getLastError() != null) {
520             final StringWriter sw = new StringWriter();
521             final PrintWriter pw = new PrintWriter(sw);
522             task.getLastError().printStackTrace(pw);
523             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
524         }
525
526         final CheckPublishNotificationsOutput output =
527                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
528
529         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
530     }
531
532     @Override
533     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
534         LOG.debug("producer-transactions, input: {}", input);
535
536         final ProduceTransactionsHandler handler =
537                 new ProduceTransactionsHandler(domDataTreeService, input);
538
539         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
540         handler.start(settableFuture);
541
542         return settableFuture;
543     }
544
545     @Override
546     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
547
548         LOG.debug("Received register-constant rpc, input: {}", input);
549
550         if (input.getConstant() == null) {
551             final RpcError error = RpcResultBuilder.newError(
552                     ErrorType.RPC, "Invalid input.", "Constant value is null");
553             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
554         }
555
556         if (globalGetConstantRegistration != null) {
557             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
558                     "There is already a get-constant rpc registered.");
559             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
560         }
561
562         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
563         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
564     }
565
566     @Override
567     public Future<RpcResult<Void>> unregisterDefaultConstant() {
568         return null;
569     }
570
571     @Override
572     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
573         LOG.debug("Received unsubscribe-ddtl.");
574
575         if (idIntsDdtl == null || ddtlReg == null) {
576             final RpcError error = RpcResultBuilder.newError(
577                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
578             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
579         }
580
581         ddtlReg.close();
582         ddtlReg = null;
583
584         final ReadListener readListener = new ReadListener();
585         try {
586             final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
587                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
588                             ProduceTransactionsHandler.ID_INT_YID))
589                     , true, Collections.emptyList());
590
591             final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
592             registration.close();
593
594             if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
595                 final RpcError error = RpcResultBuilder.newError(
596                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
597                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
598                         .withRpcError(error).build());
599             }
600
601             final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
602
603             return Futures.immediateFuture(
604                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
605                             .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
606
607         } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
608             LOG.error("Unable to read data to verify ddtl data.", e);
609             final RpcError error = RpcResultBuilder.newError(
610                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
611             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
612                     .withRpcError(error).build());
613         }
614     }
615
616     private static class ReadListener implements DOMDataTreeListener {
617
618         private Collection<DataTreeCandidate> changes = null;
619         private SettableFuture<DataTreeCandidate> readFuture;
620
621         @Override
622         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
623                                       @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
624             Preconditions.checkArgument(changes.size() == 1);
625
626             if (this.changes == null) {
627                 this.changes = changes;
628
629                 readFuture.set(changes.iterator().next());
630             }
631         }
632
633         @Override
634         public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
635             LOG.error("Read Listener failed. {}", causes);
636         }
637
638         public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
639             if (changes != null) {
640                 return Futures.immediateFuture(changes.iterator().next());
641             }
642
643             readFuture = SettableFuture.create();
644             return readFuture;
645         }
646     }
647 }