2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
119 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
120 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
121 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
123 private final RpcProviderRegistry rpcRegistry;
124 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
125 private final DistributedShardFactory distributedShardFactory;
126 private final DistributedDataStoreInterface configDataStore;
127 private final DOMDataTreeService domDataTreeService;
128 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
129 private final DOMDataBroker domDataBroker;
130 private final NotificationPublishService notificationPublishService;
131 private final NotificationService notificationService;
132 private final SchemaService schemaService;
133 private final ClusterSingletonServiceProvider singletonService;
134 private final DOMRpcProviderService domRpcService;
135 private final PrefixLeaderHandler prefixLeaderHandler;
136 private final PrefixShardHandler prefixShardHandler;
137 private final DOMDataTreeChangeService domDataTreeChangeService;
138 private final ActorSystem actorSystem;
140 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
143 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
145 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
146 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
147 private FlappingSingletonService flappingSingletonService;
148 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
149 private IdIntsListener idIntsListener;
150 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
151 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
152 private IdIntsDOMDataTreeLIstener idIntsDdtl;
156 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157 final DOMRpcProviderService domRpcService,
158 final ClusterSingletonServiceProvider singletonService,
159 final SchemaService schemaService,
160 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161 final NotificationPublishService notificationPublishService,
162 final NotificationService notificationService,
163 final DOMDataBroker domDataBroker,
164 final DOMDataTreeService domDataTreeService,
165 final DistributedShardFactory distributedShardFactory,
166 final DistributedDataStoreInterface configDataStore,
167 final ActorSystemProvider actorSystemProvider) {
168 this.rpcRegistry = rpcRegistry;
169 this.domRpcService = domRpcService;
170 this.singletonService = singletonService;
171 this.schemaService = schemaService;
172 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173 this.notificationPublishService = notificationPublishService;
174 this.notificationService = notificationService;
175 this.domDataBroker = domDataBroker;
176 this.domDataTreeService = domDataTreeService;
177 this.distributedShardFactory = distributedShardFactory;
178 this.configDataStore = configDataStore;
179 this.actorSystem = actorSystemProvider.getActorSystem();
181 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
183 domDataTreeChangeService =
184 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
186 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
188 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189 bindingNormalizedNodeSerializer);
193 public Future<RpcResult<Void>> unregisterSingletonConstant() {
194 LOG.debug("unregister-singleton-constant");
196 if (getSingletonConstantRegistration == null) {
197 LOG.debug("No get-singleton-constant registration present.");
198 final RpcError rpcError = RpcResultBuilder
199 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
200 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
201 return Futures.immediateFuture(result);
205 getSingletonConstantRegistration.close();
206 getSingletonConstantRegistration = null;
208 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
209 } catch (final Exception e) {
210 LOG.debug("There was a problem closing the singleton constant service", e);
211 final RpcError rpcError = RpcResultBuilder
212 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
213 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
214 return Futures.immediateFuture(result);
219 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
220 LOG.debug("publish-notifications, input: {}", input);
222 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
223 input.getSeconds(), input.getNotificationsPerSecond());
225 publishNotificationsTasks.put(input.getId(), task);
229 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
233 public Future<RpcResult<Void>> subscribeDtcl() {
235 if (dtclReg != null) {
236 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
237 "There is already dataTreeChangeListener registered on id-ints list.");
238 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
241 idIntsListener = new IdIntsListener();
243 dtclReg = domDataTreeChangeService
244 .registerDataTreeChangeListener(
245 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
246 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
249 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
253 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254 LOG.debug("write-transactions, input: {}", input);
256 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
258 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
259 writeTransactionsHandler.start(settableFuture);
261 return settableFuture;
265 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
270 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
275 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
277 LOG.debug("subscribe-ynl, input: {}", input);
279 if (ynlRegistrations.containsKey(input.getId())) {
280 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
281 "There is already ynl listener registered for this id: " + input.getId());
282 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
285 ynlRegistrations.put(input.getId(),
286 notificationService.registerNotificationListener(new YnlListener(input.getId())));
288 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
292 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
293 LOG.debug("remove-prefix-shard, input: {}", input);
295 return prefixShardHandler.onRemovePrefixShard(input);
299 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
300 LOG.debug("become-prefix-leader, input: {}", input);
302 return prefixLeaderHandler.makeLeaderLocal(input);
306 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
307 LOG.debug("unregister-bound-constant, {}", input);
309 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
310 routedRegistrations.remove(input.getContext());
312 if (registration == null) {
313 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
314 final RpcError rpcError = RpcResultBuilder
315 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
316 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
317 return Futures.immediateFuture(result);
320 registration.close();
321 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
325 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
327 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
329 if (input.getConstant() == null) {
330 final RpcError error = RpcResultBuilder.newError(
331 ErrorType.RPC, "Invalid input.", "Constant value is null");
332 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
335 getSingletonConstantRegistration =
336 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
338 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
342 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
347 public Future<RpcResult<Void>> unregisterConstant() {
349 if (globalGetConstantRegistration == null) {
350 final RpcError rpcError = RpcResultBuilder
351 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
352 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
353 return Futures.immediateFuture(result);
356 globalGetConstantRegistration.close();
357 globalGetConstantRegistration = null;
359 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
363 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
364 LOG.debug("unregister-flapping-singleton received.");
366 if (flappingSingletonService == null) {
367 final RpcError rpcError = RpcResultBuilder
368 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
369 final RpcResult<UnregisterFlappingSingletonOutput> result =
370 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
371 return Futures.immediateFuture(result);
374 final long flapCount = flappingSingletonService.setInactive();
375 flappingSingletonService = null;
377 final UnregisterFlappingSingletonOutput output =
378 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
380 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
384 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
389 public Future<RpcResult<Void>> subscribeDdtl() {
391 if (ddtlReg != null) {
392 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
393 "There is already dataTreeChangeListener registered on id-ints list.");
394 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
397 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
401 domDataTreeService.registerListener(idIntsDdtl,
402 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
403 ProduceTransactionsHandler.ID_INT_YID))
404 , true, Collections.emptyList());
405 } catch (DOMDataTreeLoopException e) {
406 LOG.error("Failed to register DOMDataTreeListener.", e);
410 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
414 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
415 LOG.debug("register-bound-constant: {}", input);
417 if (input.getContext() == null) {
418 final RpcError error = RpcResultBuilder.newError(
419 ErrorType.RPC, "Invalid input.", "Context value is null");
420 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
423 if (input.getConstant() == null) {
424 final RpcError error = RpcResultBuilder.newError(
425 ErrorType.RPC, "Invalid input.", "Constant value is null");
426 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
429 if (routedRegistrations.containsKey(input.getContext())) {
430 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
431 "There is already a rpc registered for context: " + input.getContext());
432 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
435 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
436 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
437 input.getConstant(), input.getContext());
439 routedRegistrations.put(input.getContext(), registration);
440 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
444 public Future<RpcResult<Void>> registerFlappingSingleton() {
445 LOG.debug("Received register-flapping-singleton.");
447 if (flappingSingletonService != null) {
448 final RpcError error = RpcResultBuilder.newError(
449 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
450 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
453 flappingSingletonService = new FlappingSingletonService(singletonService);
455 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
459 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
460 LOG.debug("Received unsubscribe-dtcl");
462 if (idIntsListener == null || dtclReg == null) {
463 final RpcError error = RpcResultBuilder.newError(
464 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
465 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
469 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
470 } catch (InterruptedException | ExecutionException | TimeoutException e) {
471 final RpcError error = RpcResultBuilder.newError(
472 ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
473 "clustering-it", "clustering-it", e);
474 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
475 .withRpcError(error).build());
481 if (!idIntsListener.hasTriggered()) {
482 final RpcError error = RpcResultBuilder.newError(
483 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
484 "any notifications.");
485 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486 .withRpcError(error).build());
489 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
491 final Optional<NormalizedNode<?, ?>> readResult =
492 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
494 if (!readResult.isPresent()) {
495 final RpcError error = RpcResultBuilder.newError(
496 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
497 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
498 .withRpcError(error).build());
501 return Futures.immediateFuture(
502 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
503 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
505 } catch (final ReadFailedException e) {
506 final RpcError error = RpcResultBuilder.newError(
507 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
508 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
509 .withRpcError(error).build());
515 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
516 LOG.debug("create-prefix-shard, input: {}", input);
518 return prefixShardHandler.onCreatePrefixShard(input);
522 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
527 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
528 LOG.debug("Received unsubscribe-ynl, input: {}", input);
530 if (!ynlRegistrations.containsKey(input.getId())) {
531 final RpcError rpcError = RpcResultBuilder
532 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
533 final RpcResult<UnsubscribeYnlOutput> result =
534 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
535 return Futures.immediateFuture(result);
538 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
539 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
541 registration.close();
543 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
547 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
548 final CheckPublishNotificationsInput input) {
550 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
553 return Futures.immediateFuture(RpcResultBuilder.success(
554 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
557 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
558 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
560 if (task.getLastError() != null) {
561 final StringWriter sw = new StringWriter();
562 final PrintWriter pw = new PrintWriter(sw);
563 task.getLastError().printStackTrace(pw);
564 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
567 final CheckPublishNotificationsOutput output =
568 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
570 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
574 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
575 LOG.debug("producer-transactions, input: {}", input);
577 final ProduceTransactionsHandler handler =
578 new ProduceTransactionsHandler(domDataTreeService, input);
580 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
581 handler.start(settableFuture);
583 return settableFuture;
587 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
588 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
590 final String shardName = input.getShardName();
591 if (Strings.isNullOrEmpty(shardName)) {
592 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
593 "A valid shard name must be specified");
594 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
597 return shutdownShardGracefully(shardName);
601 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
602 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
604 final InstanceIdentifier shardPrefix = input.getPrefix();
606 if (shardPrefix == null) {
607 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
608 "A valid shard prefix must be specified");
609 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
612 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
613 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
615 return shutdownShardGracefully(cleanPrefixShardName);
618 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
619 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
620 final ActorContext context = configDataStore.getActorContext();
622 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
623 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
624 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
625 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
627 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
629 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
630 if (throwable != null) {
631 shutdownShardAsk.failure(throwable);
633 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
636 }, context.getClientDispatcher());
638 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
640 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
641 if (throwable != null) {
642 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
643 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
644 rpcResult.set(failedResult);
646 // according to Patterns.gracefulStop API, we don't have to
647 // check value of gracefulStopResult
648 rpcResult.set(RpcResultBuilder.<Void>success().build());
651 }, context.getClientDispatcher());
656 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
658 LOG.debug("Received register-constant rpc, input: {}", input);
660 if (input.getConstant() == null) {
661 final RpcError error = RpcResultBuilder.newError(
662 ErrorType.RPC, "Invalid input.", "Constant value is null");
663 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
666 if (globalGetConstantRegistration != null) {
667 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
668 "There is already a get-constant rpc registered.");
669 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
672 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
673 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
677 public Future<RpcResult<Void>> unregisterDefaultConstant() {
682 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
683 LOG.debug("Received unsubscribe-ddtl.");
685 if (idIntsDdtl == null || ddtlReg == null) {
686 final RpcError error = RpcResultBuilder.newError(
687 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
688 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
692 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
693 } catch (InterruptedException | ExecutionException | TimeoutException e) {
694 final RpcError error = RpcResultBuilder.newError(
695 ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
696 "clustering-it", "clustering-it", e);
697 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
698 .withRpcError(error).build());
704 if (!idIntsDdtl.hasTriggered()) {
705 final RpcError error = RpcResultBuilder.newError(
706 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
707 "any notifications.");
708 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
709 .withRpcError(error).build());
712 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
713 LOG.debug("Creating distributed datastore client for shard {}", shardName);
715 final ActorContext actorContext = configDataStore.getActorContext();
716 final Props distributedDataStoreClientProps =
717 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
718 "Shard-" + shardName, actorContext, shardName);
720 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
721 final DataStoreClient distributedDataStoreClient;
723 distributedDataStoreClient = SimpleDataStoreClientActor
724 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
725 } catch (final Exception e) {
726 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
727 clientActor.tell(PoisonPill.getInstance(), noSender());
728 final RpcError error = RpcResultBuilder.newError(
729 ErrorType.APPLICATION, "Unable to create ds client for read.",
730 "Unable to create ds client for read.");
731 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
732 .withRpcError(error).build());
735 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
736 final ClientTransaction tx = localHistory.createTransaction();
737 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
738 org.opendaylight.mdsal.common.api.ReadFailedException> read =
739 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
742 localHistory.close();
744 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
745 if (!optional.isPresent()) {
746 LOG.warn("Final read from client is empty.");
747 final RpcError error = RpcResultBuilder.newError(
748 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
749 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
750 .withRpcError(error).build());
753 return Futures.immediateFuture(
754 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
755 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
757 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
758 LOG.error("Unable to read data to verify ddtl data.", e);
759 final RpcError error = RpcResultBuilder.newError(
760 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
761 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
762 .withRpcError(error).build());
764 distributedDataStoreClient.close();
765 clientActor.tell(PoisonPill.getInstance(), noSender());