2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import static akka.actor.ActorRef.noSender;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
148 import org.opendaylight.yangtools.yang.common.RpcResult;
149 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
150 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
151 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
152 import org.slf4j.Logger;
153 import org.slf4j.LoggerFactory;
154 import scala.concurrent.duration.FiniteDuration;
156 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
158 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
159 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
160 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
162 private final RpcProviderRegistry rpcRegistry;
163 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
164 private final DistributedShardFactory distributedShardFactory;
165 private final DistributedDataStoreInterface configDataStore;
166 private final DOMDataTreeService domDataTreeService;
167 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
168 private final DOMDataBroker domDataBroker;
169 private final NotificationPublishService notificationPublishService;
170 private final NotificationService notificationService;
171 private final DOMSchemaService schemaService;
172 private final ClusterSingletonServiceProvider singletonService;
173 private final DOMRpcProviderService domRpcService;
174 private final PrefixLeaderHandler prefixLeaderHandler;
175 private final PrefixShardHandler prefixShardHandler;
176 private final DOMDataTreeChangeService domDataTreeChangeService;
177 private final ActorSystem actorSystem;
179 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
180 routedRegistrations = new HashMap<>();
182 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
184 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
185 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
186 private FlappingSingletonService flappingSingletonService;
187 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
188 private IdIntsListener idIntsListener;
189 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
190 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
191 private IdIntsDOMDataTreeLIstener idIntsDdtl;
195 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
196 final DOMRpcProviderService domRpcService,
197 final ClusterSingletonServiceProvider singletonService,
198 final DOMSchemaService schemaService,
199 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
200 final NotificationPublishService notificationPublishService,
201 final NotificationService notificationService,
202 final DOMDataBroker domDataBroker,
203 final DOMDataTreeService domDataTreeService,
204 final DistributedShardFactory distributedShardFactory,
205 final DistributedDataStoreInterface configDataStore,
206 final ActorSystemProvider actorSystemProvider) {
207 this.rpcRegistry = rpcRegistry;
208 this.domRpcService = domRpcService;
209 this.singletonService = singletonService;
210 this.schemaService = schemaService;
211 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
212 this.notificationPublishService = notificationPublishService;
213 this.notificationService = notificationService;
214 this.domDataBroker = domDataBroker;
215 this.domDataTreeService = domDataTreeService;
216 this.distributedShardFactory = distributedShardFactory;
217 this.configDataStore = configDataStore;
218 this.actorSystem = actorSystemProvider.getActorSystem();
220 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
222 domDataTreeChangeService =
223 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
225 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
227 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
228 bindingNormalizedNodeSerializer);
232 @SuppressWarnings("checkstyle:IllegalCatch")
233 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
234 final UnregisterSingletonConstantInput input) {
235 LOG.info("In unregisterSingletonConstant");
237 if (getSingletonConstantRegistration == null) {
238 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
239 "No prior RPC was registered").buildFuture();
243 getSingletonConstantRegistration.close();
244 getSingletonConstantRegistration = null;
246 return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
247 } catch (Exception e) {
248 String msg = "Error closing the singleton constant service";
250 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
251 ErrorType.APPLICATION, msg, e).buildFuture();
256 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
257 final StartPublishNotificationsInput input) {
258 LOG.info("In startPublishNotifications - input: {}", input);
260 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
261 input.getSeconds(), input.getNotificationsPerSecond());
263 publishNotificationsTasks.put(input.getId(), task);
267 return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
271 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
272 LOG.info("In subscribeDtcl - input: {}", input);
274 if (dtclReg != null) {
275 return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
276 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
279 idIntsListener = new IdIntsListener();
281 dtclReg = domDataTreeChangeService
282 .registerDataTreeChangeListener(
283 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
284 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
287 return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
291 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
292 return WriteTransactionsHandler.start(domDataBroker, input);
296 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
301 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
302 final RemoveShardReplicaInput input) {
307 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
308 LOG.info("In subscribeYnl - input: {}", input);
310 if (ynlRegistrations.containsKey(input.getId())) {
311 return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
312 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
315 ynlRegistrations.put(input.getId(),
316 notificationService.registerNotificationListener(new YnlListener(input.getId())));
318 return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
322 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
323 LOG.info("In removePrefixShard - input: {}", input);
325 return prefixShardHandler.onRemovePrefixShard(input);
329 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
330 final BecomePrefixLeaderInput input) {
331 LOG.info("n becomePrefixLeader - input: {}", input);
333 return prefixLeaderHandler.makeLeaderLocal(input);
337 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
338 final UnregisterBoundConstantInput input) {
339 LOG.info("In unregisterBoundConstant - {}", input);
341 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
342 routedRegistrations.remove(input.getContext());
344 if (rpcRegistration == null) {
345 return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
346 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
349 rpcRegistration.close();
350 return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
354 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
355 final RegisterSingletonConstantInput input) {
356 LOG.info("In registerSingletonConstant - input: {}", input);
358 if (input.getConstant() == null) {
359 return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
360 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
363 getSingletonConstantRegistration =
364 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
366 return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
370 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
371 final RegisterDefaultConstantInput input) {
376 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
377 final UnregisterConstantInput input) {
378 LOG.info("In unregisterConstant");
380 if (globalGetConstantRegistration == null) {
381 return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
382 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
385 globalGetConstantRegistration.close();
386 globalGetConstantRegistration = null;
388 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
392 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
393 final UnregisterFlappingSingletonInput input) {
394 LOG.info("In unregisterFlappingSingleton");
396 if (flappingSingletonService == null) {
397 return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
398 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
401 final long flapCount = flappingSingletonService.setInactive();
402 flappingSingletonService = null;
404 return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
409 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
414 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
415 LOG.info("In subscribeDdtl");
417 if (ddtlReg != null) {
418 return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
419 "data-exists", "There is already a listener registered for id-ints").buildFuture();
422 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
425 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
426 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
427 ProduceTransactionsHandler.ID_INT_YID)),
428 true, Collections.emptyList());
429 } catch (DOMDataTreeLoopException e) {
430 LOG.error("Failed to register DOMDataTreeListener", e);
431 return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
432 ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
435 return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
439 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
440 final RegisterBoundConstantInput input) {
441 LOG.info("In registerBoundConstant - input: {}", input);
443 if (input.getContext() == null) {
444 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
445 ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
448 if (input.getConstant() == null) {
449 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
450 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
453 if (routedRegistrations.containsKey(input.getContext())) {
454 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
455 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
458 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
459 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
460 input.getConstant(), input.getContext());
462 routedRegistrations.put(input.getContext(), rpcRegistration);
463 return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
467 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
468 final RegisterFlappingSingletonInput input) {
469 LOG.info("In registerFlappingSingleton");
471 if (flappingSingletonService != null) {
472 return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
473 "data-exists", "There is already an rpc registered").buildFuture();
476 flappingSingletonService = new FlappingSingletonService(singletonService);
478 return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
482 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
483 LOG.info("In unsubscribeDtcl");
485 if (idIntsListener == null || dtclReg == null) {
486 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
487 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
492 idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
493 } catch (InterruptedException | ExecutionException | TimeoutException e) {
494 LOG.error("Unable to finish notification processing", e);
495 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
496 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
502 if (!idIntsListener.hasTriggered()) {
503 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
504 "id-ints listener has not received any notifications.").buildFuture();
507 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
509 final Optional<NormalizedNode<?, ?>> readResult =
510 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
512 if (!readResult.isPresent()) {
513 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
514 "No data read from id-ints list").buildFuture();
517 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
519 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
520 idIntsListener.diffWithLocalCopy(readResult.get()));
523 return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
526 } catch (final InterruptedException | ExecutionException e) {
527 LOG.error("Final read of id-ints failed", e);
528 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
529 "Final read of id-ints failed", e).buildFuture();
534 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
535 LOG.info("In createPrefixShard - input: {}", input);
537 return prefixShardHandler.onCreatePrefixShard(input);
541 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
542 final DeconfigureIdIntsShardInput input) {
547 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
548 LOG.info("In unsubscribeYnl - input: {}", input);
550 if (!ynlRegistrations.containsKey(input.getId())) {
551 return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
552 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
555 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
556 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
560 return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
564 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
565 final CheckPublishNotificationsInput input) {
566 LOG.info("In checkPublishNotifications - input: {}", input);
568 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
571 return Futures.immediateFuture(RpcResultBuilder.success(
572 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
575 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
576 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
578 if (task.getLastError() != null) {
579 LOG.error("Last error for {}", task, task.getLastError());
580 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
583 final CheckPublishNotificationsOutput output =
584 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
586 return RpcResultBuilder.success(output).buildFuture();
590 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
591 final ProduceTransactionsInput input) {
592 LOG.info("In produceTransactions - input: {}", input);
593 return ProduceTransactionsHandler.start(domDataTreeService, input);
597 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
598 final ShutdownShardReplicaInput input) {
599 LOG.info("In shutdownShardReplica - input: {}", input);
601 final String shardName = input.getShardName();
602 if (Strings.isNullOrEmpty(shardName)) {
603 return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
604 shardName + "is not a valid shard name").buildFuture();
607 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
611 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
612 final ShutdownPrefixShardReplicaInput input) {
613 LOG.info("shutdownPrefixShardReplica - input: {}", input);
615 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
617 if (shardPrefix == null) {
618 return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
619 "A valid shard prefix must be specified").buildFuture();
622 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
623 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
625 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
628 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
629 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
630 final ActorUtils context = configDataStore.getActorUtils();
632 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
633 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
634 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
635 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
637 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
639 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
640 if (throwable != null) {
641 shutdownShardAsk.failure(throwable);
643 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
646 }, context.getClientDispatcher());
648 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
650 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
651 if (throwable != null) {
652 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
653 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
654 rpcResult.set(failedResult);
656 // according to Patterns.gracefulStop API, we don't have to
657 // check value of gracefulStopResult
658 rpcResult.set(RpcResultBuilder.success(success).build());
661 }, context.getClientDispatcher());
666 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
667 LOG.info("In registerConstant - input: {}", input);
669 if (input.getConstant() == null) {
670 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
671 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
674 if (globalGetConstantRegistration != null) {
675 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
676 "data-exists", "There is already an rpc registered").buildFuture();
679 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
680 return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
684 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
685 final UnregisterDefaultConstantInput input) {
690 @SuppressWarnings("checkstyle:IllegalCatch")
691 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
692 LOG.info("In unsubscribeDdtl");
694 if (idIntsDdtl == null || ddtlReg == null) {
695 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
696 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
701 idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
702 } catch (InterruptedException | ExecutionException | TimeoutException e) {
703 LOG.error("Unable to finish notification processing", e);
704 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
705 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
711 if (!idIntsDdtl.hasTriggered()) {
712 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
713 "No notification received.", "id-ints listener has not received any notifications").buildFuture();
716 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
717 LOG.debug("Creating distributed datastore client for shard {}", shardName);
719 final ActorUtils actorUtils = configDataStore.getActorUtils();
720 final Props distributedDataStoreClientProps =
721 SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
722 "Shard-" + shardName, actorUtils, shardName);
724 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
725 final DataStoreClient distributedDataStoreClient;
727 distributedDataStoreClient = SimpleDataStoreClientActor
728 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
729 } catch (RuntimeException e) {
730 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
731 clientActor.tell(PoisonPill.getInstance(), noSender());
732 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
733 .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
736 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
737 final ClientTransaction tx = localHistory.createTransaction();
738 final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
739 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
742 localHistory.close();
744 final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
745 if (!optional.isPresent()) {
746 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
747 "data-missing", "Final read from id-ints is empty").buildFuture();
750 return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
751 idIntsDdtl.checkEqual(optional.get()))).buildFuture();
753 } catch (InterruptedException | ExecutionException e) {
754 LOG.error("Unable to read data to verify ddtl data", e);
755 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
756 .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
758 distributedDataStoreClient.close();
759 clientActor.tell(PoisonPill.getInstance(), noSender());