2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Collections;
25 import java.util.HashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.opendaylight.controller.cluster.ActorSystemProvider;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
35 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
38 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
39 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
40 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
41 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
48 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
51 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
59 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
60 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
61 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
62 import org.opendaylight.controller.sal.core.api.model.SchemaService;
63 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
64 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
66 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
69 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
145 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
146 import org.opendaylight.yangtools.concepts.ListenerRegistration;
147 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
148 import org.opendaylight.yangtools.yang.common.RpcError;
149 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
150 import org.opendaylight.yangtools.yang.common.RpcResult;
151 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
152 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
153 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
154 import org.slf4j.Logger;
155 import org.slf4j.LoggerFactory;
156 import scala.concurrent.duration.FiniteDuration;
158 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
160 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
161 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
162 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
164 private final RpcProviderRegistry rpcRegistry;
165 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
166 private final DistributedShardFactory distributedShardFactory;
167 private final DistributedDataStoreInterface configDataStore;
168 private final DOMDataTreeService domDataTreeService;
169 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
170 private final DOMDataBroker domDataBroker;
171 private final NotificationPublishService notificationPublishService;
172 private final NotificationService notificationService;
173 private final SchemaService schemaService;
174 private final ClusterSingletonServiceProvider singletonService;
175 private final DOMRpcProviderService domRpcService;
176 private final PrefixLeaderHandler prefixLeaderHandler;
177 private final PrefixShardHandler prefixShardHandler;
178 private final DOMDataTreeChangeService domDataTreeChangeService;
179 private final ActorSystem actorSystem;
181 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
182 routedRegistrations = new HashMap<>();
184 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
186 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
187 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
188 private FlappingSingletonService flappingSingletonService;
189 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
190 private IdIntsListener idIntsListener;
191 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
192 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
193 private IdIntsDOMDataTreeLIstener idIntsDdtl;
197 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
198 final DOMRpcProviderService domRpcService,
199 final ClusterSingletonServiceProvider singletonService,
200 final SchemaService schemaService,
201 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
202 final NotificationPublishService notificationPublishService,
203 final NotificationService notificationService,
204 final DOMDataBroker domDataBroker,
205 final DOMDataTreeService domDataTreeService,
206 final DistributedShardFactory distributedShardFactory,
207 final DistributedDataStoreInterface configDataStore,
208 final ActorSystemProvider actorSystemProvider) {
209 this.rpcRegistry = rpcRegistry;
210 this.domRpcService = domRpcService;
211 this.singletonService = singletonService;
212 this.schemaService = schemaService;
213 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
214 this.notificationPublishService = notificationPublishService;
215 this.notificationService = notificationService;
216 this.domDataBroker = domDataBroker;
217 this.domDataTreeService = domDataTreeService;
218 this.distributedShardFactory = distributedShardFactory;
219 this.configDataStore = configDataStore;
220 this.actorSystem = actorSystemProvider.getActorSystem();
222 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
224 domDataTreeChangeService =
225 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
227 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
229 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
230 bindingNormalizedNodeSerializer);
234 @SuppressWarnings("checkstyle:IllegalCatch")
235 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
236 final UnregisterSingletonConstantInput input) {
237 LOG.debug("unregister-singleton-constant");
239 if (getSingletonConstantRegistration == null) {
240 LOG.debug("No get-singleton-constant registration present.");
241 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
242 "No get-singleton-constant rpc registration present.");
243 final RpcResult<UnregisterSingletonConstantOutput> result =
244 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
245 return Futures.immediateFuture(result);
249 getSingletonConstantRegistration.close();
250 getSingletonConstantRegistration = null;
252 return Futures.immediateFuture(RpcResultBuilder.success(
253 new UnregisterSingletonConstantOutputBuilder().build()).build());
254 } catch (Exception e) {
255 LOG.debug("There was a problem closing the singleton constant service", e);
256 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
257 "There was a problem closing get-singleton-constant");
258 final RpcResult<UnregisterSingletonConstantOutput> result =
259 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
260 return Futures.immediateFuture(result);
265 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
266 final StartPublishNotificationsInput input) {
267 LOG.debug("publish-notifications, input: {}", input);
269 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
270 input.getSeconds(), input.getNotificationsPerSecond());
272 publishNotificationsTasks.put(input.getId(), task);
276 return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
281 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
283 if (dtclReg != null) {
284 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
285 "There is already dataTreeChangeListener registered on id-ints list.");
286 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
289 idIntsListener = new IdIntsListener();
291 dtclReg = domDataTreeChangeService
292 .registerDataTreeChangeListener(
293 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
294 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
297 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
301 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
302 LOG.debug("write-transactions, input: {}", input);
303 return WriteTransactionsHandler.start(domDataBroker, input);
307 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
312 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
313 final RemoveShardReplicaInput input) {
318 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
320 LOG.debug("subscribe-ynl, input: {}", input);
322 if (ynlRegistrations.containsKey(input.getId())) {
323 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
324 "There is already ynl listener registered for this id: " + input.getId());
325 return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
328 ynlRegistrations.put(input.getId(),
329 notificationService.registerNotificationListener(new YnlListener(input.getId())));
331 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
335 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
336 LOG.debug("remove-prefix-shard, input: {}", input);
338 return prefixShardHandler.onRemovePrefixShard(input);
342 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
343 final BecomePrefixLeaderInput input) {
344 LOG.debug("become-prefix-leader, input: {}", input);
346 return prefixLeaderHandler.makeLeaderLocal(input);
350 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
351 final UnregisterBoundConstantInput input) {
352 LOG.debug("unregister-bound-constant, {}", input);
354 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
355 routedRegistrations.remove(input.getContext());
357 if (rpcRegistration == null) {
358 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
359 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
360 "No get-constant rpc registration present.");
361 final RpcResult<UnregisterBoundConstantOutput> result =
362 RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
363 return Futures.immediateFuture(result);
366 rpcRegistration.close();
367 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
372 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
373 final RegisterSingletonConstantInput input) {
375 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
377 if (input.getConstant() == null) {
378 final RpcError error = RpcResultBuilder.newError(
379 ErrorType.RPC, "Invalid input.", "Constant value is null");
380 return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
381 .withRpcError(error).build());
384 getSingletonConstantRegistration =
385 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
387 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
392 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
393 final RegisterDefaultConstantInput input) {
398 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
399 final UnregisterConstantInput input) {
401 if (globalGetConstantRegistration == null) {
402 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
403 "No get-constant rpc registration present.");
404 return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
408 globalGetConstantRegistration.close();
409 globalGetConstantRegistration = null;
411 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
415 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
416 final UnregisterFlappingSingletonInput input) {
417 LOG.debug("unregister-flapping-singleton received.");
419 if (flappingSingletonService == null) {
420 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
421 "No flapping-singleton registration present.");
422 final RpcResult<UnregisterFlappingSingletonOutput> result =
423 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
424 return Futures.immediateFuture(result);
427 final long flapCount = flappingSingletonService.setInactive();
428 flappingSingletonService = null;
430 final UnregisterFlappingSingletonOutput output =
431 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
433 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
437 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
442 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
444 if (ddtlReg != null) {
445 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
446 "There is already dataTreeChangeListener registered on id-ints list.");
447 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
450 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
453 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
454 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
455 ProduceTransactionsHandler.ID_INT_YID)),
456 true, Collections.emptyList());
457 } catch (DOMDataTreeLoopException e) {
458 LOG.error("Failed to register DOMDataTreeListener.", e);
461 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
465 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
466 final RegisterBoundConstantInput input) {
467 LOG.debug("register-bound-constant: {}", input);
469 if (input.getContext() == null) {
470 final RpcError error = RpcResultBuilder.newError(
471 ErrorType.RPC, "Invalid input.", "Context value is null");
472 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
476 if (input.getConstant() == null) {
477 final RpcError error = RpcResultBuilder.newError(
478 ErrorType.RPC, "Invalid input.", "Constant value is null");
479 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
483 if (routedRegistrations.containsKey(input.getContext())) {
484 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
485 "There is already a rpc registered for context: " + input.getContext());
486 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
490 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
491 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
492 input.getConstant(), input.getContext());
494 routedRegistrations.put(input.getContext(), rpcRegistration);
495 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
500 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
501 final RegisterFlappingSingletonInput input) {
502 LOG.debug("Received register-flapping-singleton.");
504 if (flappingSingletonService != null) {
505 final RpcError error = RpcResultBuilder.newError(
506 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
507 return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
508 .withRpcError(error).build());
511 flappingSingletonService = new FlappingSingletonService(singletonService);
513 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
518 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
519 LOG.debug("Received unsubscribe-dtcl");
521 if (idIntsListener == null || dtclReg == null) {
522 final RpcError error = RpcResultBuilder.newError(
523 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
524 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
525 .withRpcError(error).build());
529 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
530 } catch (InterruptedException | ExecutionException | TimeoutException e) {
531 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
532 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
533 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
534 .withRpcError(error).build());
540 if (!idIntsListener.hasTriggered()) {
541 final RpcError error = RpcResultBuilder.newError(
542 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
543 + "any notifications.");
544 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
545 .withRpcError(error).build());
548 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
550 final Optional<NormalizedNode<?, ?>> readResult =
551 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
553 if (!readResult.isPresent()) {
554 final RpcError error = RpcResultBuilder.newError(
555 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
556 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
557 .withRpcError(error).build());
560 return Futures.immediateFuture(
561 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
562 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
564 } catch (final InterruptedException | ExecutionException e) {
565 final RpcError error = RpcResultBuilder.newError(
566 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
567 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
568 .withRpcError(error).build());
574 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
575 LOG.debug("create-prefix-shard, input: {}", input);
577 return prefixShardHandler.onCreatePrefixShard(input);
581 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
582 final DeconfigureIdIntsShardInput input) {
587 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
588 LOG.debug("Received unsubscribe-ynl, input: {}", input);
590 if (!ynlRegistrations.containsKey(input.getId())) {
591 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
592 "No ynl listener with this id registered.");
593 final RpcResult<UnsubscribeYnlOutput> result =
594 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
595 return Futures.immediateFuture(result);
598 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
599 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
603 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
607 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
608 final CheckPublishNotificationsInput input) {
610 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
613 return Futures.immediateFuture(RpcResultBuilder.success(
614 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
617 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
618 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
620 if (task.getLastError() != null) {
621 LOG.error("Last error for {}", task, task.getLastError());
622 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
625 final CheckPublishNotificationsOutput output =
626 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
628 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
632 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
633 final ProduceTransactionsInput input) {
634 LOG.debug("producer-transactions, input: {}", input);
635 return ProduceTransactionsHandler.start(domDataTreeService, input);
639 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
640 final ShutdownShardReplicaInput input) {
641 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
643 final String shardName = input.getShardName();
644 if (Strings.isNullOrEmpty(shardName)) {
645 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
646 "A valid shard name must be specified");
647 return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
651 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
655 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
656 final ShutdownPrefixShardReplicaInput input) {
657 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
659 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
661 if (shardPrefix == null) {
662 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
663 "A valid shard prefix must be specified");
664 return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
665 .withRpcError(rpcError).build());
668 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
669 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
671 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
674 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
675 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
676 final ActorContext context = configDataStore.getActorContext();
678 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
679 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
680 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
681 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
683 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
685 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
686 if (throwable != null) {
687 shutdownShardAsk.failure(throwable);
689 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
692 }, context.getClientDispatcher());
694 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
696 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
697 if (throwable != null) {
698 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
699 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
700 rpcResult.set(failedResult);
702 // according to Patterns.gracefulStop API, we don't have to
703 // check value of gracefulStopResult
704 rpcResult.set(RpcResultBuilder.success(success).build());
707 }, context.getClientDispatcher());
712 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
714 LOG.debug("Received register-constant rpc, input: {}", input);
716 if (input.getConstant() == null) {
717 final RpcError error = RpcResultBuilder.newError(
718 ErrorType.RPC, "Invalid input.", "Constant value is null");
719 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
723 if (globalGetConstantRegistration != null) {
724 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
725 "There is already a get-constant rpc registered.");
726 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
730 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
731 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
735 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
736 final UnregisterDefaultConstantInput input) {
741 @SuppressWarnings("checkstyle:IllegalCatch")
742 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
743 LOG.debug("Received unsubscribe-ddtl.");
745 if (idIntsDdtl == null || ddtlReg == null) {
746 final RpcError error = RpcResultBuilder.newError(
747 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
748 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
749 .withRpcError(error).build());
753 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
754 } catch (InterruptedException | ExecutionException | TimeoutException e) {
755 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
756 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
757 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
758 .withRpcError(error).build());
764 if (!idIntsDdtl.hasTriggered()) {
765 final RpcError error = RpcResultBuilder.newError(
766 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
767 + "any notifications.");
768 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
769 .withRpcError(error).build());
772 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
773 LOG.debug("Creating distributed datastore client for shard {}", shardName);
775 final ActorContext actorContext = configDataStore.getActorContext();
776 final Props distributedDataStoreClientProps =
777 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
778 "Shard-" + shardName, actorContext, shardName);
780 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
781 final DataStoreClient distributedDataStoreClient;
783 distributedDataStoreClient = SimpleDataStoreClientActor
784 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
785 } catch (RuntimeException e) {
786 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
787 clientActor.tell(PoisonPill.getInstance(), noSender());
788 final RpcError error = RpcResultBuilder.newError(
789 ErrorType.APPLICATION, "Unable to create ds client for read.",
790 "Unable to create ds client for read.");
791 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
792 .withRpcError(error).build());
795 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
796 final ClientTransaction tx = localHistory.createTransaction();
797 final ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
798 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
801 localHistory.close();
803 final Optional<NormalizedNode<?, ?>> optional = read.get();
804 if (!optional.isPresent()) {
805 LOG.warn("Final read from client is empty.");
806 final RpcError error = RpcResultBuilder.newError(
807 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
808 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
809 .withRpcError(error).build());
812 return Futures.immediateFuture(
813 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
814 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
816 } catch (InterruptedException | ExecutionException e) {
817 LOG.error("Unable to read data to verify ddtl data.", e);
818 final RpcError error = RpcResultBuilder.newError(
819 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
820 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
821 .withRpcError(error).build());
823 distributedDataStoreClient.close();
824 clientActor.tell(PoisonPill.getInstance(), noSender());