Bug 8015, Bug 7800: Do not block when publishing notifications
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.SettableFuture;
13 import java.io.PrintWriter;
14 import java.io.StringWriter;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
19 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
20 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
21 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
22 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
23 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
24 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
25 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
26 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
28 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.controller.sal.core.api.model.SchemaService;
33 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
39 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
40 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
41 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
42 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
43 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
44 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
45 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
46 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
47 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
48 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
49 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
50 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
61 import org.opendaylight.yangtools.concepts.ListenerRegistration;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcError;
64 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
65 import org.opendaylight.yangtools.yang.common.RpcResult;
66 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
71
72     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
73
74     private final RpcProviderRegistry rpcRegistry;
75     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
76     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
77     private final DOMDataBroker domDataBroker;
78     private final NotificationPublishService notificationPublishService;
79     private final NotificationService notificationService;
80     private final SchemaService schemaService;
81     private final ClusterSingletonServiceProvider singletonService;
82     private final DOMRpcProviderService domRpcService;
83
84     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
85             new HashMap<>();
86
87     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
88
89     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
90     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
91     private FlappingSingletonService flappingSingletonService;
92     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
93
94     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
95                                      final DOMRpcProviderService domRpcService,
96                                      final ClusterSingletonServiceProvider singletonService,
97                                      final SchemaService schemaService,
98                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
99                                      final NotificationPublishService notificationPublishService,
100                                      final NotificationService notificationService,
101                                      final DOMDataBroker domDataBroker) {
102         this.rpcRegistry = rpcRegistry;
103         this.domRpcService = domRpcService;
104         this.singletonService = singletonService;
105         this.schemaService = schemaService;
106         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
107         this.notificationPublishService = notificationPublishService;
108         this.notificationService = notificationService;
109         this.domDataBroker = domDataBroker;
110
111         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
112     }
113
114     @Override
115     public Future<RpcResult<Void>> unregisterSingletonConstant() {
116         LOG.debug("unregister-singleton-constant");
117
118         if (getSingletonConstantRegistration == null) {
119             LOG.debug("No get-singleton-constant registration present.");
120             final RpcError rpcError = RpcResultBuilder
121                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
122             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
123             return Futures.immediateFuture(result);
124         }
125
126         try {
127             getSingletonConstantRegistration.close();
128             getSingletonConstantRegistration = null;
129
130             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
131         } catch (final Exception e) {
132             LOG.debug("There was a problem closing the singleton constant service", e);
133             final RpcError rpcError = RpcResultBuilder
134                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
135             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
136             return Futures.immediateFuture(result);
137         }
138     }
139
140     @Override
141     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
142         LOG.debug("publish-notifications, input: {}", input);
143
144         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
145                 input.getSeconds(), input.getNotificationsPerSecond());
146
147         publishNotificationsTasks.put(input.getId(), task);
148
149         task.start();
150
151         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
152     }
153
154     @Override
155     public Future<RpcResult<Void>> subscribeDtcl() {
156         return null;
157     }
158
159     @Override
160     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
161         LOG.debug("write-transactions, input: {}", input);
162
163         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
164
165         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
166         writeTransactionsHandler.start(settableFuture);
167
168         return settableFuture;
169     }
170
171     @Override
172     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
173         return null;
174     }
175
176     @Override
177     public Future<RpcResult<Void>> becomeModuleLeader(BecomeModuleLeaderInput input) {
178         return null;
179     }
180
181     @Override
182     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
183         return null;
184     }
185
186     @Override
187     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
188
189         LOG.debug("subscribe-ynl, input: {}", input);
190
191         if (ynlRegistrations.containsKey(input.getId())) {
192             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
193                     "There is already ynl listener registered for this id: " + input.getId());
194             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
195         }
196
197         ynlRegistrations.put(input.getId(),
198                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
199
200         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
201     }
202
203     @Override
204     public Future<RpcResult<Void>> becomePrefixLeader(BecomePrefixLeaderInput input) {
205         return null;
206     }
207
208     @Override
209     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
210         LOG.debug("unregister-bound-constant, {}", input);
211
212         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
213                 routedRegistrations.remove(input.getContext());
214
215         if (registration == null) {
216             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
217             final RpcError rpcError = RpcResultBuilder
218                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
219             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
220             return Futures.immediateFuture(result);
221         }
222
223         registration.close();
224         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
225     }
226
227     @Override
228     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
229
230         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
231
232         if (input.getConstant() == null) {
233             final RpcError error = RpcResultBuilder.newError(
234                     ErrorType.RPC, "Invalid input.", "Constant value is null");
235             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
236         }
237
238         getSingletonConstantRegistration =
239                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
240
241         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
242     }
243
244     @Override
245     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
246         return null;
247     }
248
249     @Override
250     public Future<RpcResult<Void>> unregisterConstant() {
251
252         if (globalGetConstantRegistration == null) {
253             final RpcError rpcError = RpcResultBuilder
254                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
255             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
256             return Futures.immediateFuture(result);
257         }
258
259         globalGetConstantRegistration.close();
260         globalGetConstantRegistration = null;
261
262         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
263     }
264
265     @Override
266     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
267         LOG.debug("unregister-flapping-singleton received.");
268
269         if (flappingSingletonService == null) {
270             final RpcError rpcError = RpcResultBuilder
271                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
272             final RpcResult<UnregisterFlappingSingletonOutput> result =
273                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
274             return Futures.immediateFuture(result);
275         }
276
277         final long flapCount = flappingSingletonService.setInactive();
278         flappingSingletonService = null;
279
280         final UnregisterFlappingSingletonOutput output =
281                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
282
283         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
284     }
285
286     @Override
287     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
288         return null;
289     }
290
291     @Override
292     public Future<RpcResult<Void>> subscribeDdtl() {
293         return null;
294     }
295
296     @Override
297     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
298         LOG.debug("register-bound-constant: {}", input);
299
300         if (input.getContext() == null) {
301             final RpcError error = RpcResultBuilder.newError(
302                     ErrorType.RPC, "Invalid input.", "Context value is null");
303             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
304         }
305
306         if (input.getConstant() == null) {
307             final RpcError error = RpcResultBuilder.newError(
308                     ErrorType.RPC, "Invalid input.", "Constant value is null");
309             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
310         }
311
312         if (routedRegistrations.containsKey(input.getContext())) {
313             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
314                     "There is already a rpc registered for context: " + input.getContext());
315             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
316         }
317
318         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
319                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
320                         input.getConstant(), input.getContext());
321
322         routedRegistrations.put(input.getContext(), registration);
323         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
324     }
325
326     @Override
327     public Future<RpcResult<Void>> registerFlappingSingleton() {
328         LOG.debug("Received register-flapping-singleton.");
329
330         if (flappingSingletonService != null) {
331             final RpcError error = RpcResultBuilder.newError(
332                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
333             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
334         }
335
336         flappingSingletonService = new FlappingSingletonService(singletonService);
337
338         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
339     }
340
341     @Override
342     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
343         return null;
344     }
345
346     @Override
347     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
348         return null;
349     }
350
351     @Override
352     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
353         LOG.debug("Received unsubscribe-ynl, input: {}", input);
354
355         if (!ynlRegistrations.containsKey(input.getId())) {
356             final RpcError rpcError = RpcResultBuilder
357                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
358             final RpcResult<UnsubscribeYnlOutput> result =
359                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
360             return Futures.immediateFuture(result);
361         }
362
363         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
364         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
365
366         registration.close();
367
368         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
369     }
370
371     @Override
372     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
373             final CheckPublishNotificationsInput input) {
374
375         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
376
377         if (task == null) {
378             return Futures.immediateFuture(RpcResultBuilder.success(
379                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
380         }
381
382         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
383                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
384
385         if (task.getLastError() != null) {
386             final StringWriter sw = new StringWriter();
387             final PrintWriter pw = new PrintWriter(sw);
388             task.getLastError().printStackTrace(pw);
389             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
390         }
391
392         final CheckPublishNotificationsOutput output =
393                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
394
395         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
396     }
397
398     @Override
399     public Future<RpcResult<Void>> produceTransactions(ProduceTransactionsInput input) {
400         return null;
401     }
402
403     @Override
404     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
405
406         LOG.debug("Received register-constant rpc, input: {}", input);
407
408         if (input.getConstant() == null) {
409             final RpcError error = RpcResultBuilder.newError(
410                     ErrorType.RPC, "Invalid input.", "Constant value is null");
411             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
412         }
413
414         if (globalGetConstantRegistration != null) {
415             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
416                     "There is already a get-constant rpc registered.");
417             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
418         }
419
420         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
421         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
422     }
423
424     @Override
425     public Future<RpcResult<Void>> unregisterDefaultConstant() {
426         return null;
427     }
428
429     @Override
430     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
431         return null;
432     }
433 }