2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
119 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
120 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
121 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
123 private final RpcProviderRegistry rpcRegistry;
124 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
125 private final DistributedShardFactory distributedShardFactory;
126 private final DistributedDataStoreInterface configDataStore;
127 private final DOMDataTreeService domDataTreeService;
128 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
129 private final DOMDataBroker domDataBroker;
130 private final NotificationPublishService notificationPublishService;
131 private final NotificationService notificationService;
132 private final SchemaService schemaService;
133 private final ClusterSingletonServiceProvider singletonService;
134 private final DOMRpcProviderService domRpcService;
135 private final PrefixLeaderHandler prefixLeaderHandler;
136 private final PrefixShardHandler prefixShardHandler;
137 private final DOMDataTreeChangeService domDataTreeChangeService;
138 private final ActorSystem actorSystem;
140 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
141 routedRegistrations = new HashMap<>();
143 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
145 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
146 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
147 private FlappingSingletonService flappingSingletonService;
148 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
149 private IdIntsListener idIntsListener;
150 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
151 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
152 private IdIntsDOMDataTreeLIstener idIntsDdtl;
156 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157 final DOMRpcProviderService domRpcService,
158 final ClusterSingletonServiceProvider singletonService,
159 final SchemaService schemaService,
160 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161 final NotificationPublishService notificationPublishService,
162 final NotificationService notificationService,
163 final DOMDataBroker domDataBroker,
164 final DOMDataTreeService domDataTreeService,
165 final DistributedShardFactory distributedShardFactory,
166 final DistributedDataStoreInterface configDataStore,
167 final ActorSystemProvider actorSystemProvider) {
168 this.rpcRegistry = rpcRegistry;
169 this.domRpcService = domRpcService;
170 this.singletonService = singletonService;
171 this.schemaService = schemaService;
172 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173 this.notificationPublishService = notificationPublishService;
174 this.notificationService = notificationService;
175 this.domDataBroker = domDataBroker;
176 this.domDataTreeService = domDataTreeService;
177 this.distributedShardFactory = distributedShardFactory;
178 this.configDataStore = configDataStore;
179 this.actorSystem = actorSystemProvider.getActorSystem();
181 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
183 domDataTreeChangeService =
184 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
186 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
188 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189 bindingNormalizedNodeSerializer);
193 @SuppressWarnings("checkstyle:IllegalCatch")
194 public Future<RpcResult<Void>> unregisterSingletonConstant() {
195 LOG.debug("unregister-singleton-constant");
197 if (getSingletonConstantRegistration == null) {
198 LOG.debug("No get-singleton-constant registration present.");
199 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
200 "No get-singleton-constant rpc registration present.");
201 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
202 return Futures.immediateFuture(result);
206 getSingletonConstantRegistration.close();
207 getSingletonConstantRegistration = null;
209 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
210 } catch (Exception e) {
211 LOG.debug("There was a problem closing the singleton constant service", e);
212 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
213 "There was a problem closing get-singleton-constant");
214 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
215 return Futures.immediateFuture(result);
220 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
221 LOG.debug("publish-notifications, input: {}", input);
223 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
224 input.getSeconds(), input.getNotificationsPerSecond());
226 publishNotificationsTasks.put(input.getId(), task);
230 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
234 public Future<RpcResult<Void>> subscribeDtcl() {
236 if (dtclReg != null) {
237 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
238 "There is already dataTreeChangeListener registered on id-ints list.");
239 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
242 idIntsListener = new IdIntsListener();
244 dtclReg = domDataTreeChangeService
245 .registerDataTreeChangeListener(
246 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
247 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
250 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
254 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
255 LOG.debug("write-transactions, input: {}", input);
256 return WriteTransactionsHandler.start(domDataBroker, input);
260 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
265 public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
270 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
272 LOG.debug("subscribe-ynl, input: {}", input);
274 if (ynlRegistrations.containsKey(input.getId())) {
275 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
276 "There is already ynl listener registered for this id: " + input.getId());
277 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
280 ynlRegistrations.put(input.getId(),
281 notificationService.registerNotificationListener(new YnlListener(input.getId())));
283 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
287 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
288 LOG.debug("remove-prefix-shard, input: {}", input);
290 return prefixShardHandler.onRemovePrefixShard(input);
294 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
295 LOG.debug("become-prefix-leader, input: {}", input);
297 return prefixLeaderHandler.makeLeaderLocal(input);
301 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
302 LOG.debug("unregister-bound-constant, {}", input);
304 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
305 routedRegistrations.remove(input.getContext());
307 if (rpcRegistration == null) {
308 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
309 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
310 "No get-constant rpc registration present.");
311 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
312 return Futures.immediateFuture(result);
315 rpcRegistration.close();
316 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
320 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
322 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
324 if (input.getConstant() == null) {
325 final RpcError error = RpcResultBuilder.newError(
326 ErrorType.RPC, "Invalid input.", "Constant value is null");
327 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
330 getSingletonConstantRegistration =
331 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
333 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
337 public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
342 public Future<RpcResult<Void>> unregisterConstant() {
344 if (globalGetConstantRegistration == null) {
345 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
346 "No get-constant rpc registration present.");
347 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
348 return Futures.immediateFuture(result);
351 globalGetConstantRegistration.close();
352 globalGetConstantRegistration = null;
354 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
358 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
359 LOG.debug("unregister-flapping-singleton received.");
361 if (flappingSingletonService == null) {
362 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
363 "No flapping-singleton registration present.");
364 final RpcResult<UnregisterFlappingSingletonOutput> result =
365 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
366 return Futures.immediateFuture(result);
369 final long flapCount = flappingSingletonService.setInactive();
370 flappingSingletonService = null;
372 final UnregisterFlappingSingletonOutput output =
373 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
375 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
379 public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
384 public Future<RpcResult<Void>> subscribeDdtl() {
386 if (ddtlReg != null) {
387 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
388 "There is already dataTreeChangeListener registered on id-ints list.");
389 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
392 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
395 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
396 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
397 ProduceTransactionsHandler.ID_INT_YID)),
398 true, Collections.emptyList());
399 } catch (DOMDataTreeLoopException e) {
400 LOG.error("Failed to register DOMDataTreeListener.", e);
404 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
408 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
409 LOG.debug("register-bound-constant: {}", input);
411 if (input.getContext() == null) {
412 final RpcError error = RpcResultBuilder.newError(
413 ErrorType.RPC, "Invalid input.", "Context value is null");
414 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
417 if (input.getConstant() == null) {
418 final RpcError error = RpcResultBuilder.newError(
419 ErrorType.RPC, "Invalid input.", "Constant value is null");
420 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
423 if (routedRegistrations.containsKey(input.getContext())) {
424 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
425 "There is already a rpc registered for context: " + input.getContext());
426 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
429 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
430 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
431 input.getConstant(), input.getContext());
433 routedRegistrations.put(input.getContext(), rpcRegistration);
434 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
438 public Future<RpcResult<Void>> registerFlappingSingleton() {
439 LOG.debug("Received register-flapping-singleton.");
441 if (flappingSingletonService != null) {
442 final RpcError error = RpcResultBuilder.newError(
443 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
444 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
447 flappingSingletonService = new FlappingSingletonService(singletonService);
449 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
453 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
454 LOG.debug("Received unsubscribe-dtcl");
456 if (idIntsListener == null || dtclReg == null) {
457 final RpcError error = RpcResultBuilder.newError(
458 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
459 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
460 .withRpcError(error).build());
464 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
465 } catch (InterruptedException | ExecutionException | TimeoutException e) {
466 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
467 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
468 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
469 .withRpcError(error).build());
475 if (!idIntsListener.hasTriggered()) {
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
478 + "any notifications.");
479 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
480 .withRpcError(error).build());
483 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
485 final Optional<NormalizedNode<?, ?>> readResult =
486 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
488 if (!readResult.isPresent()) {
489 final RpcError error = RpcResultBuilder.newError(
490 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
491 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
492 .withRpcError(error).build());
495 return Futures.immediateFuture(
496 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
497 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
499 } catch (final ReadFailedException e) {
500 final RpcError error = RpcResultBuilder.newError(
501 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
502 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
503 .withRpcError(error).build());
509 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
510 LOG.debug("create-prefix-shard, input: {}", input);
512 return prefixShardHandler.onCreatePrefixShard(input);
516 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
521 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
522 LOG.debug("Received unsubscribe-ynl, input: {}", input);
524 if (!ynlRegistrations.containsKey(input.getId())) {
525 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
526 "No ynl listener with this id registered.");
527 final RpcResult<UnsubscribeYnlOutput> result =
528 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
529 return Futures.immediateFuture(result);
532 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
533 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
537 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
541 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
542 final CheckPublishNotificationsInput input) {
544 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
547 return Futures.immediateFuture(RpcResultBuilder.success(
548 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
551 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
552 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
554 if (task.getLastError() != null) {
555 final StringWriter sw = new StringWriter();
556 final PrintWriter pw = new PrintWriter(sw);
557 LOG.error("Last error for {}", task, task.getLastError());
558 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
561 final CheckPublishNotificationsOutput output =
562 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
564 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
568 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
569 LOG.debug("producer-transactions, input: {}", input);
570 return ProduceTransactionsHandler.start(domDataTreeService, input);
574 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
577 final String shardName = input.getShardName();
578 if (Strings.isNullOrEmpty(shardName)) {
579 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580 "A valid shard name must be specified");
581 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
584 return shutdownShardGracefully(shardName);
588 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
589 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
591 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
593 if (shardPrefix == null) {
594 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
595 "A valid shard prefix must be specified");
596 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
599 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
600 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
602 return shutdownShardGracefully(cleanPrefixShardName);
605 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
606 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
607 final ActorContext context = configDataStore.getActorContext();
609 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
610 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
611 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
612 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
614 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
616 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
617 if (throwable != null) {
618 shutdownShardAsk.failure(throwable);
620 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
623 }, context.getClientDispatcher());
625 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
627 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
628 if (throwable != null) {
629 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
630 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
631 rpcResult.set(failedResult);
633 // according to Patterns.gracefulStop API, we don't have to
634 // check value of gracefulStopResult
635 rpcResult.set(RpcResultBuilder.<Void>success().build());
638 }, context.getClientDispatcher());
643 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
645 LOG.debug("Received register-constant rpc, input: {}", input);
647 if (input.getConstant() == null) {
648 final RpcError error = RpcResultBuilder.newError(
649 ErrorType.RPC, "Invalid input.", "Constant value is null");
650 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
653 if (globalGetConstantRegistration != null) {
654 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
655 "There is already a get-constant rpc registered.");
656 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
659 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
660 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
664 public Future<RpcResult<Void>> unregisterDefaultConstant() {
669 @SuppressWarnings("checkstyle:IllegalCatch")
670 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
671 LOG.debug("Received unsubscribe-ddtl.");
673 if (idIntsDdtl == null || ddtlReg == null) {
674 final RpcError error = RpcResultBuilder.newError(
675 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
676 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
677 .withRpcError(error).build());
681 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
682 } catch (InterruptedException | ExecutionException | TimeoutException e) {
683 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
684 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
685 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
686 .withRpcError(error).build());
692 if (!idIntsDdtl.hasTriggered()) {
693 final RpcError error = RpcResultBuilder.newError(
694 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
695 + "any notifications.");
696 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
697 .withRpcError(error).build());
700 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
701 LOG.debug("Creating distributed datastore client for shard {}", shardName);
703 final ActorContext actorContext = configDataStore.getActorContext();
704 final Props distributedDataStoreClientProps =
705 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
706 "Shard-" + shardName, actorContext, shardName);
708 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
709 final DataStoreClient distributedDataStoreClient;
711 distributedDataStoreClient = SimpleDataStoreClientActor
712 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
713 } catch (RuntimeException e) {
714 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
715 clientActor.tell(PoisonPill.getInstance(), noSender());
716 final RpcError error = RpcResultBuilder.newError(
717 ErrorType.APPLICATION, "Unable to create ds client for read.",
718 "Unable to create ds client for read.");
719 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
720 .withRpcError(error).build());
723 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
724 final ClientTransaction tx = localHistory.createTransaction();
725 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
726 org.opendaylight.mdsal.common.api.ReadFailedException> read =
727 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
730 localHistory.close();
732 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
733 if (!optional.isPresent()) {
734 LOG.warn("Final read from client is empty.");
735 final RpcError error = RpcResultBuilder.newError(
736 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
737 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
738 .withRpcError(error).build());
741 return Futures.immediateFuture(
742 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
743 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
745 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
746 LOG.error("Unable to read data to verify ddtl data.", e);
747 final RpcError error = RpcResultBuilder.newError(
748 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
749 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
750 .withRpcError(error).build());
752 distributedDataStoreClient.close();
753 clientActor.tell(PoisonPill.getInstance(), noSender());