2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
/**
 * Implementation of {@link OdlMdsalLowlevelControlService}. The registrations, listeners and tasks
 * created by its operations are kept in instance fields so that later operations can find and
 * close them.
 */
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
119 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller (md.sal) flavour of the CONFIGURATION datastore type. It is spelled out fully qualified
// because the simple name LogicalDatastoreType is imported from org.opendaylight.mdsal.common.api.
120 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
121 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators; all of them are assigned in the constructor.
123 private final RpcProviderRegistry rpcRegistry;
124 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
125 private final DistributedShardFactory distributedShardFactory;
126 private final DistributedDataStoreInterface configDataStore;
127 private final DOMDataTreeService domDataTreeService;
128 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
129 private final DOMDataBroker domDataBroker;
130 private final NotificationPublishService notificationPublishService;
131 private final NotificationService notificationService;
132 private final SchemaService schemaService;
133 private final ClusterSingletonServiceProvider singletonService;
134 private final DOMRpcProviderService domRpcService;
135 private final PrefixLeaderHandler prefixLeaderHandler;
136 private final PrefixShardHandler prefixShardHandler;
137 private final DOMDataTreeChangeService domDataTreeChangeService;
138 private final ActorSystem actorSystem;
// Routed get-constant registrations, keyed by the context they were registered for.
140 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
// YNL listener registrations, keyed by the id supplied on subscription.
143 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Mutable state below is null while the corresponding registration/listener is absent.
145 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
146 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
147 private FlappingSingletonService flappingSingletonService;
148 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
149 private IdIntsListener idIntsListener;
// Publish-notification tasks, keyed by the id supplied when they were started.
150 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
151 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
152 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied services, obtains the actor system from {@code actorSystemProvider}, creates the
 * prefix leader and prefix shard handlers, looks up the {@link DOMDataTreeChangeService} extension of the
 * DOM data broker and registers this instance as the {@link OdlMdsalLowlevelControlService} implementation.
 */
156 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157 final DOMRpcProviderService domRpcService,
158 final ClusterSingletonServiceProvider singletonService,
159 final SchemaService schemaService,
160 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161 final NotificationPublishService notificationPublishService,
162 final NotificationService notificationService,
163 final DOMDataBroker domDataBroker,
164 final DOMDataTreeService domDataTreeService,
165 final DistributedShardFactory distributedShardFactory,
166 final DistributedDataStoreInterface configDataStore,
167 final ActorSystemProvider actorSystemProvider) {
168 this.rpcRegistry = rpcRegistry;
169 this.domRpcService = domRpcService;
170 this.singletonService = singletonService;
171 this.schemaService = schemaService;
172 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173 this.notificationPublishService = notificationPublishService;
174 this.notificationService = notificationService;
175 this.domDataBroker = domDataBroker;
176 this.domDataTreeService = domDataTreeService;
177 this.distributedShardFactory = distributedShardFactory;
178 this.configDataStore = configDataStore;
179 this.actorSystem = actorSystemProvider.getActorSystem();
181 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// NOTE(review): the extension map lookup presumably yields null when the broker does not offer
// DOMDataTreeChangeService; nothing here guards against that - confirm.
183 domDataTreeChangeService =
184 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
// NOTE(review): 'this' is handed to the RPC registry before prefixShardHandler is assigned below.
186 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
188 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189 bindingNormalizedNodeSerializer);
/**
 * Closes the singleton get-constant registration if one is held.
 *
 * @return an immediate future: failed with tag "missing-registration" when no registration is held,
 *         failed with tag "error-closing" when closing throws, successful otherwise
 */
193 public Future<RpcResult<Void>> unregisterSingletonConstant() {
194 LOG.debug("unregister-singleton-constant");
196 if (getSingletonConstantRegistration == null) {
197 LOG.debug("No get-singleton-constant registration present.");
198 final RpcError rpcError = RpcResultBuilder
199 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
200 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
201 return Futures.immediateFuture(result);
205 getSingletonConstantRegistration.close();
// Forget the registration once it has been closed.
206 getSingletonConstantRegistration = null;
208 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
209 } catch (final Exception e) {
210 LOG.debug("There was a problem closing the singleton constant service", e);
211 final RpcError rpcError = RpcResultBuilder
212 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
213 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
214 return Futures.immediateFuture(result);
/**
 * Creates a {@link PublishNotificationsTask} from the input's id, seconds and notifications-per-second
 * and stores it in {@code publishNotificationsTasks} under the input id.
 *
 * @param input id, duration in seconds and notification rate of the task
 * @return an immediate successful future
 */
219 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
220 LOG.debug("publish-notifications, input: {}", input);
222 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
223 input.getSeconds(), input.getNotificationsPerSecond());
// A task already stored under the same id is replaced by this put.
225 publishNotificationsTasks.put(input.getId(), task);
229 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Creates a new {@link IdIntsListener} and registers a data tree change listener for
 * {@code WriteTransactionsHandler.ID_INT_YID} in the controller CONFIGURATION datastore, keeping the
 * registration in {@code dtclReg}.
 *
 * @return an immediate future: failed with "Registration present." when {@code dtclReg} is already set,
 *         successful otherwise
 */
233 public Future<RpcResult<Void>> subscribeDtcl() {
235 if (dtclReg != null) {
236 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
237 "There is already dataTreeChangeListener registered on id-ints list.");
238 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
241 idIntsListener = new IdIntsListener();
// The controller (md.sal) DOMDataTreeIdentifier is fully qualified here; the imported simple name
// refers to the mdsal variant.
243 dtclReg = domDataTreeChangeService
244 .registerDataTreeChangeListener(
245 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
246 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
249 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Logs the input and delegates to {@code WriteTransactionsHandler.start} with the DOM data broker.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
253 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254 LOG.debug("write-transactions, input: {}", input);
255 return WriteTransactionsHandler.start(domDataBroker, input);
/** Takes no input; the declared result is a future of {@code RpcResult<IsClientAbortedOutput>}. */
259 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
/** Takes a {@link RemoveShardReplicaInput}; the declared result is a future of {@code RpcResult<Void>}. */
264 public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
/**
 * Registers a new {@link YnlListener} for the input id with the notification service and stores the
 * registration in {@code ynlRegistrations} under that id.
 *
 * @param input carries the listener id
 * @return an immediate future: failed with "Registration present." when the id is already registered,
 *         successful otherwise
 */
269 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
271 LOG.debug("subscribe-ynl, input: {}", input);
273 if (ynlRegistrations.containsKey(input.getId())) {
274 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
275 "There is already ynl listener registered for this id: " + input.getId());
276 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
279 ynlRegistrations.put(input.getId(),
280 notificationService.registerNotificationListener(new YnlListener(input.getId())));
282 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Logs the input and delegates to {@code prefixShardHandler.onRemovePrefixShard}.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
286 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
287 LOG.debug("remove-prefix-shard, input: {}", input);
289 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Logs the input and delegates to {@code prefixLeaderHandler.makeLeaderLocal}.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
293 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
294 LOG.debug("become-prefix-leader, input: {}", input);
296 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Removes the routed get-constant registration stored for the input context and closes it.
 *
 * @param input carries the context whose registration should be removed
 * @return an immediate future: failed with tag "missing-registration" when nothing is stored for the
 *         context, successful otherwise
 */
300 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
301 LOG.debug("unregister-bound-constant, {}", input);
// Local variable; it shadows the 'registration' field of this class.
303 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
304 routedRegistrations.remove(input.getContext());
306 if (registration == null) {
307 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
308 final RpcError rpcError = RpcResultBuilder
309 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
310 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
311 return Futures.immediateFuture(result);
314 registration.close();
315 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a singleton get-constant service for the input constant through
 * {@code SingletonGetConstantService.registerNew} and keeps the registration in
 * {@code getSingletonConstantRegistration}.
 *
 * @param input carries the constant to serve
 * @return an immediate future: failed with "Invalid input." when the constant is null, successful otherwise
 */
319 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
321 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
323 if (input.getConstant() == null) {
324 final RpcError error = RpcResultBuilder.newError(
325 ErrorType.RPC, "Invalid input.", "Constant value is null");
326 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// NOTE(review): a registration already held in the field is overwritten here without being closed.
329 getSingletonConstantRegistration =
330 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
332 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/** Takes a {@link RegisterDefaultConstantInput}; the declared result is a future of {@code RpcResult<Void>}. */
336 public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
/**
 * Closes the global get-constant registration and clears the field holding it.
 *
 * @return an immediate future: failed with tag "missing-registration" when no registration is held,
 *         successful otherwise
 */
341 public Future<RpcResult<Void>> unregisterConstant() {
343 if (globalGetConstantRegistration == null) {
344 final RpcError rpcError = RpcResultBuilder
345 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
346 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
347 return Futures.immediateFuture(result);
350 globalGetConstantRegistration.close();
351 globalGetConstantRegistration = null;
353 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Deactivates the flapping singleton service, clears the field holding it and reports the value
 * returned by {@code setInactive()} as the flap count.
 *
 * @return an immediate future: failed with tag "missing-registration" when no service is held,
 *         otherwise successful with the flap count in the output
 */
357 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
358 LOG.debug("unregister-flapping-singleton received.");
360 if (flappingSingletonService == null) {
361 final RpcError rpcError = RpcResultBuilder
362 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
363 final RpcResult<UnregisterFlappingSingletonOutput> result =
364 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
365 return Futures.immediateFuture(result);
368 final long flapCount = flappingSingletonService.setInactive();
369 flappingSingletonService = null;
371 final UnregisterFlappingSingletonOutput output =
372 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
374 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/** Takes an {@link AddShardReplicaInput}; the declared result is a future of {@code RpcResult<Void>}. */
378 public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
/**
 * Creates a new {@link IdIntsDOMDataTreeLIstener} and registers it with the DOM data tree service for
 * {@code ProduceTransactionsHandler.ID_INT_YID} in the CONFIGURATION datastore.
 *
 * @return an immediate future: failed with "Registration present." when {@code ddtlReg} is already set,
 *         successful otherwise
 */
383 public Future<RpcResult<Void>> subscribeDdtl() {
385 if (ddtlReg != null) {
386 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
387 "There is already dataTreeChangeListener registered on id-ints list.");
388 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
391 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
395 domDataTreeService.registerListener(idIntsDdtl,
396 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
397 ProduceTransactionsHandler.ID_INT_YID))
398 , true, Collections.emptyList());
399 } catch (DOMDataTreeLoopException e) {
// NOTE(review): the failure is only logged; the success result below appears to be returned even
// when registration threw - confirm this is intended.
400 LOG.error("Failed to register DOMDataTreeListener.", e);
404 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a routed get-constant service for the input constant and context through
 * {@code RoutedGetConstantService.registerNew} and stores the registration under the context.
 *
 * @param input carries the context and the constant to serve
 * @return an immediate future: failed with "Invalid input." when the context or the constant is null,
 *         failed with "Registration present." when the context already has a registration,
 *         successful otherwise
 */
408 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
409 LOG.debug("register-bound-constant: {}", input);
411 if (input.getContext() == null) {
412 final RpcError error = RpcResultBuilder.newError(
413 ErrorType.RPC, "Invalid input.", "Context value is null");
414 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
417 if (input.getConstant() == null) {
418 final RpcError error = RpcResultBuilder.newError(
419 ErrorType.RPC, "Invalid input.", "Constant value is null");
420 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
423 if (routedRegistrations.containsKey(input.getContext())) {
424 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
425 "There is already a rpc registered for context: " + input.getContext());
426 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// Local variable; it shadows the 'registration' field of this class.
429 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
430 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
431 input.getConstant(), input.getContext());
433 routedRegistrations.put(input.getContext(), registration);
434 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Creates a {@link FlappingSingletonService} on top of the cluster singleton service provider and keeps
 * it in {@code flappingSingletonService}.
 *
 * @return an immediate future: failed with "Registration present." when a service is already held,
 *         successful otherwise
 */
438 public Future<RpcResult<Void>> registerFlappingSingleton() {
439 LOG.debug("Received register-flapping-singleton.");
441 if (flappingSingletonService != null) {
442 final RpcError error = RpcResultBuilder.newError(
443 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
444 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
447 flappingSingletonService = new FlappingSingletonService(singletonService);
449 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Finishes the id-ints data tree change listener run and reports whether the data collected by the
 * listener matches the datastore content.
 *
 * The call fails when no listener or registration is held, when pending notification processing does
 * not finish within 120 seconds, when the listener never triggered, and when the final read of
 * {@code WriteTransactionsHandler.ID_INT_YID} from the controller CONFIGURATION datastore fails or is
 * empty. Otherwise the result of {@code idIntsListener.checkEqual} on the read data is returned as
 * copy-matches.
 *
 * @return an immediate future with the failure described above or the copy-matches output
 */
453 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
454 LOG.debug("Received unsubscribe-dtcl");
456 if (idIntsListener == null || dtclReg == null) {
457 final RpcError error = RpcResultBuilder.newError(
458 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
459 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
// Give the listener up to 120 seconds to finish processing notifications before verifying.
463 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
464 } catch (InterruptedException | ExecutionException | TimeoutException e) {
465 final RpcError error = RpcResultBuilder.newError(
466 ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
467 "clustering-it", "clustering-it", e);
468 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
469 .withRpcError(error).build());
475 if (!idIntsListener.hasTriggered()) {
// The first literal ends with a space so the concatenated message reads "... has not received any notifications."
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received " +
478 "any notifications.");
479 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
480 .withRpcError(error).build());
// Read the id-ints data back so it can be compared with what the listener collected.
483 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
485 final Optional<NormalizedNode<?, ?>> readResult =
486 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
488 if (!readResult.isPresent()) {
489 final RpcError error = RpcResultBuilder.newError(
490 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
491 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
492 .withRpcError(error).build());
495 return Futures.immediateFuture(
496 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
497 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
499 } catch (final ReadFailedException e) {
500 final RpcError error = RpcResultBuilder.newError(
501 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
502 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
503 .withRpcError(error).build());
/**
 * Logs the input and delegates to {@code prefixShardHandler.onCreatePrefixShard}.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
509 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
510 LOG.debug("create-prefix-shard, input: {}", input);
512 return prefixShardHandler.onCreatePrefixShard(input);
/** Takes no input; the declared result is a future of {@code RpcResult<Void>}. */
516 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
/**
 * Removes the YNL listener registration stored for the input id, takes the listener's output and
 * closes the registration.
 *
 * @param input carries the listener id
 * @return an immediate future: failed with tag "missing-registration" when the id is unknown,
 *         otherwise successful with the output obtained from the listener
 */
521 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
522 LOG.debug("Received unsubscribe-ynl, input: {}", input);
524 if (!ynlRegistrations.containsKey(input.getId())) {
525 final RpcError rpcError = RpcResultBuilder
526 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
527 final RpcResult<UnsubscribeYnlOutput> result =
528 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
529 return Futures.immediateFuture(result);
// Local variable; it shadows the 'registration' field of this class.
532 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
// The output is taken from the listener before its registration is closed.
533 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
535 registration.close();
537 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
/**
 * Reports the state of the publish-notifications task stored under the input id: whether it is still
 * active, its current notification count and, when one was recorded, its last error.
 *
 * @param input carries the task id
 * @return an immediate successful future with the task state
 */
541 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
542 final CheckPublishNotificationsInput input) {
544 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
// NOTE(review): this active=false reply is presumably the branch for an id with no stored task - confirm.
547 return Futures.immediateFuture(RpcResultBuilder.success(
548 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
551 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
552 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
554 if (task.getLastError() != null) {
// Render the stack trace into a string so it can be appended to the error's toString() text.
555 final StringWriter sw = new StringWriter();
556 final PrintWriter pw = new PrintWriter(sw);
557 task.getLastError().printStackTrace(pw);
558 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
561 final CheckPublishNotificationsOutput output =
562 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
564 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/**
 * Logs the input and delegates to {@code ProduceTransactionsHandler.start} with the DOM data tree service.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
568 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
569 LOG.debug("producer-transactions, input: {}", input);
570 return ProduceTransactionsHandler.start(domDataTreeService, input);
/**
 * Validates the shard name from the input and passes it to {@code shutdownShardGracefully}.
 *
 * @param input carries the shard name
 * @return an immediate failed future with tag "bad-element" when the name is null or empty,
 *         otherwise the future returned by {@code shutdownShardGracefully}
 */
574 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
577 final String shardName = input.getShardName();
578 if (Strings.isNullOrEmpty(shardName)) {
579 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580 "A valid shard name must be specified");
581 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
584 return shutdownShardGracefully(shardName);
/**
 * Converts the shard prefix from the input to a {@link YangInstanceIdentifier}, derives the shard name
 * with {@code ClusterUtils.getCleanShardName} and passes it to {@code shutdownShardGracefully}.
 *
 * @param input carries the shard prefix
 * @return an immediate failed future with tag "bad-element" when the prefix is null,
 *         otherwise the future returned by {@code shutdownShardGracefully}
 */
588 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
589 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
591 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
593 if (shardPrefix == null) {
594 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
595 "A valid shard prefix must be specified");
596 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
599 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
600 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
602 return shutdownShardGracefully(cleanPrefixShardName);
/**
 * Looks up the local shard actor for {@code shardName} and asks it to stop by sending
 * {@code Shutdown.INSTANCE} through {@code Patterns.gracefulStop}.
 *
 * The {@link SettableFuture} created here is set to a failed result ("Failed to gracefully shutdown
 * shard") when the lookup or the stop fails, and to a successful result otherwise.
 *
 * @param shardName name of the shard to look up in the config datastore's actor context
 */
605 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
606 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
607 final ActorContext context = configDataStore.getActorContext();
// Stop timeout: three times the configured election timeout interval, but at least 10000 ms.
609 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
610 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
611 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
// Promise bridging the asynchronous shard lookup and the graceful-stop request.
612 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
614 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
616 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
617 if (throwable != null) {
618 shutdownShardAsk.failure(throwable);
620 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
623 }, context.getClientDispatcher());
// Translate the outcome of the promise into the RPC result.
625 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
627 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
628 if (throwable != null) {
629 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
630 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
631 rpcResult.set(failedResult);
633 // according to Patterns.gracefulStop API, we don't have to
634 // check value of gracefulStopResult
635 rpcResult.set(RpcResultBuilder.<Void>success().build());
638 }, context.getClientDispatcher());
/**
 * Registers a global get-constant service for the input constant through
 * {@code GetConstantService.registerNew} and keeps the registration in
 * {@code globalGetConstantRegistration}.
 *
 * @param input carries the constant to serve
 * @return an immediate future: failed with "Invalid input." when the constant is null, failed with
 *         "Registration present." when a registration is already held, successful otherwise
 */
643 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
645 LOG.debug("Received register-constant rpc, input: {}", input);
647 if (input.getConstant() == null) {
648 final RpcError error = RpcResultBuilder.newError(
649 ErrorType.RPC, "Invalid input.", "Constant value is null");
650 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
653 if (globalGetConstantRegistration != null) {
654 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
655 "There is already a get-constant rpc registered.");
656 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
659 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
660 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/** Takes no input; the declared result is a future of {@code RpcResult<Void>}. */
664 public Future<RpcResult<Void>> unregisterDefaultConstant() {
/**
 * Finishes the id-ints DOM data tree listener run and reports whether the data collected by the
 * listener matches what a datastore client reads back.
 *
 * The call fails when no listener or registration is held, when pending notification processing does
 * not finish within 120 seconds, when the listener never triggered, when the datastore client cannot be
 * obtained within 30 seconds, and when the final read fails or is empty. Otherwise the result of
 * {@code idIntsDdtl.checkEqual} on the read data is returned as copy-matches.
 *
 * @return an immediate future with the failure described above or the copy-matches output
 */
669 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
670 LOG.debug("Received unsubscribe-ddtl.");
672 if (idIntsDdtl == null || ddtlReg == null) {
673 final RpcError error = RpcResultBuilder.newError(
674 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
675 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
// Give the listener up to 120 seconds to finish processing notifications before verifying.
679 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
680 } catch (InterruptedException | ExecutionException | TimeoutException e) {
681 final RpcError error = RpcResultBuilder.newError(
682 ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
683 "clustering-it", "clustering-it", e);
684 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
685 .withRpcError(error).build());
691 if (!idIntsDdtl.hasTriggered()) {
// The first literal ends with a space so the concatenated message reads "... has not received any notifications."
692 final RpcError error = RpcResultBuilder.newError(
693 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received " +
694 "any notifications.");
695 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
696 .withRpcError(error).build());
// Create a dedicated datastore client actor for the id-ints shard to perform the verification read.
699 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
700 LOG.debug("Creating distributed datastore client for shard {}", shardName);
702 final ActorContext actorContext = configDataStore.getActorContext();
703 final Props distributedDataStoreClientProps =
704 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
705 "Shard-" + shardName, actorContext, shardName);
707 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
708 final DataStoreClient distributedDataStoreClient;
710 distributedDataStoreClient = SimpleDataStoreClientActor
711 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
712 } catch (final Exception e) {
713 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
// The client could not be obtained, so stop the actor that was created for it.
714 clientActor.tell(PoisonPill.getInstance(), noSender());
715 final RpcError error = RpcResultBuilder.newError(
716 ErrorType.APPLICATION, "Unable to create ds client for read.",
717 "Unable to create ds client for read.");
718 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
719 .withRpcError(error).build());
// Issue the read through a transaction of a fresh local history.
722 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
723 final ClientTransaction tx = localHistory.createTransaction();
724 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
725 org.opendaylight.mdsal.common.api.ReadFailedException> read =
726 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
729 localHistory.close();
731 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
732 if (!optional.isPresent()) {
733 LOG.warn("Final read from client is empty.");
734 final RpcError error = RpcResultBuilder.newError(
735 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
736 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
737 .withRpcError(error).build());
740 return Futures.immediateFuture(
741 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
742 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
744 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
745 LOG.error("Unable to read data to verify ddtl data.", e);
746 final RpcError error = RpcResultBuilder.newError(
747 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
748 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
749 .withRpcError(error).build());
// Release the datastore client and stop its actor.
751 distributedDataStoreClient.close();
752 clientActor.tell(PoisonPill.getInstance(), noSender());