f9f82a237d2ceee8177e08aa8f94c2b6bd0f8d55
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.io.PrintWriter;
17 import java.io.StringWriter;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
26 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
27 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
28 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
29 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
30 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
31 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
32 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
33 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
36 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
39 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
41 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
44 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
45 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
46 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
47 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
48 import org.opendaylight.controller.sal.core.api.model.SchemaService;
49 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
50 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
56 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
57 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
87 import org.opendaylight.yangtools.concepts.ListenerRegistration;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.RpcError;
90 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
91 import org.opendaylight.yangtools.yang.common.RpcResult;
92 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
93 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
97
98 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
99
100     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
101     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
102             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
103
104     private final RpcProviderRegistry rpcRegistry;
105     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
106     private final DistributedShardFactory distributedShardFactory;
107     private final DOMDataTreeService domDataTreeService;
108     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
109     private final DOMDataBroker domDataBroker;
110     private final NotificationPublishService notificationPublishService;
111     private final NotificationService notificationService;
112     private final SchemaService schemaService;
113     private final ClusterSingletonServiceProvider singletonService;
114     private final DOMRpcProviderService domRpcService;
115     private final PrefixShardHandler prefixShardHandler;
116     private final DOMDataTreeChangeService domDataTreeChangeService;
117
118     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
119             new HashMap<>();
120
121     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
122
123     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
124     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
125     private FlappingSingletonService flappingSingletonService;
126     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
127     private IdIntsListener idIntsListener;
128     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
129     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
130     private IdIntsDOMDataTreeLIstener idIntsDdtl;
131
132
133
134     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
135                                      final DOMRpcProviderService domRpcService,
136                                      final ClusterSingletonServiceProvider singletonService,
137                                      final SchemaService schemaService,
138                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
139                                      final NotificationPublishService notificationPublishService,
140                                      final NotificationService notificationService,
141                                      final DOMDataBroker domDataBroker,
142                                      final DOMDataTreeService domDataTreeService,
143                                      final DistributedShardFactory distributedShardFactory) {
144         this.rpcRegistry = rpcRegistry;
145         this.domRpcService = domRpcService;
146         this.singletonService = singletonService;
147         this.schemaService = schemaService;
148         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
149         this.notificationPublishService = notificationPublishService;
150         this.notificationService = notificationService;
151         this.domDataBroker = domDataBroker;
152         this.domDataTreeService = domDataTreeService;
153         this.distributedShardFactory = distributedShardFactory;
154
155         domDataTreeChangeService =
156                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
157
158         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
159
160         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
161     }
162
163     @Override
164     public Future<RpcResult<Void>> unregisterSingletonConstant() {
165         LOG.debug("unregister-singleton-constant");
166
167         if (getSingletonConstantRegistration == null) {
168             LOG.debug("No get-singleton-constant registration present.");
169             final RpcError rpcError = RpcResultBuilder
170                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
171             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
172             return Futures.immediateFuture(result);
173         }
174
175         try {
176             getSingletonConstantRegistration.close();
177             getSingletonConstantRegistration = null;
178
179             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
180         } catch (final Exception e) {
181             LOG.debug("There was a problem closing the singleton constant service", e);
182             final RpcError rpcError = RpcResultBuilder
183                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
184             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
185             return Futures.immediateFuture(result);
186         }
187     }
188
189     @Override
190     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
191         LOG.debug("publish-notifications, input: {}", input);
192
193         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
194                 input.getSeconds(), input.getNotificationsPerSecond());
195
196         publishNotificationsTasks.put(input.getId(), task);
197
198         task.start();
199
200         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
201     }
202
203     @Override
204     public Future<RpcResult<Void>> subscribeDtcl() {
205
206         if (dtclReg != null) {
207             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
208                     "There is already dataTreeChangeListener registered on id-ints list.");
209             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
210         }
211
212         idIntsListener = new IdIntsListener();
213
214         dtclReg = domDataTreeChangeService
215                 .registerDataTreeChangeListener(
216                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
217                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID),
218                         idIntsListener);
219
220         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
221     }
222
223     @Override
224     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
225         LOG.debug("write-transactions, input: {}", input);
226
227         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
228
229         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
230         writeTransactionsHandler.start(settableFuture);
231
232         return settableFuture;
233     }
234
235     @Override
236     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
237         return null;
238     }
239
240     @Override
241     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
242         return null;
243     }
244
245     @Override
246     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
247
248         LOG.debug("subscribe-ynl, input: {}", input);
249
250         if (ynlRegistrations.containsKey(input.getId())) {
251             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
252                     "There is already ynl listener registered for this id: " + input.getId());
253             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
254         }
255
256         ynlRegistrations.put(input.getId(),
257                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
258
259         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
260     }
261
262     @Override
263     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
264         LOG.debug("remove-prefix-shard, input: {}", input);
265
266         return prefixShardHandler.onRemovePrefixShard(input);
267     }
268
269     @Override
270     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
271         return null;
272     }
273
274     @Override
275     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
276         LOG.debug("unregister-bound-constant, {}", input);
277
278         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
279                 routedRegistrations.remove(input.getContext());
280
281         if (registration == null) {
282             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
283             final RpcError rpcError = RpcResultBuilder
284                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
285             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
286             return Futures.immediateFuture(result);
287         }
288
289         registration.close();
290         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
291     }
292
293     @Override
294     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
295
296         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
297
298         if (input.getConstant() == null) {
299             final RpcError error = RpcResultBuilder.newError(
300                     ErrorType.RPC, "Invalid input.", "Constant value is null");
301             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
302         }
303
304         getSingletonConstantRegistration =
305                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
306
307         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
308     }
309
310     @Override
311     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
312         return null;
313     }
314
315     @Override
316     public Future<RpcResult<Void>> unregisterConstant() {
317
318         if (globalGetConstantRegistration == null) {
319             final RpcError rpcError = RpcResultBuilder
320                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
321             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
322             return Futures.immediateFuture(result);
323         }
324
325         globalGetConstantRegistration.close();
326         globalGetConstantRegistration = null;
327
328         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
329     }
330
331     @Override
332     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
333         LOG.debug("unregister-flapping-singleton received.");
334
335         if (flappingSingletonService == null) {
336             final RpcError rpcError = RpcResultBuilder
337                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
338             final RpcResult<UnregisterFlappingSingletonOutput> result =
339                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
340             return Futures.immediateFuture(result);
341         }
342
343         final long flapCount = flappingSingletonService.setInactive();
344         flappingSingletonService = null;
345
346         final UnregisterFlappingSingletonOutput output =
347                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
348
349         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
350     }
351
352     @Override
353     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
354         return null;
355     }
356
357     @Override
358     public Future<RpcResult<Void>> subscribeDdtl() {
359
360         if (ddtlReg != null) {
361             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
362                     "There is already dataTreeChangeListener registered on id-ints list.");
363             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
364         }
365
366         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
367
368         try {
369             ddtlReg =
370                     domDataTreeService.registerListener(idIntsDdtl,
371                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
372                                     ProduceTransactionsHandler.ID_INTS_YID))
373                             , true, Collections.emptyList());
374         } catch (DOMDataTreeLoopException e) {
375             LOG.error("Failed to register DOMDataTreeListener.", e);
376
377         }
378
379         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
380     }
381
382     @Override
383     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
384         LOG.debug("register-bound-constant: {}", input);
385
386         if (input.getContext() == null) {
387             final RpcError error = RpcResultBuilder.newError(
388                     ErrorType.RPC, "Invalid input.", "Context value is null");
389             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
390         }
391
392         if (input.getConstant() == null) {
393             final RpcError error = RpcResultBuilder.newError(
394                     ErrorType.RPC, "Invalid input.", "Constant value is null");
395             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
396         }
397
398         if (routedRegistrations.containsKey(input.getContext())) {
399             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
400                     "There is already a rpc registered for context: " + input.getContext());
401             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
402         }
403
404         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
405                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
406                         input.getConstant(), input.getContext());
407
408         routedRegistrations.put(input.getContext(), registration);
409         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
410     }
411
412     @Override
413     public Future<RpcResult<Void>> registerFlappingSingleton() {
414         LOG.debug("Received register-flapping-singleton.");
415
416         if (flappingSingletonService != null) {
417             final RpcError error = RpcResultBuilder.newError(
418                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
419             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
420         }
421
422         flappingSingletonService = new FlappingSingletonService(singletonService);
423
424         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
425     }
426
427     @Override
428     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
429         LOG.debug("Received unsubscribe-dtcl");
430
431         if (idIntsListener == null || dtclReg == null) {
432             final RpcError error = RpcResultBuilder.newError(
433                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
434             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
435         }
436
437         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
438         try {
439             if (dtclReg != null) {
440                 dtclReg.close();
441                 dtclReg = null;
442             }
443
444             final Optional<NormalizedNode<?, ?>> readResult =
445                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID).checkedGet();
446
447             if (!readResult.isPresent()) {
448                 final RpcError error = RpcResultBuilder.newError(
449                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
450                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
451                         .withRpcError(error).build());
452             }
453
454             return Futures.immediateFuture(
455                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
456                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
457
458         } catch (final ReadFailedException e) {
459             final RpcError error = RpcResultBuilder.newError(
460                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
461             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
462                     .withRpcError(error).build());
463
464         }
465     }
466
467     @Override
468     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
469         LOG.debug("create-prefix-shard, input: {}", input);
470
471         return prefixShardHandler.onCreatePrefixShard(input);
472     }
473
474     @Override
475     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
476         return null;
477     }
478
479     @Override
480     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
481         LOG.debug("Received unsubscribe-ynl, input: {}", input);
482
483         if (!ynlRegistrations.containsKey(input.getId())) {
484             final RpcError rpcError = RpcResultBuilder
485                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
486             final RpcResult<UnsubscribeYnlOutput> result =
487                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
488             return Futures.immediateFuture(result);
489         }
490
491         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
492         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
493
494         registration.close();
495
496         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
497     }
498
499     @Override
500     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
501             final CheckPublishNotificationsInput input) {
502
503         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
504
505         if (task == null) {
506             return Futures.immediateFuture(RpcResultBuilder.success(
507                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
508         }
509
510         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
511                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
512
513         if (task.getLastError() != null) {
514             final StringWriter sw = new StringWriter();
515             final PrintWriter pw = new PrintWriter(sw);
516             task.getLastError().printStackTrace(pw);
517             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
518         }
519
520         final CheckPublishNotificationsOutput output =
521                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
522
523         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
524     }
525
526     @Override
527     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
528         LOG.debug("producer-transactions, input: {}", input);
529
530         final ProduceTransactionsHandler handler =
531                 new ProduceTransactionsHandler(domDataTreeService, input);
532
533         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
534         handler.start(settableFuture);
535
536         return settableFuture;
537     }
538
539     @Override
540     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
541
542         LOG.debug("Received register-constant rpc, input: {}", input);
543
544         if (input.getConstant() == null) {
545             final RpcError error = RpcResultBuilder.newError(
546                     ErrorType.RPC, "Invalid input.", "Constant value is null");
547             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
548         }
549
550         if (globalGetConstantRegistration != null) {
551             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
552                     "There is already a get-constant rpc registered.");
553             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
554         }
555
556         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
557         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
558     }
559
560     @Override
561     public Future<RpcResult<Void>> unregisterDefaultConstant() {
562         return null;
563     }
564
565     @Override
566     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
567         LOG.debug("Received unsubscribe-ddtl.");
568
569         if (idIntsDdtl == null || ddtlReg == null) {
570             final RpcError error = RpcResultBuilder.newError(
571                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
572             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
573         }
574
575         ddtlReg.close();
576         ddtlReg = null;
577
578         final ReadListener readListener = new ReadListener();
579         try {
580             final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
581                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
582                             ProduceTransactionsHandler.ID_INTS_YID))
583                     , true, Collections.emptyList());
584
585             final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
586             registration.close();
587
588             if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
589                 final RpcError error = RpcResultBuilder.newError(
590                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
591                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
592                         .withRpcError(error).build());
593             }
594
595             final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
596
597             return Futures.immediateFuture(
598                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
599                             .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
600
601
602         } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
603             LOG.error("Unable to read data to verify ddtl data.", e);
604             final RpcError error = RpcResultBuilder.newError(
605                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
606             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
607                     .withRpcError(error).build());
608         }
609     }
610
611     private static class ReadListener implements DOMDataTreeListener {
612
613         private Collection<DataTreeCandidate> changes = null;
614         private SettableFuture<DataTreeCandidate> readFuture;
615
616         @Override
617         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
618                                       @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
619             Preconditions.checkArgument(changes.size() == 1);
620
621             if (this.changes == null) {
622                 this.changes = changes;
623
624                 readFuture.set(changes.iterator().next());
625             }
626         }
627
628         @Override
629         public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
630             LOG.error("Read Listener failed. {}", causes);
631         }
632
633         public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
634             if (changes != null) {
635                 return Futures.immediateFuture(changes.iterator().next());
636             }
637
638             readFuture = SettableFuture.create();
639             return readFuture;
640         }
641     }
642 }