2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import static akka.actor.ActorRef.noSender;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Strings;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.Collections;
23 import java.util.HashMap;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
52 import org.opendaylight.mdsal.binding.api.NotificationService;
53 import org.opendaylight.mdsal.binding.api.RpcProviderService;
54 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
63 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
64 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
65 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
144 import org.opendaylight.yangtools.concepts.ListenerRegistration;
145 import org.opendaylight.yangtools.concepts.ObjectRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
148 import org.opendaylight.yangtools.yang.common.RpcResult;
149 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
150 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
151 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
152 import org.slf4j.Logger;
153 import org.slf4j.LoggerFactory;
154 import scala.concurrent.duration.FiniteDuration;
/**
 * Implementation of {@link OdlMdsalLowlevelControlService} whose handlers register and unregister
 * RPC implementations, listeners, singleton services and notification publishers, and delegate
 * shard and transaction operations to dedicated handler classes.
 *
 * <p>NOTE(review): the mutable state below (plain {@link HashMap}s and non-volatile fields) carries
 * no synchronization in the visible code; confirm whether the RPC handlers can be invoked concurrently.
 */
156 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
157 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Collaborators: all assigned exactly once in the constructor.
159 private final RpcProviderService rpcRegistry;
// Registration of this instance as the OdlMdsalLowlevelControlService implementation.
160 private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
161 private final DistributedShardFactory distributedShardFactory;
162 private final DistributedDataStoreInterface configDataStore;
163 private final DOMDataTreeService domDataTreeService;
164 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
165 private final DOMDataBroker domDataBroker;
166 private final NotificationPublishService notificationPublishService;
167 private final NotificationService notificationService;
168 private final DOMSchemaService schemaService;
169 private final ClusterSingletonServiceProvider singletonService;
170 private final DOMRpcProviderService domRpcService;
171 private final PrefixLeaderHandler prefixLeaderHandler;
172 private final PrefixShardHandler prefixShardHandler;
// Obtained from the DOMDataBroker extensions in the constructor.
173 private final DOMDataTreeChangeService domDataTreeChangeService;
174 private final ActorSystem actorSystem;
// Routed get-constant registrations, keyed by the context they were registered for.
176 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
177 routedRegistrations = new HashMap<>();
// Notification listener registrations, keyed by the id supplied on subscription.
179 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Single-slot state: each field is null while the corresponding service/listener is not registered.
181 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
182 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
183 private FlappingSingletonService flappingSingletonService;
184 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
185 private IdIntsListener idIntsListener;
// Notification publishing tasks, keyed by the id supplied when they were created.
186 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
187 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
188 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied services, derives the helper objects built on top of them and registers
 * this instance as the {@link OdlMdsalLowlevelControlService} implementation.
 *
 * @param rpcRegistry registry this instance registers itself with
 * @param domRpcService DOM RPC provider used by the get-constant services
 * @param singletonService cluster singleton provider used by the singleton services
 * @param schemaService schema service (only stored here)
 * @param bindingNormalizedNodeSerializer serializer handed to the prefix handlers and routed services
 * @param notificationPublishService publisher used by the notification tasks
 * @param notificationService service used to register notification listeners
 * @param domDataBroker broker used for transactions and as the source of DOMDataTreeChangeService
 * @param domDataTreeService service used for producers and tree listeners
 * @param distributedShardFactory factory handed to the prefix shard handler
 * @param configDataStore configuration datastore (only stored here)
 * @param actorSystemProvider provider the actor system is read from
 */
192 public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
193 final DOMRpcProviderService domRpcService,
194 final ClusterSingletonServiceProvider singletonService,
195 final DOMSchemaService schemaService,
196 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
197 final NotificationPublishService notificationPublishService,
198 final NotificationService notificationService,
199 final DOMDataBroker domDataBroker,
200 final DOMDataTreeService domDataTreeService,
201 final DistributedShardFactory distributedShardFactory,
202 final DistributedDataStoreInterface configDataStore,
203 final ActorSystemProvider actorSystemProvider) {
204 this.rpcRegistry = rpcRegistry;
205 this.domRpcService = domRpcService;
206 this.singletonService = singletonService;
207 this.schemaService = schemaService;
208 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
209 this.notificationPublishService = notificationPublishService;
210 this.notificationService = notificationService;
211 this.domDataBroker = domDataBroker;
212 this.domDataTreeService = domDataTreeService;
213 this.distributedShardFactory = distributedShardFactory;
214 this.configDataStore = configDataStore;
215 this.actorSystem = actorSystemProvider.getActorSystem();
217 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// The change service is looked up as a broker extension.
218 domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
// NOTE(review): "this" is handed to the RPC registry before prefixShardHandler is assigned below;
// consider performing the registration as the last constructor step.
220 registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
222 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
223 bindingNormalizedNodeSerializer);
/**
 * Closes the singleton get-constant registration, if one exists.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-missing" result when nothing is registered, a successful empty output
 *         after the registration has been closed and cleared, or a failed APPLICATION result
 *         carrying the exception caught while closing
 */
// Catching Exception is deliberate here, hence the checkstyle suppression.
227 @SuppressWarnings("checkstyle:IllegalCatch")
228 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
229 final UnregisterSingletonConstantInput input) {
230 LOG.info("In unregisterSingletonConstant");
232 if (getSingletonConstantRegistration == null) {
233 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
234 "No prior RPC was registered").buildFuture();
// Close first, then clear the slot so a later register call starts from a clean state.
238 getSingletonConstantRegistration.close();
239 getSingletonConstantRegistration = null;
241 return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
242 } catch (Exception e) {
243 String msg = "Error closing the singleton constant service";
245 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
246 ErrorType.APPLICATION, msg, e).buildFuture();
/**
 * Creates a {@link PublishNotificationsTask} from the input's id, duration in seconds and
 * notifications-per-second rate, and records it in {@code publishNotificationsTasks} under the id.
 *
 * <p>NOTE(review): an existing task stored under the same id is replaced in the map.
 *
 * @param input carries the id, seconds and notifications-per-second values
 * @return a successful result with an empty output
 */
251 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
252 final StartPublishNotificationsInput input) {
253 LOG.info("In startPublishNotifications - input: {}", input);
255 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
256 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
258 publishNotificationsTasks.put(input.getId(), task);
262 return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
/**
 * Creates a new {@link IdIntsListener} and registers a data tree change listener on the
 * CONFIGURATION datastore at {@code WriteTransactionsHandler.ID_INT_YID}, keeping the
 * registration in {@code dtclReg}.
 *
 * @param input RPC input (only logged)
 * @return a failed "data-exists" result when a registration is already held, otherwise a
 *         successful empty output
 */
266 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
267 LOG.info("In subscribeDtcl - input: {}", input);
// Only one id-ints listener registration is allowed at a time.
269 if (dtclReg != null) {
270 return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
271 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
274 idIntsListener = new IdIntsListener();
276 dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
277 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
280 return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
/**
 * Delegates to {@code WriteTransactionsHandler.start} with the DOM data broker and the input.
 *
 * @param input parameters forwarded unchanged to the handler
 * @return the future produced by the handler
 */
284 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
285 return WriteTransactionsHandler.start(domDataBroker, input);
/**
 * Handler for the isClientAborted operation.
 *
 * @param input RPC input
 * @return future carrying the {@link IsClientAbortedOutput} result
 */
289 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
/**
 * Handler for the removeShardReplica operation.
 *
 * @param input RPC input
 * @return future carrying the {@link RemoveShardReplicaOutput} result
 */
294 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
295 final RemoveShardReplicaInput input) {
/**
 * Registers a new {@link YnlListener} for the input's id with the notification service and
 * stores the registration in {@code ynlRegistrations} under that id.
 *
 * @param input carries the listener id
 * @return a failed "data-exists" result when the id already has a registration, otherwise a
 *         successful empty output
 */
300 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
301 LOG.info("In subscribeYnl - input: {}", input);
// At most one listener per id.
303 if (ynlRegistrations.containsKey(input.getId())) {
304 return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
305 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
308 ynlRegistrations.put(input.getId(),
309 notificationService.registerNotificationListener(new YnlListener(input.getId())));
311 return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
/**
 * Logs the request and delegates to {@code prefixShardHandler.onRemovePrefixShard}.
 *
 * @param input parameters forwarded unchanged to the handler
 * @return the future produced by the handler
 */
315 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
316 LOG.info("In removePrefixShard - input: {}", input);
318 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Logs the request and delegates to {@code prefixLeaderHandler.makeLeaderLocal}.
 *
 * @param input parameters forwarded unchanged to the handler
 * @return the future produced by the handler
 */
322 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
323 final BecomePrefixLeaderInput input) {
// Log prefix follows the "In <method>" convention used by the other handlers.
324 LOG.info("In becomePrefixLeader - input: {}", input);
326 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Removes the routed get-constant registration stored for the input's context and closes it.
 *
 * @param input carries the context whose registration should be removed
 * @return a failed "data-missing" result when no registration exists for the context, otherwise
 *         a successful empty output
 */
330 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
331 final UnregisterBoundConstantInput input) {
332 LOG.info("In unregisterBoundConstant - {}", input);
// remove() both looks up and detaches the entry, so a null result means nothing was registered.
334 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
335 routedRegistrations.remove(input.getContext());
337 if (rpcRegistration == null) {
338 return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
339 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
342 rpcRegistration.close();
343 return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
/**
 * Registers a {@link SingletonGetConstantService} for the supplied constant and stores the
 * resulting registration in {@code getSingletonConstantRegistration}.
 *
 * <p>NOTE(review): the visible code does not check for, or close, a registration that is
 * already held before overwriting the field.
 *
 * @param input carries the constant value
 * @return a failed "invalid-value" result when the constant is null, otherwise a successful
 *         empty output
 */
347 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
348 final RegisterSingletonConstantInput input) {
349 LOG.info("In registerSingletonConstant - input: {}", input);
351 if (input.getConstant() == null) {
352 return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
353 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
356 getSingletonConstantRegistration =
357 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
359 return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
/**
 * Handler for the registerDefaultConstant operation.
 *
 * @param input RPC input
 * @return future carrying the {@link RegisterDefaultConstantOutput} result
 */
363 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
364 final RegisterDefaultConstantInput input) {
/**
 * Closes and clears the global get-constant registration, if one exists.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-missing" result when nothing is registered, otherwise a successful
 *         empty output
 */
369 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
370 final UnregisterConstantInput input) {
371 LOG.info("In unregisterConstant");
373 if (globalGetConstantRegistration == null) {
374 return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
375 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
// Close, then clear the slot so the null check above works for the next call.
378 globalGetConstantRegistration.close();
379 globalGetConstantRegistration = null;
381 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
/**
 * Deactivates the flapping singleton service, clears the field and reports the value returned
 * by {@code setInactive()} as the flap count.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-missing" result when no service is held, otherwise a successful output
 *         whose flap count is the value returned by {@code setInactive()}
 */
385 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
386 final UnregisterFlappingSingletonInput input) {
387 LOG.info("In unregisterFlappingSingleton");
389 if (flappingSingletonService == null) {
390 return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
391 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
394 final long flapCount = flappingSingletonService.setInactive();
395 flappingSingletonService = null;
397 return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
/**
 * Handler for the addShardReplica operation.
 *
 * @param input RPC input
 * @return future carrying the {@link AddShardReplicaOutput} result
 */
402 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
/**
 * Creates a new {@link IdIntsDOMDataTreeLIstener} and registers it with the DOM data tree
 * service for the CONFIGURATION subtree at {@code ProduceTransactionsHandler.ID_INT_YID},
 * keeping the registration in {@code ddtlReg}.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-exists" result when a registration is already held, a failed
 *         APPLICATION result when registration throws {@link DOMDataTreeLoopException},
 *         otherwise a successful empty output
 */
407 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
408 LOG.info("In subscribeDdtl");
// Only one id-ints tree listener registration is allowed at a time.
410 if (ddtlReg != null) {
411 return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
412 "data-exists", "There is already a listener registered for id-ints").buildFuture();
415 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// Registered for a single subtree, with "true" as the third argument and no producers.
418 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
419 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
420 ProduceTransactionsHandler.ID_INT_YID)),
421 true, Collections.emptyList());
422 } catch (DOMDataTreeLoopException e) {
423 LOG.error("Failed to register DOMDataTreeListener", e);
424 return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
425 ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
428 return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
/**
 * Registers a {@link RoutedGetConstantService} for the supplied constant and context, and
 * records the registration in {@code routedRegistrations} under the context.
 *
 * @param input carries the context and the constant value
 * @return a failed "invalid-value" result when the context or the constant is null, a failed
 *         "data-exists" result when the context already has a registration, otherwise a
 *         successful empty output
 */
432 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
433 final RegisterBoundConstantInput input) {
434 LOG.info("In registerBoundConstant - input: {}", input);
// Validate both inputs before touching the registration map.
436 if (input.getContext() == null) {
437 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
438 ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
441 if (input.getConstant() == null) {
442 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
443 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
// At most one routed registration per context.
446 if (routedRegistrations.containsKey(input.getContext())) {
447 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
448 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
451 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
452 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
453 input.getConstant(), input.getContext());
455 routedRegistrations.put(input.getContext(), rpcRegistration);
456 return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
/**
 * Creates a {@link FlappingSingletonService} on top of the cluster singleton provider and
 * stores it in {@code flappingSingletonService}.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-exists" result when a service is already held, otherwise a successful
 *         empty output
 */
460 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
461 final RegisterFlappingSingletonInput input) {
462 LOG.info("In registerFlappingSingleton");
464 if (flappingSingletonService != null) {
465 return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
466 "data-exists", "There is already an rpc registered").buildFuture();
469 flappingSingletonService = new FlappingSingletonService(singletonService);
471 return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
/**
 * Waits for the id-ints listener to finish processing, then reads the id-ints data from the
 * CONFIGURATION datastore and compares it with the listener's copy.
 *
 * @param input RPC input (not read by the visible code)
 * @return a failed "data-missing" result when no listener or registration is held or when the
 *         final read returns no data; a failed APPLICATION result when waiting for the listener
 *         or the final read fails; a failed "operation-failed" result when the listener never
 *         triggered; otherwise a successful output whose copy-matches flag is the comparison result
 */
475 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
476 LOG.info("In unsubscribeDtcl");
478 if (idIntsListener == null || dtclReg == null) {
479 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
480 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
// Block for at most "timeout" seconds while the listener drains its pending notifications.
485 idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
// NOTE(review): InterruptedException is handled without restoring the thread's interrupt flag.
486 } catch (InterruptedException | ExecutionException | TimeoutException e) {
487 LOG.error("Unable to finish notification processing", e);
488 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
489 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
// A listener that never fired has nothing meaningful to compare against.
495 if (!idIntsListener.hasTriggered()) {
496 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
497 "id-ints listener has not received any notifications.").buildFuture();
// The read transaction is closed automatically by try-with-resources.
500 try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
501 final Optional<NormalizedNode<?, ?>> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
502 WriteTransactionsHandler.ID_INT_YID).get();
504 if (!readResult.isPresent()) {
505 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
506 "No data read from id-ints list").buildFuture();
509 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
// The logged diff is produced by the listener from its local copy and the datastore content.
511 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
512 idIntsListener.diffWithLocalCopy(readResult.get()));
515 return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
// NOTE(review): same interrupt-flag remark as above applies to this catch.
518 } catch (final InterruptedException | ExecutionException e) {
519 LOG.error("Final read of id-ints failed", e);
520 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
521 "Final read of id-ints failed", e).buildFuture();
/**
 * Logs the request and delegates to {@code prefixShardHandler.onCreatePrefixShard}.
 *
 * @param input parameters forwarded unchanged to the handler
 * @return the future produced by the handler
 */
526 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
527 LOG.info("In createPrefixShard - input: {}", input);
529 return prefixShardHandler.onCreatePrefixShard(input);
/**
 * Handler for the deconfigureIdIntsShard operation.
 *
 * @param input RPC input
 * @return future carrying the {@link DeconfigureIdIntsShardOutput} result
 */
533 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
534 final DeconfigureIdIntsShardInput input) {
/**
 * Removes the notification listener registration stored for the input's id and returns the
 * output collected by that listener.
 *
 * @param input carries the listener id
 * @return a failed "data-missing" result when the id has no registration, otherwise a
 *         successful result wrapping the listener's {@code getOutput()} value
 */
539 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
540 LOG.info("In unsubscribeYnl - input: {}", input);
542 if (!ynlRegistrations.containsKey(input.getId())) {
543 return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
544 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
// Detach the registration from the map and take the listener's output from its instance.
547 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
548 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
552 return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
/**
 * Reports the state of the publish-notifications task stored under the input's id: whether it
 * is still active, its last error (if any) and its current notification count.
 *
 * @param input carries the task id
 * @return a successful result describing the task; the visible code also contains a path that
 *         answers with {@code active == false} only (presumably taken when no task is stored
 *         for the id - confirm)
 */
556 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
557 final CheckPublishNotificationsInput input) {
558 LOG.info("In checkPublishNotifications - input: {}", input);
560 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
563 return Futures.immediateFuture(RpcResultBuilder.success(
564 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
// "Active" is simply the negation of the task's finished flag.
567 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
568 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
// The last error is logged and also surfaced to the caller as a string.
570 if (task.getLastError() != null) {
571 LOG.error("Last error for {}", task, task.getLastError());
572 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
575 final CheckPublishNotificationsOutput output =
576 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
578 return RpcResultBuilder.success(output).buildFuture();
582 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
583 final ProduceTransactionsInput input) {
584 LOG.info("In produceTransactions - input: {}", input);
585 return ProduceTransactionsHandler.start(domDataTreeService, input);
589 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
590 final ShutdownShardReplicaInput input) {
591 LOG.info("In shutdownShardReplica - input: {}", input);
593 final String shardName = input.getShardName();
594 if (Strings.isNullOrEmpty(shardName)) {
595 return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
596 shardName + "is not a valid shard name").buildFuture();
599 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
603 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
604 final ShutdownPrefixShardReplicaInput input) {
605 LOG.info("shutdownPrefixShardReplica - input: {}", input);
607 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
609 if (shardPrefix == null) {
610 return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
611 "A valid shard prefix must be specified").buildFuture();
614 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
615 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
617 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
/**
 * Looks up the local shard actor with the given name, asks it to stop gracefully and reports the outcome
 * through {@code rpcResult}.
 *
 * <p>{@code rpcResult} is completed with a failed APPLICATION-error result when the shard lookup or the
 * graceful stop fails, and with a successful result carrying {@code success} when the stop completes.
 *
 * @param shardName name of the shard passed to {@code findLocalShardAsync}
 * @param success value carried by the successful RPC result
 * @param <T> RPC output type
 * @return a settable future of the RPC result
 */
620 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
621 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
622 final ActorUtils context = configDataStore.getActorUtils();
// Stop timeout: three times the shard's Raft election timeout interval, but never less than 10000 ms.
624 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
625 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
626 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
// Promise bridging the two asynchronous steps: shard lookup, then graceful stop.
627 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
// Step 1: resolve the local shard actor; the callback runs on the client dispatcher.
629 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
631 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
632 if (throwable != null) {
// A failed lookup fails the promise with the same cause.
633 shutdownShardAsk.failure(throwable);
// Send Shutdown to the shard actor and let the promise mirror the outcome of the graceful stop.
635 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
638 }, context.getClientDispatcher());
// Step 2: translate the promise's outcome into the RPC result.
640 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
642 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
643 if (throwable != null) {
644 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
645 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
646 rpcResult.set(failedResult);
648 // according to Patterns.gracefulStop API, we don't have to
649 // check value of gracefulStopResult
650 rpcResult.set(RpcResultBuilder.success(success).build());
653 }, context.getClientDispatcher());
658 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
659 LOG.info("In registerConstant - input: {}", input);
661 if (input.getConstant() == null) {
662 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
663 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
666 if (globalGetConstantRegistration != null) {
667 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
668 "data-exists", "There is already an rpc registered").buildFuture();
671 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
672 return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
/**
 * Entry point for the unregisterDefaultConstant RPC.
 *
 * @param input the RPC input
 * @return a future carrying the RPC result
 */
676 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
677 final UnregisterDefaultConstantInput input) {
/**
 * Handles the unsubscribeDdtl RPC: waits for the id-ints listener to finish processing, reads the id-ints
 * data back through a dedicated datastore client and reports whether the listener's copy matches it.
 *
 * @param input the RPC input
 * @return a failed result when no listener was registered, processing does not finish in time, the
 *         listener never triggered, the datastore client cannot be created, or the final read fails or is
 *         empty; otherwise a successful result whose copy-matches flag is the outcome of
 *         {@code idIntsDdtl.checkEqual}
 */
682 @SuppressWarnings("checkstyle:IllegalCatch")
683 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
684 LOG.info("In unsubscribeDdtl");
// Both the listener and its registration must exist for there to be anything to unsubscribe.
686 if (idIntsDdtl == null || ddtlReg == null) {
687 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
688 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
// Block for up to `timeout` seconds on the future returned by tryFinishProcessing().
693 idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
694 } catch (InterruptedException | ExecutionException | TimeoutException e) {
695 LOG.error("Unable to finish notification processing", e);
696 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
697 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
// A listener that never triggered has nothing to compare against.
703 if (!idIntsDdtl.hasTriggered()) {
704 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
705 "No notification received.", "id-ints listener has not received any notifications").buildFuture();
// Create a dedicated datastore client actor for the shard derived from the id-ints path.
708 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
709 LOG.debug("Creating distributed datastore client for shard {}", shardName);
711 final ActorUtils actorUtils = configDataStore.getActorUtils();
712 final Props distributedDataStoreClientProps =
713 SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
714 "Shard-" + shardName, actorUtils, shardName);
716 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
717 final DataStoreClient distributedDataStoreClient;
// Obtain the client from the actor, allowing up to 30 seconds.
719 distributedDataStoreClient = SimpleDataStoreClientActor
720 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
721 } catch (RuntimeException e) {
722 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
// Stop the client actor that was just created before reporting the failure.
723 clientActor.tell(PoisonPill.getInstance(), noSender());
724 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
725 .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
// Issue the final read of the id-ints data through a transaction on a fresh local history.
728 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
729 final ClientTransaction tx = localHistory.createTransaction();
730 final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
731 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
734 localHistory.close();
// Wait for the read to complete; an empty result is reported as a failure.
736 final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
737 if (!optional.isPresent()) {
738 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
739 "data-missing", "Final read from id-ints is empty").buildFuture();
// Report whether the listener's view equals the data that was read back.
742 return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
743 idIntsDdtl.checkEqual(optional.get()))).buildFuture();
745 } catch (InterruptedException | ExecutionException e) {
746 LOG.error("Unable to read data to verify ddtl data", e);
747 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
748 .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
// Release the datastore client and stop its actor.
750 distributedDataStoreClient.close();
751 clientActor.tell(PoisonPill.getInstance(), noSender());