2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.SettableFuture;
13 import java.io.PrintWriter;
14 import java.io.StringWriter;
15 import java.util.HashMap;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
19 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
20 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
21 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
22 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
23 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
24 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
25 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
26 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
27 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
28 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
34 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
35 import org.opendaylight.controller.sal.core.api.model.SchemaService;
36 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
37 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
40 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
41 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
42 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
43 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
44 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
45 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
46 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
47 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
48 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
49 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
50 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
67 import org.opendaylight.yangtools.concepts.ListenerRegistration;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.RpcError;
70 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
71 import org.opendaylight.yangtools.yang.common.RpcResult;
72 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
76 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
78 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
80 private final RpcProviderRegistry rpcRegistry;
81 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
82 private final DistributedShardFactory distributedShardFactory;
83 private final DOMDataTreeService domDataTreeService;
84 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
85 private final DOMDataBroker domDataBroker;
86 private final NotificationPublishService notificationPublishService;
87 private final NotificationService notificationService;
88 private final SchemaService schemaService;
89 private final ClusterSingletonServiceProvider singletonService;
90 private final DOMRpcProviderService domRpcService;
91 private final PrefixShardHandler prefixShardHandler;
93 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
96 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
98 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
99 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
100 private FlappingSingletonService flappingSingletonService;
101 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
103 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
104 final DOMRpcProviderService domRpcService,
105 final ClusterSingletonServiceProvider singletonService,
106 final SchemaService schemaService,
107 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
108 final NotificationPublishService notificationPublishService,
109 final NotificationService notificationService,
110 final DOMDataBroker domDataBroker,
111 final DOMDataTreeService domDataTreeService,
112 final DistributedShardFactory distributedShardFactory) {
113 this.rpcRegistry = rpcRegistry;
114 this.domRpcService = domRpcService;
115 this.singletonService = singletonService;
116 this.schemaService = schemaService;
117 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
118 this.notificationPublishService = notificationPublishService;
119 this.notificationService = notificationService;
120 this.domDataBroker = domDataBroker;
121 this.domDataTreeService = domDataTreeService;
122 this.distributedShardFactory = distributedShardFactory;
124 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
126 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
130 public Future<RpcResult<Void>> unregisterSingletonConstant() {
131 LOG.debug("unregister-singleton-constant");
133 if (getSingletonConstantRegistration == null) {
134 LOG.debug("No get-singleton-constant registration present.");
135 final RpcError rpcError = RpcResultBuilder
136 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
137 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
138 return Futures.immediateFuture(result);
142 getSingletonConstantRegistration.close();
143 getSingletonConstantRegistration = null;
145 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
146 } catch (final Exception e) {
147 LOG.debug("There was a problem closing the singleton constant service", e);
148 final RpcError rpcError = RpcResultBuilder
149 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
150 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
151 return Futures.immediateFuture(result);
156 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
157 LOG.debug("publish-notifications, input: {}", input);
159 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
160 input.getSeconds(), input.getNotificationsPerSecond());
162 publishNotificationsTasks.put(input.getId(), task);
166 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
170 public Future<RpcResult<Void>> subscribeDtcl() {
175 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
176 LOG.debug("write-transactions, input: {}", input);
178 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
180 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
181 writeTransactionsHandler.start(settableFuture);
183 return settableFuture;
187 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
192 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
197 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
199 LOG.debug("subscribe-ynl, input: {}", input);
201 if (ynlRegistrations.containsKey(input.getId())) {
202 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
203 "There is already ynl listener registered for this id: " + input.getId());
204 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
207 ynlRegistrations.put(input.getId(),
208 notificationService.registerNotificationListener(new YnlListener(input.getId())));
210 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
214 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
215 LOG.debug("remove-prefix-shard, input: {}", input);
217 return prefixShardHandler.onRemovePrefixShard(input);
221 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
226 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
227 LOG.debug("unregister-bound-constant, {}", input);
229 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
230 routedRegistrations.remove(input.getContext());
232 if (registration == null) {
233 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
234 final RpcError rpcError = RpcResultBuilder
235 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
236 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
237 return Futures.immediateFuture(result);
240 registration.close();
241 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
245 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
247 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
249 if (input.getConstant() == null) {
250 final RpcError error = RpcResultBuilder.newError(
251 ErrorType.RPC, "Invalid input.", "Constant value is null");
252 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
255 getSingletonConstantRegistration =
256 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
258 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
262 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
267 public Future<RpcResult<Void>> unregisterConstant() {
269 if (globalGetConstantRegistration == null) {
270 final RpcError rpcError = RpcResultBuilder
271 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
272 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
273 return Futures.immediateFuture(result);
276 globalGetConstantRegistration.close();
277 globalGetConstantRegistration = null;
279 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
283 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
284 LOG.debug("unregister-flapping-singleton received.");
286 if (flappingSingletonService == null) {
287 final RpcError rpcError = RpcResultBuilder
288 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
289 final RpcResult<UnregisterFlappingSingletonOutput> result =
290 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
291 return Futures.immediateFuture(result);
294 final long flapCount = flappingSingletonService.setInactive();
295 flappingSingletonService = null;
297 final UnregisterFlappingSingletonOutput output =
298 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
300 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
304 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
309 public Future<RpcResult<Void>> subscribeDdtl() {
314 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
315 LOG.debug("register-bound-constant: {}", input);
317 if (input.getContext() == null) {
318 final RpcError error = RpcResultBuilder.newError(
319 ErrorType.RPC, "Invalid input.", "Context value is null");
320 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
323 if (input.getConstant() == null) {
324 final RpcError error = RpcResultBuilder.newError(
325 ErrorType.RPC, "Invalid input.", "Constant value is null");
326 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
329 if (routedRegistrations.containsKey(input.getContext())) {
330 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
331 "There is already a rpc registered for context: " + input.getContext());
332 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
335 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
336 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
337 input.getConstant(), input.getContext());
339 routedRegistrations.put(input.getContext(), registration);
340 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
344 public Future<RpcResult<Void>> registerFlappingSingleton() {
345 LOG.debug("Received register-flapping-singleton.");
347 if (flappingSingletonService != null) {
348 final RpcError error = RpcResultBuilder.newError(
349 ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
350 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
353 flappingSingletonService = new FlappingSingletonService(singletonService);
355 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
359 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
364 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
365 LOG.debug("create-prefix-shard, input: {}", input);
367 return prefixShardHandler.onCreatePrefixShard(input);
371 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
376 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
377 LOG.debug("Received unsubscribe-ynl, input: {}", input);
379 if (!ynlRegistrations.containsKey(input.getId())) {
380 final RpcError rpcError = RpcResultBuilder
381 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
382 final RpcResult<UnsubscribeYnlOutput> result =
383 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
384 return Futures.immediateFuture(result);
387 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
388 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
390 registration.close();
392 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
396 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
397 final CheckPublishNotificationsInput input) {
399 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
402 return Futures.immediateFuture(RpcResultBuilder.success(
403 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
406 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
407 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
409 if (task.getLastError() != null) {
410 final StringWriter sw = new StringWriter();
411 final PrintWriter pw = new PrintWriter(sw);
412 task.getLastError().printStackTrace(pw);
413 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
416 final CheckPublishNotificationsOutput output =
417 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
419 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
423 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
424 LOG.debug("producer-transactions, input: {}", input);
426 final ProduceTransactionsHandler handler =
427 new ProduceTransactionsHandler(domDataTreeService, input);
429 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
430 handler.start(settableFuture);
432 return settableFuture;
436 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
438 LOG.debug("Received register-constant rpc, input: {}", input);
440 if (input.getConstant() == null) {
441 final RpcError error = RpcResultBuilder.newError(
442 ErrorType.RPC, "Invalid input.", "Constant value is null");
443 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
446 if (globalGetConstantRegistration != null) {
447 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
448 "There is already a get-constant rpc registered.");
449 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
452 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
453 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
457 public Future<RpcResult<Void>> unregisterDefaultConstant() {
462 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {