Bug 7802 : Implement agent RPCs for transaction producer testing
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.SettableFuture;
13 import java.io.PrintWriter;
14 import java.io.StringWriter;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
19 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
20 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
21 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
22 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
23 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
24 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
25 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
26 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.controller.sal.core.api.model.SchemaService;
35 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
36 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
39 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
40 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
41 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
42 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
43 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
44 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
45 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
46 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
47 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
48 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
49 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
50 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.opendaylight.yangtools.yang.common.RpcError;
68 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
75
76     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
77
78     private final RpcProviderRegistry rpcRegistry;
79     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
80     private final DistributedShardFactory distributedShardFactory;
81     private final DOMDataTreeService domDataTreeService;
82     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
83     private final DOMDataBroker domDataBroker;
84     private final NotificationPublishService notificationPublishService;
85     private final NotificationService notificationService;
86     private final SchemaService schemaService;
87     private final ClusterSingletonServiceProvider singletonService;
88     private final DOMRpcProviderService domRpcService;
89
90     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
91             new HashMap<>();
92
93     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
94
95     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
96     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
97     private FlappingSingletonService flappingSingletonService;
98     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
99
100     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
101                                      final DOMRpcProviderService domRpcService,
102                                      final ClusterSingletonServiceProvider singletonService,
103                                      final SchemaService schemaService,
104                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
105                                      final NotificationPublishService notificationPublishService,
106                                      final NotificationService notificationService,
107                                      final DOMDataBroker domDataBroker,
108                                      final DOMDataTreeService domDataTreeService,
109                                      final DistributedShardFactory distributedShardFactory) {
110         this.rpcRegistry = rpcRegistry;
111         this.domRpcService = domRpcService;
112         this.singletonService = singletonService;
113         this.schemaService = schemaService;
114         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
115         this.notificationPublishService = notificationPublishService;
116         this.notificationService = notificationService;
117         this.domDataBroker = domDataBroker;
118         this.domDataTreeService = domDataTreeService;
119         this.distributedShardFactory = distributedShardFactory;
120
121         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
122     }
123
124     @Override
125     public Future<RpcResult<Void>> unregisterSingletonConstant() {
126         LOG.debug("unregister-singleton-constant");
127
128         if (getSingletonConstantRegistration == null) {
129             LOG.debug("No get-singleton-constant registration present.");
130             final RpcError rpcError = RpcResultBuilder
131                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
132             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
133             return Futures.immediateFuture(result);
134         }
135
136         try {
137             getSingletonConstantRegistration.close();
138             getSingletonConstantRegistration = null;
139
140             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
141         } catch (final Exception e) {
142             LOG.debug("There was a problem closing the singleton constant service", e);
143             final RpcError rpcError = RpcResultBuilder
144                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
145             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
146             return Futures.immediateFuture(result);
147         }
148     }
149
150     @Override
151     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
152         LOG.debug("publish-notifications, input: {}", input);
153
154         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
155                 input.getSeconds(), input.getNotificationsPerSecond());
156
157         publishNotificationsTasks.put(input.getId(), task);
158
159         task.start();
160
161         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
162     }
163
164     @Override
165     public Future<RpcResult<Void>> subscribeDtcl() {
166         return null;
167     }
168
169     @Override
170     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
171         LOG.debug("write-transactions, input: {}", input);
172
173         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
174
175         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
176         writeTransactionsHandler.start(settableFuture);
177
178         return settableFuture;
179     }
180
181     @Override
182     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
183         return null;
184     }
185
186     @Override
187     public Future<RpcResult<Void>> becomeModuleLeader(BecomeModuleLeaderInput input) {
188         return null;
189     }
190
191     @Override
192     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
193         return null;
194     }
195
196     @Override
197     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
198
199         LOG.debug("subscribe-ynl, input: {}", input);
200
201         if (ynlRegistrations.containsKey(input.getId())) {
202             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
203                     "There is already ynl listener registered for this id: " + input.getId());
204             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
205         }
206
207         ynlRegistrations.put(input.getId(),
208                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
209
210         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
211     }
212
213     @Override
214     public Future<RpcResult<Void>> becomePrefixLeader(BecomePrefixLeaderInput input) {
215         return null;
216     }
217
218     @Override
219     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
220         LOG.debug("unregister-bound-constant, {}", input);
221
222         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
223                 routedRegistrations.remove(input.getContext());
224
225         if (registration == null) {
226             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
227             final RpcError rpcError = RpcResultBuilder
228                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
229             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
230             return Futures.immediateFuture(result);
231         }
232
233         registration.close();
234         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
235     }
236
237     @Override
238     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
239
240         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
241
242         if (input.getConstant() == null) {
243             final RpcError error = RpcResultBuilder.newError(
244                     ErrorType.RPC, "Invalid input.", "Constant value is null");
245             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
246         }
247
248         getSingletonConstantRegistration =
249                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
250
251         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
252     }
253
254     @Override
255     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
256         return null;
257     }
258
259     @Override
260     public Future<RpcResult<Void>> unregisterConstant() {
261
262         if (globalGetConstantRegistration == null) {
263             final RpcError rpcError = RpcResultBuilder
264                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
265             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
266             return Futures.immediateFuture(result);
267         }
268
269         globalGetConstantRegistration.close();
270         globalGetConstantRegistration = null;
271
272         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
273     }
274
275     @Override
276     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
277         LOG.debug("unregister-flapping-singleton received.");
278
279         if (flappingSingletonService == null) {
280             final RpcError rpcError = RpcResultBuilder
281                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
282             final RpcResult<UnregisterFlappingSingletonOutput> result =
283                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
284             return Futures.immediateFuture(result);
285         }
286
287         final long flapCount = flappingSingletonService.setInactive();
288         flappingSingletonService = null;
289
290         final UnregisterFlappingSingletonOutput output =
291                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
292
293         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
294     }
295
296     @Override
297     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
298         return null;
299     }
300
301     @Override
302     public Future<RpcResult<Void>> subscribeDdtl() {
303         return null;
304     }
305
306     @Override
307     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
308         LOG.debug("register-bound-constant: {}", input);
309
310         if (input.getContext() == null) {
311             final RpcError error = RpcResultBuilder.newError(
312                     ErrorType.RPC, "Invalid input.", "Context value is null");
313             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
314         }
315
316         if (input.getConstant() == null) {
317             final RpcError error = RpcResultBuilder.newError(
318                     ErrorType.RPC, "Invalid input.", "Constant value is null");
319             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
320         }
321
322         if (routedRegistrations.containsKey(input.getContext())) {
323             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
324                     "There is already a rpc registered for context: " + input.getContext());
325             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
326         }
327
328         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
329                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
330                         input.getConstant(), input.getContext());
331
332         routedRegistrations.put(input.getContext(), registration);
333         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
334     }
335
336     @Override
337     public Future<RpcResult<Void>> registerFlappingSingleton() {
338         LOG.debug("Received register-flapping-singleton.");
339
340         if (flappingSingletonService != null) {
341             final RpcError error = RpcResultBuilder.newError(
342                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
343             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
344         }
345
346         flappingSingletonService = new FlappingSingletonService(singletonService);
347
348         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
349     }
350
351     @Override
352     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
353         return null;
354     }
355
356     @Override
357     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
358         return null;
359     }
360
361     @Override
362     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
363         LOG.debug("Received unsubscribe-ynl, input: {}", input);
364
365         if (!ynlRegistrations.containsKey(input.getId())) {
366             final RpcError rpcError = RpcResultBuilder
367                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
368             final RpcResult<UnsubscribeYnlOutput> result =
369                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
370             return Futures.immediateFuture(result);
371         }
372
373         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
374         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
375
376         registration.close();
377
378         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
379     }
380
381     @Override
382     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
383             final CheckPublishNotificationsInput input) {
384
385         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
386
387         if (task == null) {
388             return Futures.immediateFuture(RpcResultBuilder.success(
389                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
390         }
391
392         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
393                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
394
395         if (task.getLastError() != null) {
396             final StringWriter sw = new StringWriter();
397             final PrintWriter pw = new PrintWriter(sw);
398             task.getLastError().printStackTrace(pw);
399             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
400         }
401
402         final CheckPublishNotificationsOutput output =
403                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
404
405         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
406     }
407
408     @Override
409     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
410         LOG.debug("producer-transactions, input: {}", input);
411
412         final ProduceTransactionsHandler handler =
413                 new ProduceTransactionsHandler(domDataTreeService, input);
414
415         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
416         handler.start(settableFuture);
417
418         return settableFuture;
419     }
420
421     @Override
422     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
423
424         LOG.debug("Received register-constant rpc, input: {}", input);
425
426         if (input.getConstant() == null) {
427             final RpcError error = RpcResultBuilder.newError(
428                     ErrorType.RPC, "Invalid input.", "Constant value is null");
429             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
430         }
431
432         if (globalGetConstantRegistration != null) {
433             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
434                     "There is already a get-constant rpc registered.");
435             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
436         }
437
438         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
439         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
440     }
441
442     @Override
443     public Future<RpcResult<Void>> unregisterDefaultConstant() {
444         return null;
445     }
446
447     @Override
448     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
449         return null;
450     }
451 }