2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import static akka.actor.ActorRef.noSender;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
159 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
160 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
161 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
163 private final RpcProviderRegistry rpcRegistry;
164 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
165 private final DistributedShardFactory distributedShardFactory;
166 private final DistributedDataStoreInterface configDataStore;
167 private final DOMDataTreeService domDataTreeService;
168 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
169 private final DOMDataBroker domDataBroker;
170 private final NotificationPublishService notificationPublishService;
171 private final NotificationService notificationService;
172 private final DOMSchemaService schemaService;
173 private final ClusterSingletonServiceProvider singletonService;
174 private final DOMRpcProviderService domRpcService;
175 private final PrefixLeaderHandler prefixLeaderHandler;
176 private final PrefixShardHandler prefixShardHandler;
177 private final DOMDataTreeChangeService domDataTreeChangeService;
178 private final ActorSystem actorSystem;
180 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
181 routedRegistrations = new HashMap<>();
183 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
185 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
186 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
187 private FlappingSingletonService flappingSingletonService;
188 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
189 private IdIntsListener idIntsListener;
190 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
191 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
192 private IdIntsDOMDataTreeLIstener idIntsDdtl;
196 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197 final DOMRpcProviderService domRpcService,
198 final ClusterSingletonServiceProvider singletonService,
199 final DOMSchemaService schemaService,
200 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201 final NotificationPublishService notificationPublishService,
202 final NotificationService notificationService,
203 final DOMDataBroker domDataBroker,
204 final DOMDataTreeService domDataTreeService,
205 final DistributedShardFactory distributedShardFactory,
206 final DistributedDataStoreInterface configDataStore,
207 final ActorSystemProvider actorSystemProvider) {
208 this.rpcRegistry = rpcRegistry;
209 this.domRpcService = domRpcService;
210 this.singletonService = singletonService;
211 this.schemaService = schemaService;
212 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213 this.notificationPublishService = notificationPublishService;
214 this.notificationService = notificationService;
215 this.domDataBroker = domDataBroker;
216 this.domDataTreeService = domDataTreeService;
217 this.distributedShardFactory = distributedShardFactory;
218 this.configDataStore = configDataStore;
219 this.actorSystem = actorSystemProvider.getActorSystem();
221 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
223 domDataTreeChangeService =
224 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
226 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
228 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229 bindingNormalizedNodeSerializer);
233 @SuppressWarnings("checkstyle:IllegalCatch")
234 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235 final UnregisterSingletonConstantInput input) {
236 LOG.debug("unregister-singleton-constant");
238 if (getSingletonConstantRegistration == null) {
239 LOG.debug("No get-singleton-constant registration present.");
240 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241 "No get-singleton-constant rpc registration present.");
242 final RpcResult<UnregisterSingletonConstantOutput> result =
243 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244 return Futures.immediateFuture(result);
248 getSingletonConstantRegistration.close();
249 getSingletonConstantRegistration = null;
251 return Futures.immediateFuture(RpcResultBuilder.success(
252 new UnregisterSingletonConstantOutputBuilder().build()).build());
253 } catch (Exception e) {
254 LOG.debug("There was a problem closing the singleton constant service", e);
255 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256 "There was a problem closing get-singleton-constant");
257 final RpcResult<UnregisterSingletonConstantOutput> result =
258 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259 return Futures.immediateFuture(result);
264 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265 final StartPublishNotificationsInput input) {
266 LOG.debug("publish-notifications, input: {}", input);
268 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269 input.getSeconds(), input.getNotificationsPerSecond());
271 publishNotificationsTasks.put(input.getId(), task);
275 return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
280 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
282 if (dtclReg != null) {
283 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284 "There is already dataTreeChangeListener registered on id-ints list.");
285 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
288 idIntsListener = new IdIntsListener();
290 dtclReg = domDataTreeChangeService
291 .registerDataTreeChangeListener(
292 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
296 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
300 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301 LOG.debug("write-transactions, input: {}", input);
302 return WriteTransactionsHandler.start(domDataBroker, input);
306 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
311 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312 final RemoveShardReplicaInput input) {
317 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
319 LOG.debug("subscribe-ynl, input: {}", input);
321 if (ynlRegistrations.containsKey(input.getId())) {
322 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323 "There is already ynl listener registered for this id: " + input.getId());
324 return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
327 ynlRegistrations.put(input.getId(),
328 notificationService.registerNotificationListener(new YnlListener(input.getId())));
330 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
334 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335 LOG.debug("remove-prefix-shard, input: {}", input);
337 return prefixShardHandler.onRemovePrefixShard(input);
341 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342 final BecomePrefixLeaderInput input) {
343 LOG.debug("become-prefix-leader, input: {}", input);
345 return prefixLeaderHandler.makeLeaderLocal(input);
349 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350 final UnregisterBoundConstantInput input) {
351 LOG.debug("unregister-bound-constant, {}", input);
353 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354 routedRegistrations.remove(input.getContext());
356 if (rpcRegistration == null) {
357 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359 "No get-constant rpc registration present.");
360 final RpcResult<UnregisterBoundConstantOutput> result =
361 RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362 return Futures.immediateFuture(result);
365 rpcRegistration.close();
366 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
371 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372 final RegisterSingletonConstantInput input) {
374 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
376 if (input.getConstant() == null) {
377 final RpcError error = RpcResultBuilder.newError(
378 ErrorType.RPC, "Invalid input.", "Constant value is null");
379 return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380 .withRpcError(error).build());
383 getSingletonConstantRegistration =
384 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
386 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
391 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392 final RegisterDefaultConstantInput input) {
397 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398 final UnregisterConstantInput input) {
400 if (globalGetConstantRegistration == null) {
401 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402 "No get-constant rpc registration present.");
403 return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
407 globalGetConstantRegistration.close();
408 globalGetConstantRegistration = null;
410 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
414 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415 final UnregisterFlappingSingletonInput input) {
416 LOG.debug("unregister-flapping-singleton received.");
418 if (flappingSingletonService == null) {
419 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420 "No flapping-singleton registration present.");
421 final RpcResult<UnregisterFlappingSingletonOutput> result =
422 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423 return Futures.immediateFuture(result);
426 final long flapCount = flappingSingletonService.setInactive();
427 flappingSingletonService = null;
429 final UnregisterFlappingSingletonOutput output =
430 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
432 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
436 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
441 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
443 if (ddtlReg != null) {
444 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445 "There is already dataTreeChangeListener registered on id-ints list.");
446 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
449 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
452 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454 ProduceTransactionsHandler.ID_INT_YID)),
455 true, Collections.emptyList());
456 } catch (DOMDataTreeLoopException e) {
457 LOG.error("Failed to register DOMDataTreeListener.", e);
460 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
464 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465 final RegisterBoundConstantInput input) {
466 LOG.debug("register-bound-constant: {}", input);
468 if (input.getContext() == null) {
469 final RpcError error = RpcResultBuilder.newError(
470 ErrorType.RPC, "Invalid input.", "Context value is null");
471 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
475 if (input.getConstant() == null) {
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.RPC, "Invalid input.", "Constant value is null");
478 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
482 if (routedRegistrations.containsKey(input.getContext())) {
483 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484 "There is already a rpc registered for context: " + input.getContext());
485 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
489 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491 input.getConstant(), input.getContext());
493 routedRegistrations.put(input.getContext(), rpcRegistration);
494 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
499 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500 final RegisterFlappingSingletonInput input) {
501 LOG.debug("Received register-flapping-singleton.");
503 if (flappingSingletonService != null) {
504 final RpcError error = RpcResultBuilder.newError(
505 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506 return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507 .withRpcError(error).build());
510 flappingSingletonService = new FlappingSingletonService(singletonService);
512 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
517 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518 LOG.debug("Received unsubscribe-dtcl");
520 if (idIntsListener == null || dtclReg == null) {
521 final RpcError error = RpcResultBuilder.newError(
522 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524 .withRpcError(error).build());
528 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529 } catch (InterruptedException | ExecutionException | TimeoutException e) {
530 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533 .withRpcError(error).build());
539 if (!idIntsListener.hasTriggered()) {
540 final RpcError error = RpcResultBuilder.newError(
541 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
542 + "any notifications.");
543 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
544 .withRpcError(error).build());
547 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
549 final Optional<NormalizedNode<?, ?>> readResult =
550 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
552 if (!readResult.isPresent()) {
553 final RpcError error = RpcResultBuilder.newError(
554 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
555 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
556 .withRpcError(error).build());
559 return Futures.immediateFuture(
560 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
561 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
563 } catch (final InterruptedException | ExecutionException e) {
564 final RpcError error = RpcResultBuilder.newError(
565 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
566 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
567 .withRpcError(error).build());
573 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
574 LOG.debug("create-prefix-shard, input: {}", input);
576 return prefixShardHandler.onCreatePrefixShard(input);
580 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
581 final DeconfigureIdIntsShardInput input) {
586 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
587 LOG.debug("Received unsubscribe-ynl, input: {}", input);
589 if (!ynlRegistrations.containsKey(input.getId())) {
590 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
591 "No ynl listener with this id registered.");
592 final RpcResult<UnsubscribeYnlOutput> result =
593 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
594 return Futures.immediateFuture(result);
597 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
598 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
602 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
606 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
607 final CheckPublishNotificationsInput input) {
609 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
612 return Futures.immediateFuture(RpcResultBuilder.success(
613 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
616 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
617 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
619 if (task.getLastError() != null) {
620 LOG.error("Last error for {}", task, task.getLastError());
621 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
624 final CheckPublishNotificationsOutput output =
625 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
627 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
631 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
632 final ProduceTransactionsInput input) {
633 LOG.debug("producer-transactions, input: {}", input);
634 return ProduceTransactionsHandler.start(domDataTreeService, input);
638 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
639 final ShutdownShardReplicaInput input) {
640 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
642 final String shardName = input.getShardName();
643 if (Strings.isNullOrEmpty(shardName)) {
644 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
645 "A valid shard name must be specified");
646 return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
650 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
654 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
655 final ShutdownPrefixShardReplicaInput input) {
656 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
658 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
660 if (shardPrefix == null) {
661 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
662 "A valid shard prefix must be specified");
663 return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
664 .withRpcError(rpcError).build());
667 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
668 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
670 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
673 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
674 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
675 final ActorContext context = configDataStore.getActorContext();
677 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
678 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
679 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
680 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
682 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
684 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
685 if (throwable != null) {
686 shutdownShardAsk.failure(throwable);
688 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
691 }, context.getClientDispatcher());
693 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
695 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
696 if (throwable != null) {
697 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
698 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
699 rpcResult.set(failedResult);
701 // according to Patterns.gracefulStop API, we don't have to
702 // check value of gracefulStopResult
703 rpcResult.set(RpcResultBuilder.success(success).build());
706 }, context.getClientDispatcher());
711 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
713 LOG.debug("Received register-constant rpc, input: {}", input);
715 if (input.getConstant() == null) {
716 final RpcError error = RpcResultBuilder.newError(
717 ErrorType.RPC, "Invalid input.", "Constant value is null");
718 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
722 if (globalGetConstantRegistration != null) {
723 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
724 "There is already a get-constant rpc registered.");
725 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
729 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
730 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
734 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
735 final UnregisterDefaultConstantInput input) {
740 @SuppressWarnings("checkstyle:IllegalCatch")
741 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
742 LOG.debug("Received unsubscribe-ddtl.");
744 if (idIntsDdtl == null || ddtlReg == null) {
745 final RpcError error = RpcResultBuilder.newError(
746 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
747 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
748 .withRpcError(error).build());
752 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
753 } catch (InterruptedException | ExecutionException | TimeoutException e) {
754 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
755 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
756 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
757 .withRpcError(error).build());
763 if (!idIntsDdtl.hasTriggered()) {
764 final RpcError error = RpcResultBuilder.newError(
765 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
766 + "any notifications.");
767 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
768 .withRpcError(error).build());
771 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
772 LOG.debug("Creating distributed datastore client for shard {}", shardName);
774 final ActorContext actorContext = configDataStore.getActorContext();
775 final Props distributedDataStoreClientProps =
776 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
777 "Shard-" + shardName, actorContext, shardName);
779 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
780 final DataStoreClient distributedDataStoreClient;
782 distributedDataStoreClient = SimpleDataStoreClientActor
783 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
784 } catch (RuntimeException e) {
785 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
786 clientActor.tell(PoisonPill.getInstance(), noSender());
787 final RpcError error = RpcResultBuilder.newError(
788 ErrorType.APPLICATION, "Unable to create ds client for read.",
789 "Unable to create ds client for read.");
790 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
791 .withRpcError(error).build());
794 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
795 final ClientTransaction tx = localHistory.createTransaction();
796 final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
797 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
800 localHistory.close();
802 final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
803 if (!optional.isPresent()) {
804 LOG.warn("Final read from client is empty.");
805 final RpcError error = RpcResultBuilder.newError(
806 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
807 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
808 .withRpcError(error).build());
811 return Futures.immediateFuture(
812 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
813 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
815 } catch (InterruptedException | ExecutionException e) {
816 LOG.error("Unable to read data to verify ddtl data.", e);
817 final RpcError error = RpcResultBuilder.newError(
818 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
819 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
820 .withRpcError(error).build());
822 distributedDataStoreClient.close();
823 clientActor.tell(PoisonPill.getInstance(), noSender());