2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import static akka.actor.ActorRef.noSender;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
/**
 * {@link OdlMdsalLowlevelControlService} implementation. Besides the collaborators assigned in the
 * constructor, it keeps the state created by individual RPCs (RPC registrations, notification
 * listeners, publish tasks and data-tree listeners) in mutable fields so that a later RPC can look
 * that state up again.
 *
 * <p>NOTE(review): the maps are plain {@code HashMap}s and the mutable fields are neither volatile nor
 * guarded by a lock in the code shown here -- confirm RPCs are never dispatched concurrently.
 */
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
159 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller-API CONFIGURATION datastore type. It is spelled out fully qualified because the simple
// name LogicalDatastoreType is imported from org.opendaylight.mdsal.common.api in this file.
160 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
161 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators and helpers; all of them are assigned exactly once in the constructor.
163 private final RpcProviderRegistry rpcRegistry;
164 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
165 private final DistributedShardFactory distributedShardFactory;
166 private final DistributedDataStoreInterface configDataStore;
167 private final DOMDataTreeService domDataTreeService;
168 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
169 private final DOMDataBroker domDataBroker;
170 private final NotificationPublishService notificationPublishService;
171 private final NotificationService notificationService;
172 private final DOMSchemaService schemaService;
173 private final ClusterSingletonServiceProvider singletonService;
174 private final DOMRpcProviderService domRpcService;
175 private final PrefixLeaderHandler prefixLeaderHandler;
176 private final PrefixShardHandler prefixShardHandler;
177 private final DOMDataTreeChangeService domDataTreeChangeService;
178 private final ActorSystem actorSystem;
// Routed get-constant RPC registrations keyed by the context they were registered for
// (filled by registerBoundConstant, removed again by unregisterBoundConstant).
180 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
181 routedRegistrations = new HashMap<>();
// Notification listener registrations keyed by the id supplied to subscribeYnl.
183 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Registration that unregisterConstant checks, closes and resets to null.
185 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
// Assigned by registerSingletonConstant; closed and cleared by unregisterSingletonConstant.
186 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
// Created by registerFlappingSingleton; cleared by unregisterFlappingSingleton.
187 private FlappingSingletonService flappingSingletonService;
// Data tree change listener and its registration, both created by subscribeDtcl.
188 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
189 private IdIntsListener idIntsListener;
// Publish tasks keyed by the id taken from StartPublishNotificationsInput.
190 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
// DOM data tree listener and its registration, both created by subscribeDdtl.
191 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
192 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied collaborators, derives the helper objects and registers this instance as the
 * implementation of {@link OdlMdsalLowlevelControlService}.
 *
 * @param rpcRegistry registry this instance is added to as the RPC implementation
 * @param domRpcService passed on to the get-constant service registrations made by other methods
 * @param singletonService used for the singleton-constant and flapping-singleton registrations
 * @param schemaService stored in a field; no further use is visible in this part of the file
 * @param bindingNormalizedNodeSerializer handed to the prefix handlers and routed constant service
 * @param notificationPublishService handed to the publish-notifications tasks
 * @param notificationService used to register notification listeners
 * @param domDataBroker source of the {@link DOMDataTreeChangeService} extension and of transactions
 * @param domDataTreeService handed to the prefix handlers and used to register DOM data tree listeners
 * @param distributedShardFactory handed to the {@link PrefixShardHandler}
 * @param configDataStore stored in a field; no further use is visible in this part of the file
 * @param actorSystemProvider queried once for the {@link ActorSystem}
 */
196 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197 final DOMRpcProviderService domRpcService,
198 final ClusterSingletonServiceProvider singletonService,
199 final DOMSchemaService schemaService,
200 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201 final NotificationPublishService notificationPublishService,
202 final NotificationService notificationService,
203 final DOMDataBroker domDataBroker,
204 final DOMDataTreeService domDataTreeService,
205 final DistributedShardFactory distributedShardFactory,
206 final DistributedDataStoreInterface configDataStore,
207 final ActorSystemProvider actorSystemProvider) {
208 this.rpcRegistry = rpcRegistry;
209 this.domRpcService = domRpcService;
210 this.singletonService = singletonService;
211 this.schemaService = schemaService;
212 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213 this.notificationPublishService = notificationPublishService;
214 this.notificationService = notificationService;
215 this.domDataBroker = domDataBroker;
216 this.domDataTreeService = domDataTreeService;
217 this.distributedShardFactory = distributedShardFactory;
218 this.configDataStore = configDataStore;
219 this.actorSystem = actorSystemProvider.getActorSystem();
221 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// The change service is looked up as an optional extension of the DOM data broker.
// NOTE(review): the map lookup yields null when the broker does not offer this extension; the field
// is used without a null check in subscribeDtcl -- confirm the extension is always present.
223 domDataTreeChangeService =
224 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
// NOTE(review): "this" is handed to the registry before prefixShardHandler is assigned below, so an
// RPC dispatched in between would observe a null handler -- consider registering as the last step.
226 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
228 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229 bindingNormalizedNodeSerializer);
/**
 * Closes the singleton get-constant registration, if there is one.
 *
 * @param input RPC input; only passed in, not read by the code below
 * @return an already-completed future holding success, or a failed result tagged
 *         {@code missing-registration} (nothing registered) or {@code error-closing} (close threw)
 */
// The checkstyle suppression covers the deliberately broad catch (Exception) further down.
233 @SuppressWarnings("checkstyle:IllegalCatch")
234 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235 final UnregisterSingletonConstantInput input) {
236 LOG.debug("unregister-singleton-constant");
// Nothing to close: report an application error instead of throwing.
238 if (getSingletonConstantRegistration == null) {
239 LOG.debug("No get-singleton-constant registration present.");
240 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241 "No get-singleton-constant rpc registration present.");
242 final RpcResult<UnregisterSingletonConstantOutput> result =
243 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244 return Futures.immediateFuture(result);
// Close first, then forget the registration so that a second call reports missing-registration.
248 getSingletonConstantRegistration.close();
249 getSingletonConstantRegistration = null;
251 return Futures.immediateFuture(RpcResultBuilder.success(
252 new UnregisterSingletonConstantOutputBuilder().build()).build());
// Any failure is converted into a failed RPC result; the exception itself is only logged at
// debug level and is not attached to the returned RpcError.
253 } catch (Exception e) {
254 LOG.debug("There was a problem closing the singleton constant service", e);
255 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256 "There was a problem closing get-singleton-constant");
257 final RpcResult<UnregisterSingletonConstantOutput> result =
258 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259 return Futures.immediateFuture(result);
/**
 * Creates a {@link PublishNotificationsTask} from the input and remembers it under the input id.
 *
 * @param input supplies the task id, the duration in seconds and the notifications-per-second rate
 * @return a future wrapping a successful result with an empty output
 */
264 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265 final StartPublishNotificationsInput input) {
266 LOG.debug("publish-notifications, input: {}", input);
268 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269 input.getSeconds(), input.getNotificationsPerSecond());
// NOTE(review): there is no check for an existing entry; Map.put silently replaces a task that was
// stored earlier under the same id.
271 publishNotificationsTasks.put(input.getId(), task);
275 return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
/**
 * Registers a data tree change listener for {@code WriteTransactionsHandler.ID_INT_YID} in the
 * controller CONFIGURATION datastore, unless one is already registered.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future: a failed result titled "Registration present." when
 *         {@code dtclReg} is already set, otherwise success with an empty output
 */
280 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
// Only one registration at a time is supported; dtclReg doubles as the "already subscribed" flag.
282 if (dtclReg != null) {
283 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284 "There is already dataTreeChangeListener registered on id-ints list.");
285 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
// A fresh listener instance is created for every subscription.
288 idIntsListener = new IdIntsListener();
// The controller-API DOMDataTreeIdentifier is written fully qualified because the simple name is
// imported from org.opendaylight.mdsal.dom.api in this file.
290 dtclReg = domDataTreeChangeService
291 .registerDataTreeChangeListener(
292 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
296 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
/**
 * Logs the request and delegates to {@code WriteTransactionsHandler.start}.
 *
 * @param input forwarded unchanged to the handler
 * @return whatever future {@code WriteTransactionsHandler.start(domDataBroker, input)} returns
 */
300 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301 LOG.debug("write-transactions, input: {}", input);
302 return WriteTransactionsHandler.start(domDataBroker, input);
/**
 * Entry point for the {@code isClientAborted} operation.
 * NOTE(review): only the signature is visible here -- confirm what the body returns before relying on it.
 *
 * @param input RPC input
 * @return future of the RPC result
 */
306 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
/**
 * Entry point for the {@code removeShardReplica} operation.
 * NOTE(review): only the signature is visible here -- confirm what the body returns before relying on it.
 *
 * @param input RPC input
 * @return future of the RPC result
 */
311 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312 final RemoveShardReplicaInput input) {
/**
 * Registers a new {@link YnlListener} for the given id, unless that id already has one.
 *
 * @param input supplies the listener id
 * @return an already-completed future: a failed result titled "Registration present." when the id is
 *         already known, otherwise success with an empty output
 */
317 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
319 LOG.debug("subscribe-ynl, input: {}", input);
// One listener per id: a duplicate id is rejected rather than replacing the existing listener.
321 if (ynlRegistrations.containsKey(input.getId())) {
322 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323 "There is already ynl listener registered for this id: " + input.getId());
324 return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
// The registration is kept so that unsubscribeYnl can find the listener again by id.
327 ynlRegistrations.put(input.getId(),
328 notificationService.registerNotificationListener(new YnlListener(input.getId())));
330 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
/**
 * Logs the request and delegates to {@code prefixShardHandler.onRemovePrefixShard}.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
334 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335 LOG.debug("remove-prefix-shard, input: {}", input);
337 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Logs the request and delegates to {@code prefixLeaderHandler.makeLeaderLocal}.
 *
 * @param input forwarded unchanged to the handler
 * @return the future returned by the handler
 */
341 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342 final BecomePrefixLeaderInput input) {
343 LOG.debug("become-prefix-leader, input: {}", input);
345 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Removes and closes the routed get-constant registration stored for the input context.
 *
 * @param input supplies the context whose registration is to be removed
 * @return an already-completed future: a failed result tagged {@code missing-registration} when no
 *         registration is stored for the context, otherwise success with an empty output
 */
349 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350 final UnregisterBoundConstantInput input) {
351 LOG.debug("unregister-bound-constant, {}", input);
// remove() both looks the entry up and drops it, so the map is already clean before close() runs.
353 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354 routedRegistrations.remove(input.getContext());
356 if (rpcRegistration == null) {
357 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359 "No get-constant rpc registration present.");
360 final RpcResult<UnregisterBoundConstantOutput> result =
361 RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362 return Futures.immediateFuture(result);
365 rpcRegistration.close();
366 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
/**
 * Registers a singleton get-constant service for the constant given in the input.
 *
 * @param input supplies the constant; a null constant is rejected
 * @return an already-completed future: a failed result titled "Invalid input." when the constant is
 *         null, otherwise success with an empty output
 */
371 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372 final RegisterSingletonConstantInput input) {
374 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
376 if (input.getConstant() == null) {
377 final RpcError error = RpcResultBuilder.newError(
378 ErrorType.RPC, "Invalid input.", "Constant value is null");
379 return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380 .withRpcError(error).build());
// NOTE(review): an existing registration is overwritten here without being closed or rejected,
// unlike the "Registration present." guards in the other register/subscribe methods -- confirm
// that repeated calls are intended to replace the previous registration.
383 getSingletonConstantRegistration =
384 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
386 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
/**
 * Entry point for the {@code registerDefaultConstant} operation.
 * NOTE(review): only the signature is visible here -- confirm what the body returns before relying on it.
 *
 * @param input RPC input
 * @return future of the RPC result
 */
391 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392 final RegisterDefaultConstantInput input) {
/**
 * Closes the global get-constant registration, if there is one.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future: a failed result tagged {@code missing-registration} when
 *         nothing is registered, otherwise success with an empty output
 */
397 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398 final UnregisterConstantInput input) {
400 if (globalGetConstantRegistration == null) {
401 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402 "No get-constant rpc registration present.");
403 return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
// Close first, then reset the field so that a repeated call reports missing-registration.
407 globalGetConstantRegistration.close();
408 globalGetConstantRegistration = null;
410 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
/**
 * Deactivates the flapping singleton service and reports the value returned by
 * {@code setInactive()} as the flap count.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future: a failed result tagged {@code missing-registration} when no
 *         flapping singleton exists, otherwise success carrying the flap count
 */
414 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415 final UnregisterFlappingSingletonInput input) {
416 LOG.debug("unregister-flapping-singleton received.");
418 if (flappingSingletonService == null) {
419 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420 "No flapping-singleton registration present.");
421 final RpcResult<UnregisterFlappingSingletonOutput> result =
422 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423 return Futures.immediateFuture(result);
// setInactive() hands back the count that ends up in the output; the service reference is dropped
// afterwards so that registerFlappingSingleton may create a new one.
426 final long flapCount = flappingSingletonService.setInactive();
427 flappingSingletonService = null;
429 final UnregisterFlappingSingletonOutput output =
430 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
432 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/**
 * Entry point for the {@code addShardReplica} operation.
 * NOTE(review): only the signature is visible here -- confirm what the body returns before relying on it.
 *
 * @param input RPC input
 * @return future of the RPC result
 */
436 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
/**
 * Registers an {@link IdIntsDOMDataTreeLIstener} on {@code ProduceTransactionsHandler.ID_INT_YID} in
 * the CONFIGURATION datastore through the DOM data tree service, unless one is already registered.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future: a failed result titled "Registration present." when
 *         {@code ddtlReg} is already set, otherwise success with an empty output
 */
441 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
// NOTE(review): the error text below speaks of a "dataTreeChangeListener" although this method deals
// with the DOMDataTreeListener -- looks copied from subscribeDtcl; confirm the intended wording.
443 if (ddtlReg != null) {
444 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445 "There is already dataTreeChangeListener registered on id-ints list.");
446 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
449 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// Arguments: the listener, the single subtree to watch, the boolean flag set to true and an empty
// collection as the last argument.
452 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454 ProduceTransactionsHandler.ID_INT_YID)),
455 true, Collections.emptyList());
// NOTE(review): a registration failure is only logged; as far as can be seen here, execution then
// continues to the success result below while ddtlReg stays null and idIntsDdtl stays assigned.
// Confirm whether a failed RpcResult should be returned instead.
456 } catch (DOMDataTreeLoopException e) {
457 LOG.error("Failed to register DOMDataTreeListener.", e);
460 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
/**
 * Registers a routed get-constant service for the given context and remembers the registration.
 *
 * @param input supplies the context and the constant; both must be non-null
 * @return an already-completed future: a failed result titled "Invalid input." (null context or
 *         constant) or "Registration present." (context already registered), otherwise success with
 *         an empty output
 */
464 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465 final RegisterBoundConstantInput input) {
466 LOG.debug("register-bound-constant: {}", input);
// Input validation: context first, then constant.
468 if (input.getContext() == null) {
469 final RpcError error = RpcResultBuilder.newError(
470 ErrorType.RPC, "Invalid input.", "Context value is null");
471 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
475 if (input.getConstant() == null) {
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.RPC, "Invalid input.", "Constant value is null");
478 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
// At most one routed registration per context.
482 if (routedRegistrations.containsKey(input.getContext())) {
483 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484 "There is already a rpc registered for context: " + input.getContext());
485 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
489 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491 input.getConstant(), input.getContext());
// Stored by context so that unregisterBoundConstant can remove and close it later.
493 routedRegistrations.put(input.getContext(), rpcRegistration);
494 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
/**
 * Creates the {@link FlappingSingletonService}, unless one already exists.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future: a failed result titled "Registration present." when a
 *         flapping singleton already exists, otherwise success with an empty output
 */
499 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500 final RegisterFlappingSingletonInput input) {
501 LOG.debug("Received register-flapping-singleton.");
503 if (flappingSingletonService != null) {
504 final RpcError error = RpcResultBuilder.newError(
505 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506 return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507 .withRpcError(error).build());
// The instance is kept so that unregisterFlappingSingleton can call setInactive() on it later.
510 flappingSingletonService = new FlappingSingletonService(singletonService);
512 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
/**
 * Compares the id-ints listener's local copy with a final read of
 * {@code WriteTransactionsHandler.ID_INT_YID} from the controller CONFIGURATION datastore.
 *
 * <p>Steps visible below: require an existing listener and registration, wait up to 120 seconds for
 * the listener to finish processing, require that it was triggered at least once, read the list and
 * report whether the two copies match.
 *
 * @param input RPC input; not read by the code below
 * @return an already-completed future; success carries {@code copyMatches}, every other path is a
 *         failed result ("Dtcl missing.", {@code resource-denied-transport},
 *         "No notification received.", "Final read empty." or "Read failed.")
 */
517 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518 LOG.debug("Received unsubscribe-dtcl");
// Both the listener and its registration must exist, i.e. subscribeDtcl must have run before.
520 if (idIntsListener == null || dtclReg == null) {
521 final RpcError error = RpcResultBuilder.newError(
522 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524 .withRpcError(error).build());
// Blocks the calling thread for up to 120 seconds while the listener finishes processing.
// NOTE(review): InterruptedException is treated like a timeout and the thread's interrupt flag is
// not restored -- consider Thread.currentThread().interrupt() on that path.
528 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529 } catch (InterruptedException | ExecutionException | TimeoutException e) {
530 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533 .withRpcError(error).build());
// NOTE(review): the two string literals below are concatenated without a separating space, so the
// message reads "...has not receivedany notifications." -- a trailing space is missing.
539 if (!idIntsListener.hasTriggered()) {
540 final RpcError error = RpcResultBuilder.newError(
541 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
542 + "any notifications.");
543 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
544 .withRpcError(error).build());
// NOTE(review): no close() of this read-only transaction is visible on any path below -- confirm it
// is released, otherwise every call leaves a transaction open.
547 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
// Synchronous read: get() blocks until the datastore answers.
549 final Optional<NormalizedNode<?, ?>> readResult =
550 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
552 if (!readResult.isPresent()) {
553 final RpcError error = RpcResultBuilder.newError(
554 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
555 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
556 .withRpcError(error).build());
// A mismatch is not an RPC failure: the diff is logged and the outcome is reported via copyMatches.
559 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
561 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
562 idIntsListener.diffWithLocalCopy(readResult.get()));
565 return Futures.immediateFuture(
566 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual)).build());
// NOTE(review): unlike the timeout handler above, the caught exception is neither logged nor passed
// to newError here, so the cause of a failed read is lost; the interrupt flag is not restored either.
568 } catch (final InterruptedException | ExecutionException e) {
569 final RpcError error = RpcResultBuilder.newError(
570 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
571 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
572 .withRpcError(error).build());
578 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
579 LOG.debug("create-prefix-shard, input: {}", input);
581 return prefixShardHandler.onCreatePrefixShard(input);
// Presumably the handler for the deconfigure-id-ints-shard RPC (inferred from the name and the
// ListenableFuture<RpcResult<...>> return type shared with the sibling handlers) - TODO confirm.
// NOTE(review): only the signature is visible in this listing; the body lines are elided, so what the
// method does with `input` and what it returns cannot be stated from here. Check the complete file.
585 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
586 final DeconfigureIdIntsShardInput input) {
591 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
592 LOG.debug("Received unsubscribe-ynl, input: {}", input);
594 if (!ynlRegistrations.containsKey(input.getId())) {
595 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
596 "No ynl listener with this id registered.");
597 final RpcResult<UnsubscribeYnlOutput> result =
598 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
599 return Futures.immediateFuture(result);
602 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
603 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
607 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
611 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
612 final CheckPublishNotificationsInput input) {
614 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
617 return Futures.immediateFuture(RpcResultBuilder.success(
618 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
621 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
622 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
624 if (task.getLastError() != null) {
625 LOG.error("Last error for {}", task, task.getLastError());
626 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
629 final CheckPublishNotificationsOutput output =
630 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
632 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
636 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
637 final ProduceTransactionsInput input) {
638 LOG.debug("producer-transactions, input: {}", input);
639 return ProduceTransactionsHandler.start(domDataTreeService, input);
643 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
644 final ShutdownShardReplicaInput input) {
645 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
647 final String shardName = input.getShardName();
648 if (Strings.isNullOrEmpty(shardName)) {
649 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
650 "A valid shard name must be specified");
651 return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
655 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
659 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
660 final ShutdownPrefixShardReplicaInput input) {
661 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
663 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
665 if (shardPrefix == null) {
666 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
667 "A valid shard prefix must be specified");
668 return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
669 .withRpcError(rpcError).build());
672 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
673 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
675 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
678 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
679 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
680 final ActorContext context = configDataStore.getActorContext();
682 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
683 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
684 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
685 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
687 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
689 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
690 if (throwable != null) {
691 shutdownShardAsk.failure(throwable);
693 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
696 }, context.getClientDispatcher());
698 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
700 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
701 if (throwable != null) {
702 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
703 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
704 rpcResult.set(failedResult);
706 // according to Patterns.gracefulStop API, we don't have to
707 // check value of gracefulStopResult
708 rpcResult.set(RpcResultBuilder.success(success).build());
711 }, context.getClientDispatcher());
716 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
718 LOG.debug("Received register-constant rpc, input: {}", input);
720 if (input.getConstant() == null) {
721 final RpcError error = RpcResultBuilder.newError(
722 ErrorType.RPC, "Invalid input.", "Constant value is null");
723 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
727 if (globalGetConstantRegistration != null) {
728 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
729 "There is already a get-constant rpc registered.");
730 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
734 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
735 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
// Presumably the handler for the unregister-default-constant RPC (inferred from the name and the
// ListenableFuture<RpcResult<...>> return type shared with the sibling handlers) - TODO confirm.
// NOTE(review): only the signature is visible in this listing; the body lines are elided, so what the
// method does with `input` and what it returns cannot be stated from here. Check the complete file.
739 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
740 final UnregisterDefaultConstantInput input) {
745 @SuppressWarnings("checkstyle:IllegalCatch")
746 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
747 LOG.debug("Received unsubscribe-ddtl.");
749 if (idIntsDdtl == null || ddtlReg == null) {
750 final RpcError error = RpcResultBuilder.newError(
751 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
752 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
753 .withRpcError(error).build());
757 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
758 } catch (InterruptedException | ExecutionException | TimeoutException e) {
759 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
760 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
761 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
762 .withRpcError(error).build());
768 if (!idIntsDdtl.hasTriggered()) {
769 final RpcError error = RpcResultBuilder.newError(
770 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
771 + "any notifications.");
772 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
773 .withRpcError(error).build());
776 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
777 LOG.debug("Creating distributed datastore client for shard {}", shardName);
779 final ActorContext actorContext = configDataStore.getActorContext();
780 final Props distributedDataStoreClientProps =
781 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
782 "Shard-" + shardName, actorContext, shardName);
784 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
785 final DataStoreClient distributedDataStoreClient;
787 distributedDataStoreClient = SimpleDataStoreClientActor
788 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
789 } catch (RuntimeException e) {
790 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
791 clientActor.tell(PoisonPill.getInstance(), noSender());
792 final RpcError error = RpcResultBuilder.newError(
793 ErrorType.APPLICATION, "Unable to create ds client for read.",
794 "Unable to create ds client for read.");
795 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
796 .withRpcError(error).build());
799 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
800 final ClientTransaction tx = localHistory.createTransaction();
801 final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
802 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
805 localHistory.close();
807 final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
808 if (!optional.isPresent()) {
809 LOG.warn("Final read from client is empty.");
810 final RpcError error = RpcResultBuilder.newError(
811 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
812 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
813 .withRpcError(error).build());
816 return Futures.immediateFuture(
817 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
818 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
820 } catch (InterruptedException | ExecutionException e) {
821 LOG.error("Unable to read data to verify ddtl data.", e);
822 final RpcError error = RpcResultBuilder.newError(
823 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
824 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
825 .withRpcError(error).build());
827 distributedDataStoreClient.close();
828 clientActor.tell(PoisonPill.getInstance(), noSender());