2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import static akka.actor.ActorRef.noSender;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError;
148 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
149 import org.opendaylight.yangtools.yang.common.RpcResult;
150 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
151 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
152 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
153 import org.slf4j.Logger;
154 import org.slf4j.LoggerFactory;
155 import scala.concurrent.duration.FiniteDuration;
157 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
159 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
160 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
161 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
163 private final RpcProviderRegistry rpcRegistry;
164 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
165 private final DistributedShardFactory distributedShardFactory;
166 private final DistributedDataStoreInterface configDataStore;
167 private final DOMDataTreeService domDataTreeService;
168 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
169 private final DOMDataBroker domDataBroker;
170 private final NotificationPublishService notificationPublishService;
171 private final NotificationService notificationService;
172 private final DOMSchemaService schemaService;
173 private final ClusterSingletonServiceProvider singletonService;
174 private final DOMRpcProviderService domRpcService;
175 private final PrefixLeaderHandler prefixLeaderHandler;
176 private final PrefixShardHandler prefixShardHandler;
177 private final DOMDataTreeChangeService domDataTreeChangeService;
178 private final ActorSystem actorSystem;
180 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
181 routedRegistrations = new HashMap<>();
183 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
185 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
186 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
187 private FlappingSingletonService flappingSingletonService;
188 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
189 private IdIntsListener idIntsListener;
190 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
191 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
192 private IdIntsDOMDataTreeLIstener idIntsDdtl;
196 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
197 final DOMRpcProviderService domRpcService,
198 final ClusterSingletonServiceProvider singletonService,
199 final DOMSchemaService schemaService,
200 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
201 final NotificationPublishService notificationPublishService,
202 final NotificationService notificationService,
203 final DOMDataBroker domDataBroker,
204 final DOMDataTreeService domDataTreeService,
205 final DistributedShardFactory distributedShardFactory,
206 final DistributedDataStoreInterface configDataStore,
207 final ActorSystemProvider actorSystemProvider) {
208 this.rpcRegistry = rpcRegistry;
209 this.domRpcService = domRpcService;
210 this.singletonService = singletonService;
211 this.schemaService = schemaService;
212 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
213 this.notificationPublishService = notificationPublishService;
214 this.notificationService = notificationService;
215 this.domDataBroker = domDataBroker;
216 this.domDataTreeService = domDataTreeService;
217 this.distributedShardFactory = distributedShardFactory;
218 this.configDataStore = configDataStore;
219 this.actorSystem = actorSystemProvider.getActorSystem();
221 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
223 domDataTreeChangeService =
224 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
226 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
228 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
229 bindingNormalizedNodeSerializer);
233 @SuppressWarnings("checkstyle:IllegalCatch")
234 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
235 final UnregisterSingletonConstantInput input) {
236 LOG.debug("unregister-singleton-constant");
238 if (getSingletonConstantRegistration == null) {
239 LOG.debug("No get-singleton-constant registration present.");
240 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
241 "No get-singleton-constant rpc registration present.");
242 final RpcResult<UnregisterSingletonConstantOutput> result =
243 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
244 return Futures.immediateFuture(result);
248 getSingletonConstantRegistration.close();
249 getSingletonConstantRegistration = null;
251 return Futures.immediateFuture(RpcResultBuilder.success(
252 new UnregisterSingletonConstantOutputBuilder().build()).build());
253 } catch (Exception e) {
254 LOG.debug("There was a problem closing the singleton constant service", e);
255 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
256 "There was a problem closing get-singleton-constant");
257 final RpcResult<UnregisterSingletonConstantOutput> result =
258 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
259 return Futures.immediateFuture(result);
264 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
265 final StartPublishNotificationsInput input) {
266 LOG.debug("publish-notifications, input: {}", input);
268 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
269 input.getSeconds(), input.getNotificationsPerSecond());
271 publishNotificationsTasks.put(input.getId(), task);
275 return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
280 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
282 if (dtclReg != null) {
283 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
284 "There is already dataTreeChangeListener registered on id-ints list.");
285 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
288 idIntsListener = new IdIntsListener();
290 dtclReg = domDataTreeChangeService
291 .registerDataTreeChangeListener(
292 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
293 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
296 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
300 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
301 LOG.debug("write-transactions, input: {}", input);
302 return WriteTransactionsHandler.start(domDataBroker, input);
306 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
311 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
312 final RemoveShardReplicaInput input) {
317 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
319 LOG.debug("subscribe-ynl, input: {}", input);
321 if (ynlRegistrations.containsKey(input.getId())) {
322 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
323 "There is already ynl listener registered for this id: " + input.getId());
324 return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
327 ynlRegistrations.put(input.getId(),
328 notificationService.registerNotificationListener(new YnlListener(input.getId())));
330 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
334 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
335 LOG.debug("remove-prefix-shard, input: {}", input);
337 return prefixShardHandler.onRemovePrefixShard(input);
341 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
342 final BecomePrefixLeaderInput input) {
343 LOG.debug("become-prefix-leader, input: {}", input);
345 return prefixLeaderHandler.makeLeaderLocal(input);
349 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
350 final UnregisterBoundConstantInput input) {
351 LOG.debug("unregister-bound-constant, {}", input);
353 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
354 routedRegistrations.remove(input.getContext());
356 if (rpcRegistration == null) {
357 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
358 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
359 "No get-constant rpc registration present.");
360 final RpcResult<UnregisterBoundConstantOutput> result =
361 RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
362 return Futures.immediateFuture(result);
365 rpcRegistration.close();
366 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
371 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
372 final RegisterSingletonConstantInput input) {
374 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
376 if (input.getConstant() == null) {
377 final RpcError error = RpcResultBuilder.newError(
378 ErrorType.RPC, "Invalid input.", "Constant value is null");
379 return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
380 .withRpcError(error).build());
383 getSingletonConstantRegistration =
384 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
386 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
391 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
392 final RegisterDefaultConstantInput input) {
397 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
398 final UnregisterConstantInput input) {
400 if (globalGetConstantRegistration == null) {
401 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
402 "No get-constant rpc registration present.");
403 return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
407 globalGetConstantRegistration.close();
408 globalGetConstantRegistration = null;
410 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
414 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
415 final UnregisterFlappingSingletonInput input) {
416 LOG.debug("unregister-flapping-singleton received.");
418 if (flappingSingletonService == null) {
419 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
420 "No flapping-singleton registration present.");
421 final RpcResult<UnregisterFlappingSingletonOutput> result =
422 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
423 return Futures.immediateFuture(result);
426 final long flapCount = flappingSingletonService.setInactive();
427 flappingSingletonService = null;
429 final UnregisterFlappingSingletonOutput output =
430 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
432 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
436 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
441 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
443 if (ddtlReg != null) {
444 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
445 "There is already dataTreeChangeListener registered on id-ints list.");
446 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
449 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
452 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
453 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
454 ProduceTransactionsHandler.ID_INT_YID)),
455 true, Collections.emptyList());
456 } catch (DOMDataTreeLoopException e) {
457 LOG.error("Failed to register DOMDataTreeListener.", e);
460 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
464 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
465 final RegisterBoundConstantInput input) {
466 LOG.debug("register-bound-constant: {}", input);
468 if (input.getContext() == null) {
469 final RpcError error = RpcResultBuilder.newError(
470 ErrorType.RPC, "Invalid input.", "Context value is null");
471 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
475 if (input.getConstant() == null) {
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.RPC, "Invalid input.", "Constant value is null");
478 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
482 if (routedRegistrations.containsKey(input.getContext())) {
483 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
484 "There is already a rpc registered for context: " + input.getContext());
485 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
489 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
490 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
491 input.getConstant(), input.getContext());
493 routedRegistrations.put(input.getContext(), rpcRegistration);
494 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
499 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
500 final RegisterFlappingSingletonInput input) {
501 LOG.debug("Received register-flapping-singleton.");
503 if (flappingSingletonService != null) {
504 final RpcError error = RpcResultBuilder.newError(
505 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
506 return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
507 .withRpcError(error).build());
510 flappingSingletonService = new FlappingSingletonService(singletonService);
512 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
517 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
518 LOG.debug("Received unsubscribe-dtcl");
520 if (idIntsListener == null || dtclReg == null) {
521 final RpcError error = RpcResultBuilder.newError(
522 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
523 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
524 .withRpcError(error).build());
528 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
529 } catch (InterruptedException | ExecutionException | TimeoutException e) {
530 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
531 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
532 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
533 .withRpcError(error).build());
539 if (!idIntsListener.hasTriggered()) {
540 final RpcError error = RpcResultBuilder.newError(ErrorType.APPLICATION, "operation-failed",
541 "id-ints listener has not received any notifications.");
542 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
543 .withRpcError(error).build());
546 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
548 final Optional<NormalizedNode<?, ?>> readResult =
549 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
551 if (!readResult.isPresent()) {
552 final RpcError error = RpcResultBuilder.newError(
553 ErrorType.APPLICATION, "data-missing", "No data read from id-ints list.");
554 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
555 .withRpcError(error).build());
558 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
560 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
561 idIntsListener.diffWithLocalCopy(readResult.get()));
564 return Futures.immediateFuture(
565 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual)).build());
567 } catch (final InterruptedException | ExecutionException e) {
568 final RpcError error = RpcResultBuilder.newError(
569 ErrorType.APPLICATION, "operation-failed", "Final read from id-ints failed.", null, null, e);
570 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
571 .withRpcError(error).build());
577 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
578 LOG.debug("create-prefix-shard, input: {}", input);
580 return prefixShardHandler.onCreatePrefixShard(input);
584 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
585 final DeconfigureIdIntsShardInput input) {
590 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
591 LOG.debug("Received unsubscribe-ynl, input: {}", input);
593 if (!ynlRegistrations.containsKey(input.getId())) {
594 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
595 "No ynl listener with this id registered.");
596 final RpcResult<UnsubscribeYnlOutput> result =
597 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
598 return Futures.immediateFuture(result);
601 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
602 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
606 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
610 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
611 final CheckPublishNotificationsInput input) {
613 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
616 return Futures.immediateFuture(RpcResultBuilder.success(
617 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
620 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
621 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
623 if (task.getLastError() != null) {
624 LOG.error("Last error for {}", task, task.getLastError());
625 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
628 final CheckPublishNotificationsOutput output =
629 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
631 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
635 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
636 final ProduceTransactionsInput input) {
637 LOG.debug("producer-transactions, input: {}", input);
638 return ProduceTransactionsHandler.start(domDataTreeService, input);
642 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
643 final ShutdownShardReplicaInput input) {
644 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
646 final String shardName = input.getShardName();
647 if (Strings.isNullOrEmpty(shardName)) {
648 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
649 "A valid shard name must be specified");
650 return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
654 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
658 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
659 final ShutdownPrefixShardReplicaInput input) {
660 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
662 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
664 if (shardPrefix == null) {
665 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
666 "A valid shard prefix must be specified");
667 return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
668 .withRpcError(rpcError).build());
671 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
672 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
674 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
677 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
678 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
679 final ActorContext context = configDataStore.getActorContext();
681 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
682 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
683 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
684 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
686 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
688 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
689 if (throwable != null) {
690 shutdownShardAsk.failure(throwable);
692 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
695 }, context.getClientDispatcher());
697 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
699 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
700 if (throwable != null) {
701 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
702 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
703 rpcResult.set(failedResult);
705 // according to Patterns.gracefulStop API, we don't have to
706 // check value of gracefulStopResult
707 rpcResult.set(RpcResultBuilder.success(success).build());
710 }, context.getClientDispatcher());
715 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
717 LOG.debug("Received register-constant rpc, input: {}", input);
719 if (input.getConstant() == null) {
720 final RpcError error = RpcResultBuilder.newError(
721 ErrorType.RPC, "Invalid input.", "Constant value is null");
722 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
726 if (globalGetConstantRegistration != null) {
727 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
728 "There is already a get-constant rpc registered.");
729 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
733 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
734 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
738 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
739 final UnregisterDefaultConstantInput input) {
744 @SuppressWarnings("checkstyle:IllegalCatch")
745 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
746 LOG.debug("Received unsubscribe-ddtl.");
748 if (idIntsDdtl == null || ddtlReg == null) {
749 final RpcError error = RpcResultBuilder.newError(
750 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
751 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
752 .withRpcError(error).build());
756 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
757 } catch (InterruptedException | ExecutionException | TimeoutException e) {
758 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
759 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
760 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
761 .withRpcError(error).build());
767 if (!idIntsDdtl.hasTriggered()) {
768 final RpcError error = RpcResultBuilder.newError(
769 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
770 + "any notifications.");
771 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
772 .withRpcError(error).build());
775 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
776 LOG.debug("Creating distributed datastore client for shard {}", shardName);
778 final ActorContext actorContext = configDataStore.getActorContext();
779 final Props distributedDataStoreClientProps =
780 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
781 "Shard-" + shardName, actorContext, shardName);
783 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
784 final DataStoreClient distributedDataStoreClient;
786 distributedDataStoreClient = SimpleDataStoreClientActor
787 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
788 } catch (RuntimeException e) {
789 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
790 clientActor.tell(PoisonPill.getInstance(), noSender());
791 final RpcError error = RpcResultBuilder.newError(
792 ErrorType.APPLICATION, "Unable to create ds client for read.",
793 "Unable to create ds client for read.");
794 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
795 .withRpcError(error).build());
798 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
799 final ClientTransaction tx = localHistory.createTransaction();
800 final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
801 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
804 localHistory.close();
806 final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
807 if (!optional.isPresent()) {
808 LOG.warn("Final read from client is empty.");
809 final RpcError error = RpcResultBuilder.newError(
810 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
811 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
812 .withRpcError(error).build());
815 return Futures.immediateFuture(
816 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
817 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
819 } catch (InterruptedException | ExecutionException e) {
820 LOG.error("Unable to read data to verify ddtl data.", e);
821 final RpcError error = RpcResultBuilder.newError(
822 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
823 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
824 .withRpcError(error).build());
826 distributedDataStoreClient.close();
827 clientActor.tell(PoisonPill.getInstance(), noSender());