BUG 7802: split out shard creation from produce transactions
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.SettableFuture;
13 import java.io.PrintWriter;
14 import java.io.StringWriter;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
19 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
20 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
21 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
22 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
23 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
24 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
25 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
26 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
27 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
28 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
34 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
35 import org.opendaylight.controller.sal.core.api.model.SchemaService;
36 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
37 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
40 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
41 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
42 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
43 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
44 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
45 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
46 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
47 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
48 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
49 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
50 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
67 import org.opendaylight.yangtools.concepts.ListenerRegistration;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.RpcError;
70 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
71 import org.opendaylight.yangtools.yang.common.RpcResult;
72 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
77
78     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
79
80     private final RpcProviderRegistry rpcRegistry;
81     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
82     private final DistributedShardFactory distributedShardFactory;
83     private final DOMDataTreeService domDataTreeService;
84     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
85     private final DOMDataBroker domDataBroker;
86     private final NotificationPublishService notificationPublishService;
87     private final NotificationService notificationService;
88     private final SchemaService schemaService;
89     private final ClusterSingletonServiceProvider singletonService;
90     private final DOMRpcProviderService domRpcService;
91     private final PrefixShardHandler prefixShardHandler;
92
93     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
94             new HashMap<>();
95
96     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
97
98     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
99     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
100     private FlappingSingletonService flappingSingletonService;
101     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
102
103     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
104                                      final DOMRpcProviderService domRpcService,
105                                      final ClusterSingletonServiceProvider singletonService,
106                                      final SchemaService schemaService,
107                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
108                                      final NotificationPublishService notificationPublishService,
109                                      final NotificationService notificationService,
110                                      final DOMDataBroker domDataBroker,
111                                      final DOMDataTreeService domDataTreeService,
112                                      final DistributedShardFactory distributedShardFactory) {
113         this.rpcRegistry = rpcRegistry;
114         this.domRpcService = domRpcService;
115         this.singletonService = singletonService;
116         this.schemaService = schemaService;
117         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
118         this.notificationPublishService = notificationPublishService;
119         this.notificationService = notificationService;
120         this.domDataBroker = domDataBroker;
121         this.domDataTreeService = domDataTreeService;
122         this.distributedShardFactory = distributedShardFactory;
123
124         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
125
126         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
127     }
128
129     @Override
130     public Future<RpcResult<Void>> unregisterSingletonConstant() {
131         LOG.debug("unregister-singleton-constant");
132
133         if (getSingletonConstantRegistration == null) {
134             LOG.debug("No get-singleton-constant registration present.");
135             final RpcError rpcError = RpcResultBuilder
136                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
137             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
138             return Futures.immediateFuture(result);
139         }
140
141         try {
142             getSingletonConstantRegistration.close();
143             getSingletonConstantRegistration = null;
144
145             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
146         } catch (final Exception e) {
147             LOG.debug("There was a problem closing the singleton constant service", e);
148             final RpcError rpcError = RpcResultBuilder
149                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
150             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
151             return Futures.immediateFuture(result);
152         }
153     }
154
155     @Override
156     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
157         LOG.debug("publish-notifications, input: {}", input);
158
159         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
160                 input.getSeconds(), input.getNotificationsPerSecond());
161
162         publishNotificationsTasks.put(input.getId(), task);
163
164         task.start();
165
166         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
167     }
168
169     @Override
170     public Future<RpcResult<Void>> subscribeDtcl() {
171         return null;
172     }
173
174     @Override
175     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
176         LOG.debug("write-transactions, input: {}", input);
177
178         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
179
180         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
181         writeTransactionsHandler.start(settableFuture);
182
183         return settableFuture;
184     }
185
186     @Override
187     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
188         return null;
189     }
190
191     @Override
192     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
193         return null;
194     }
195
196     @Override
197     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
198
199         LOG.debug("subscribe-ynl, input: {}", input);
200
201         if (ynlRegistrations.containsKey(input.getId())) {
202             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
203                     "There is already ynl listener registered for this id: " + input.getId());
204             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
205         }
206
207         ynlRegistrations.put(input.getId(),
208                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
209
210         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
211     }
212
213     @Override
214     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
215         LOG.debug("remove-prefix-shard, input: {}", input);
216
217         return prefixShardHandler.onRemovePrefixShard(input);
218     }
219
220     @Override
221     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
222         return null;
223     }
224
225     @Override
226     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
227         LOG.debug("unregister-bound-constant, {}", input);
228
229         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
230                 routedRegistrations.remove(input.getContext());
231
232         if (registration == null) {
233             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
234             final RpcError rpcError = RpcResultBuilder
235                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
236             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
237             return Futures.immediateFuture(result);
238         }
239
240         registration.close();
241         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
242     }
243
244     @Override
245     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
246
247         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
248
249         if (input.getConstant() == null) {
250             final RpcError error = RpcResultBuilder.newError(
251                     ErrorType.RPC, "Invalid input.", "Constant value is null");
252             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
253         }
254
255         getSingletonConstantRegistration =
256                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
257
258         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
259     }
260
261     @Override
262     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
263         return null;
264     }
265
266     @Override
267     public Future<RpcResult<Void>> unregisterConstant() {
268
269         if (globalGetConstantRegistration == null) {
270             final RpcError rpcError = RpcResultBuilder
271                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
272             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
273             return Futures.immediateFuture(result);
274         }
275
276         globalGetConstantRegistration.close();
277         globalGetConstantRegistration = null;
278
279         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
280     }
281
282     @Override
283     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
284         LOG.debug("unregister-flapping-singleton received.");
285
286         if (flappingSingletonService == null) {
287             final RpcError rpcError = RpcResultBuilder
288                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
289             final RpcResult<UnregisterFlappingSingletonOutput> result =
290                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
291             return Futures.immediateFuture(result);
292         }
293
294         final long flapCount = flappingSingletonService.setInactive();
295         flappingSingletonService = null;
296
297         final UnregisterFlappingSingletonOutput output =
298                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
299
300         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
301     }
302
303     @Override
304     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
305         return null;
306     }
307
308     @Override
309     public Future<RpcResult<Void>> subscribeDdtl() {
310         return null;
311     }
312
313     @Override
314     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
315         LOG.debug("register-bound-constant: {}", input);
316
317         if (input.getContext() == null) {
318             final RpcError error = RpcResultBuilder.newError(
319                     ErrorType.RPC, "Invalid input.", "Context value is null");
320             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
321         }
322
323         if (input.getConstant() == null) {
324             final RpcError error = RpcResultBuilder.newError(
325                     ErrorType.RPC, "Invalid input.", "Constant value is null");
326             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
327         }
328
329         if (routedRegistrations.containsKey(input.getContext())) {
330             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
331                     "There is already a rpc registered for context: " + input.getContext());
332             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
333         }
334
335         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
336                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
337                         input.getConstant(), input.getContext());
338
339         routedRegistrations.put(input.getContext(), registration);
340         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
341     }
342
343     @Override
344     public Future<RpcResult<Void>> registerFlappingSingleton() {
345         LOG.debug("Received register-flapping-singleton.");
346
347         if (flappingSingletonService != null) {
348             final RpcError error = RpcResultBuilder.newError(
349                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
350             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
351         }
352
353         flappingSingletonService = new FlappingSingletonService(singletonService);
354
355         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
356     }
357
358     @Override
359     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
360         return null;
361     }
362
363     @Override
364     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
365         LOG.debug("create-prefix-shard, input: {}", input);
366
367         return prefixShardHandler.onCreatePrefixShard(input);
368     }
369
370     @Override
371     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
372         return null;
373     }
374
375     @Override
376     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
377         LOG.debug("Received unsubscribe-ynl, input: {}", input);
378
379         if (!ynlRegistrations.containsKey(input.getId())) {
380             final RpcError rpcError = RpcResultBuilder
381                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
382             final RpcResult<UnsubscribeYnlOutput> result =
383                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
384             return Futures.immediateFuture(result);
385         }
386
387         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
388         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
389
390         registration.close();
391
392         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
393     }
394
395     @Override
396     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
397             final CheckPublishNotificationsInput input) {
398
399         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
400
401         if (task == null) {
402             return Futures.immediateFuture(RpcResultBuilder.success(
403                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
404         }
405
406         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
407                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
408
409         if (task.getLastError() != null) {
410             final StringWriter sw = new StringWriter();
411             final PrintWriter pw = new PrintWriter(sw);
412             task.getLastError().printStackTrace(pw);
413             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
414         }
415
416         final CheckPublishNotificationsOutput output =
417                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
418
419         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
420     }
421
422     @Override
423     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
424         LOG.debug("producer-transactions, input: {}", input);
425
426         final ProduceTransactionsHandler handler =
427                 new ProduceTransactionsHandler(domDataTreeService, input);
428
429         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
430         handler.start(settableFuture);
431
432         return settableFuture;
433     }
434
435     @Override
436     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
437
438         LOG.debug("Received register-constant rpc, input: {}", input);
439
440         if (input.getConstant() == null) {
441             final RpcError error = RpcResultBuilder.newError(
442                     ErrorType.RPC, "Invalid input.", "Constant value is null");
443             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
444         }
445
446         if (globalGetConstantRegistration != null) {
447             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
448                     "There is already a get-constant rpc registered.");
449             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
450         }
451
452         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
453         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
454     }
455
456     @Override
457     public Future<RpcResult<Void>> unregisterDefaultConstant() {
458         return null;
459     }
460
461     @Override
462     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
463         return null;
464     }
465 }