2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.Collections;
26 import java.util.HashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
145 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
146 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
147 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
148 import org.opendaylight.yangtools.concepts.ListenerRegistration;
149 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
150 import org.opendaylight.yangtools.yang.common.RpcError;
151 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
152 import org.opendaylight.yangtools.yang.common.RpcResult;
153 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
154 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
155 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
156 import org.slf4j.Logger;
157 import org.slf4j.LoggerFactory;
158 import scala.concurrent.duration.FiniteDuration;
160 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
// Logger used by the RPC handlers of this provider.
162 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller-API CONFIGURATION datastore constant. It is spelled with its fully-qualified name because the
// simple name LogicalDatastoreType is already taken by the mdsal import of this file.
163 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
164 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators and handlers; each of these final fields is assigned exactly once in the constructor.
166 private final RpcProviderRegistry rpcRegistry;
// Registration of this instance as the OdlMdsalLowlevelControlService implementation.
167 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
168 private final DistributedShardFactory distributedShardFactory;
169 private final DistributedDataStoreInterface configDataStore;
170 private final DOMDataTreeService domDataTreeService;
171 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
172 private final DOMDataBroker domDataBroker;
173 private final NotificationPublishService notificationPublishService;
174 private final NotificationService notificationService;
175 private final SchemaService schemaService;
176 private final ClusterSingletonServiceProvider singletonService;
177 private final DOMRpcProviderService domRpcService;
178 private final PrefixLeaderHandler prefixLeaderHandler;
179 private final PrefixShardHandler prefixShardHandler;
// Looked up from the supported extensions of domDataBroker in the constructor.
180 private final DOMDataTreeChangeService domDataTreeChangeService;
181 private final ActorSystem actorSystem;
// Routed get-constant RPC registrations, keyed by the context they were registered for
// (filled by registerBoundConstant, drained by unregisterBoundConstant).
183 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
184 routedRegistrations = new HashMap<>();
// YnlListener registrations, keyed by the id supplied in the subscribe-ynl input.
186 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Mutable state below: each reference is null while the corresponding registration/service is absent.
188 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
189 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
190 private FlappingSingletonService flappingSingletonService;
// Data tree change listener on the id-ints list and its registration (see subscribeDtcl/unsubscribeDtcl).
191 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
192 private IdIntsListener idIntsListener;
// Notification publishing tasks, keyed by the id supplied in the start-publish-notifications input.
193 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
// DOM data tree listener on the id-ints list and its registration (see subscribeDdtl).
194 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
195 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Wires up the low-level test provider and registers it as the implementation of
 * {@link OdlMdsalLowlevelControlService}.
 *
 * <p>
 * The collaborators are stored as passed in, the actor system is obtained from {@code actorSystemProvider}
 * and the {@link DOMDataTreeChangeService} is looked up among the supported extensions of
 * {@code domDataBroker}. The RPC registration is performed as the very last step so that {@code this} is
 * never handed to the RPC registry while final fields (notably the prefix shard handler) are still
 * unassigned; otherwise an RPC dispatched right after registration could observe a {@code null} handler.
 *
 * @param rpcRegistry registry this instance registers itself with
 * @param domRpcService DOM RPC provider service used for the get-constant registrations
 * @param singletonService cluster singleton service provider used by the singleton test services
 * @param schemaService schema service, stored for later use
 * @param bindingNormalizedNodeSerializer serializer handed to the prefix handlers and routed RPC services
 * @param notificationPublishService service handed to notification publishing tasks
 * @param notificationService service used to register YNL listeners
 * @param domDataBroker DOM data broker used for transactions and the data tree change service lookup
 * @param domDataTreeService DOM data tree service used for listeners and the prefix handlers
 * @param distributedShardFactory shard factory handed to the prefix shard handler
 * @param configDataStore configuration datastore, stored for later use
 * @param actorSystemProvider provider of the actor system stored by this instance
 */
public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                 final DOMRpcProviderService domRpcService,
                                 final ClusterSingletonServiceProvider singletonService,
                                 final SchemaService schemaService,
                                 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                 final NotificationPublishService notificationPublishService,
                                 final NotificationService notificationService,
                                 final DOMDataBroker domDataBroker,
                                 final DOMDataTreeService domDataTreeService,
                                 final DistributedShardFactory distributedShardFactory,
                                 final DistributedDataStoreInterface configDataStore,
                                 final ActorSystemProvider actorSystemProvider) {
    this.rpcRegistry = rpcRegistry;
    this.domRpcService = domRpcService;
    this.singletonService = singletonService;
    this.schemaService = schemaService;
    this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
    this.notificationPublishService = notificationPublishService;
    this.notificationService = notificationService;
    this.domDataBroker = domDataBroker;
    this.domDataTreeService = domDataTreeService;
    this.distributedShardFactory = distributedShardFactory;
    this.configDataStore = configDataStore;
    this.actorSystem = actorSystemProvider.getActorSystem();

    this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
    this.prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
            bindingNormalizedNodeSerializer);

    domDataTreeChangeService =
            (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);

    // Must stay last: from this point on the registry may invoke RPC methods on this instance.
    registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
}
236 @SuppressWarnings("checkstyle:IllegalCatch")
237 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
238 final UnregisterSingletonConstantInput input) {
239 LOG.debug("unregister-singleton-constant");
241 if (getSingletonConstantRegistration == null) {
242 LOG.debug("No get-singleton-constant registration present.");
243 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
244 "No get-singleton-constant rpc registration present.");
245 final RpcResult<UnregisterSingletonConstantOutput> result =
246 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
247 return Futures.immediateFuture(result);
251 getSingletonConstantRegistration.close();
252 getSingletonConstantRegistration = null;
254 return Futures.immediateFuture(RpcResultBuilder.success(
255 new UnregisterSingletonConstantOutputBuilder().build()).build());
256 } catch (Exception e) {
257 LOG.debug("There was a problem closing the singleton constant service", e);
258 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
259 "There was a problem closing get-singleton-constant");
260 final RpcResult<UnregisterSingletonConstantOutput> result =
261 RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withRpcError(rpcError).build();
262 return Futures.immediateFuture(result);
/**
 * Creates a {@link PublishNotificationsTask} from the id, seconds and notifications-per-second values of
 * the input and stores it in {@code publishNotificationsTasks} under the input id, replacing any task
 * previously stored under that id.
 *
 * <p>
 * NOTE(review): the lines between storing the task and the return statement are not visible in this view;
 * presumably the task is started there - confirm against the full file.
 *
 * @param input RPC input supplying the task id, duration and rate
 * @return a future built with {@code Futures.immediateFuture} around a successful result (the trailing part
 *         of the return statement is not visible in this view)
 */
267 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
268 final StartPublishNotificationsInput input) {
269 LOG.debug("publish-notifications, input: {}", input);
271 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
272 input.getSeconds(), input.getNotificationsPerSecond());
// Keep the task reachable by id so that it can be looked up later.
274 publishNotificationsTasks.put(input.getId(), task);
278 return Futures.immediateFuture(RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build())
/**
 * Registers a data tree change listener on the id-ints list ({@code WriteTransactionsHandler.ID_INT_YID})
 * in the controller CONFIGURATION datastore.
 *
 * <p>
 * Fails with an {@link ErrorType#RPC} error when {@code dtclReg} is already set. Otherwise a fresh
 * {@link IdIntsListener} is created and the resulting registration is stored in {@code dtclReg}.
 *
 * <p>
 * NOTE(review): the final argument of the {@code registerDataTreeChangeListener} call is on a line that is
 * not visible in this view; presumably it is the freshly created {@code idIntsListener} - confirm.
 *
 * @param input RPC input, not inspected in the visible lines
 * @return an already-completed future holding the outcome
 */
283 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
// Only one listener registration is tracked at a time.
285 if (dtclReg != null) {
286 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
287 "There is already dataTreeChangeListener registered on id-ints list.");
288 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDtclOutput>failed().withRpcError(error).build());
291 idIntsListener = new IdIntsListener();
// The controller-API DOMDataTreeIdentifier is fully qualified because the mdsal class of the same simple
// name is the one imported by this file.
293 dtclReg = domDataTreeChangeService
294 .registerDataTreeChangeListener(
295 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
296 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
299 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).build());
303 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
304 LOG.debug("write-transactions, input: {}", input);
305 return WriteTransactionsHandler.start(domDataBroker, input);
/**
 * Handler for the is-client-aborted operation.
 *
 * <p>
 * NOTE(review): the method body is not visible in this view, so its behaviour is not documented here.
 *
 * @param input RPC input
 * @return future carrying the RPC result
 */
309 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
/**
 * Handler for the remove-shard-replica operation.
 *
 * <p>
 * NOTE(review): the method body is not visible in this view, so its behaviour is not documented here.
 *
 * @param input RPC input
 * @return future carrying the RPC result
 */
314 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
315 final RemoveShardReplicaInput input) {
320 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
322 LOG.debug("subscribe-ynl, input: {}", input);
324 if (ynlRegistrations.containsKey(input.getId())) {
325 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
326 "There is already ynl listener registered for this id: " + input.getId());
327 return Futures.immediateFuture(RpcResultBuilder.<SubscribeYnlOutput>failed().withRpcError(error).build());
330 ynlRegistrations.put(input.getId(),
331 notificationService.registerNotificationListener(new YnlListener(input.getId())));
333 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).build());
337 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
338 LOG.debug("remove-prefix-shard, input: {}", input);
340 return prefixShardHandler.onRemovePrefixShard(input);
344 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
345 final BecomePrefixLeaderInput input) {
346 LOG.debug("become-prefix-leader, input: {}", input);
348 return prefixLeaderHandler.makeLeaderLocal(input);
352 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
353 final UnregisterBoundConstantInput input) {
354 LOG.debug("unregister-bound-constant, {}", input);
356 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
357 routedRegistrations.remove(input.getContext());
359 if (rpcRegistration == null) {
360 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
361 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
362 "No get-constant rpc registration present.");
363 final RpcResult<UnregisterBoundConstantOutput> result =
364 RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withRpcError(rpcError).build();
365 return Futures.immediateFuture(result);
368 rpcRegistration.close();
369 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build())
374 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
375 final RegisterSingletonConstantInput input) {
377 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
379 if (input.getConstant() == null) {
380 final RpcError error = RpcResultBuilder.newError(
381 ErrorType.RPC, "Invalid input.", "Constant value is null");
382 return Futures.immediateFuture(RpcResultBuilder.<RegisterSingletonConstantOutput>failed()
383 .withRpcError(error).build());
386 getSingletonConstantRegistration =
387 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
389 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build())
/**
 * Handler for the register-default-constant operation.
 *
 * <p>
 * NOTE(review): the method body is not visible in this view, so its behaviour is not documented here.
 *
 * @param input RPC input
 * @return future carrying the RPC result
 */
394 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
395 final RegisterDefaultConstantInput input) {
400 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
401 final UnregisterConstantInput input) {
403 if (globalGetConstantRegistration == null) {
404 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
405 "No get-constant rpc registration present.");
406 return Futures.immediateFuture(RpcResultBuilder.<UnregisterConstantOutput>failed().withRpcError(rpcError)
410 globalGetConstantRegistration.close();
411 globalGetConstantRegistration = null;
413 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
417 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
418 final UnregisterFlappingSingletonInput input) {
419 LOG.debug("unregister-flapping-singleton received.");
421 if (flappingSingletonService == null) {
422 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
423 "No flapping-singleton registration present.");
424 final RpcResult<UnregisterFlappingSingletonOutput> result =
425 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
426 return Futures.immediateFuture(result);
429 final long flapCount = flappingSingletonService.setInactive();
430 flappingSingletonService = null;
432 final UnregisterFlappingSingletonOutput output =
433 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
435 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/**
 * Handler for the add-shard-replica operation.
 *
 * <p>
 * NOTE(review): the method body is not visible in this view, so its behaviour is not documented here.
 *
 * @param input RPC input
 * @return future carrying the RPC result
 */
439 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
444 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
446 if (ddtlReg != null) {
447 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
448 "There is already dataTreeChangeListener registered on id-ints list.");
449 return Futures.immediateFuture(RpcResultBuilder.<SubscribeDdtlOutput>failed().withRpcError(error).build());
452 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
455 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
456 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
457 ProduceTransactionsHandler.ID_INT_YID)),
458 true, Collections.emptyList());
459 } catch (DOMDataTreeLoopException e) {
460 LOG.error("Failed to register DOMDataTreeListener.", e);
463 return Futures.immediateFuture(RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).build());
467 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
468 final RegisterBoundConstantInput input) {
469 LOG.debug("register-bound-constant: {}", input);
471 if (input.getContext() == null) {
472 final RpcError error = RpcResultBuilder.newError(
473 ErrorType.RPC, "Invalid input.", "Context value is null");
474 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
478 if (input.getConstant() == null) {
479 final RpcError error = RpcResultBuilder.newError(
480 ErrorType.RPC, "Invalid input.", "Constant value is null");
481 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
485 if (routedRegistrations.containsKey(input.getContext())) {
486 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
487 "There is already a rpc registered for context: " + input.getContext());
488 return Futures.immediateFuture(RpcResultBuilder.<RegisterBoundConstantOutput>failed().withRpcError(error)
492 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
493 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
494 input.getConstant(), input.getContext());
496 routedRegistrations.put(input.getContext(), rpcRegistration);
497 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build())
502 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
503 final RegisterFlappingSingletonInput input) {
504 LOG.debug("Received register-flapping-singleton.");
506 if (flappingSingletonService != null) {
507 final RpcError error = RpcResultBuilder.newError(
508 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
509 return Futures.immediateFuture(RpcResultBuilder.<RegisterFlappingSingletonOutput>failed()
510 .withRpcError(error).build());
513 flappingSingletonService = new FlappingSingletonService(singletonService);
515 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build())
/**
 * Finishes the id-ints data tree change listener test and reports whether the listener's copy of the data
 * matches the datastore.
 *
 * <p>
 * Visible steps: fail when no listener or registration is present; wait up to 120 seconds for
 * {@code idIntsListener.tryFinishProcessing()}; fail when the listener has never been triggered; read the
 * id-ints list from the controller CONFIGURATION datastore; fail when the read is empty or throws;
 * otherwise return a successful result whose copy-matches value is
 * {@code idIntsListener.checkEqual(readResult.get())}.
 *
 * <p>
 * NOTE(review): several lines of this method (after the wait and around the read) are not visible in this
 * view; presumably the registration is closed and cleared there - confirm against the full file.
 *
 * @param input RPC input, not inspected in the visible lines
 * @return an already-completed future holding the outcome
 */
520 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
521 LOG.debug("Received unsubscribe-dtcl");
// Both the listener and its registration must exist for an unsubscribe to make sense.
523 if (idIntsListener == null || dtclReg == null) {
524 final RpcError error = RpcResultBuilder.newError(
525 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
526 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
527 .withRpcError(error).build());
// Block until the listener reports that it finished processing, bounded by 120 seconds.
531 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
// NOTE(review): the visible lines do not restore the interrupt flag when InterruptedException is caught here.
532 } catch (InterruptedException | ExecutionException | TimeoutException e) {
533 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
534 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
535 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
536 .withRpcError(error).build());
542 if (!idIntsListener.hasTriggered()) {
// NOTE(review): the two literals below are joined without a space, so the message reads "...receivedany
// notifications." at runtime.
543 final RpcError error = RpcResultBuilder.newError(
544 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
545 + "any notifications.");
546 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
547 .withRpcError(error).build());
// NOTE(review): no close of this read-only transaction is visible in this view - confirm it is released.
550 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
// Final read of the id-ints list; checkedGet() blocks and throws ReadFailedException on failure.
552 final Optional<NormalizedNode<?, ?>> readResult =
553 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
555 if (!readResult.isPresent()) {
556 final RpcError error = RpcResultBuilder.newError(
557 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
558 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
559 .withRpcError(error).build());
// Compare the data accumulated by the listener with what the datastore actually holds.
562 return Futures.immediateFuture(
563 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
564 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
// NOTE(review): the caught exception is not attached to the RpcError built below.
566 } catch (final ReadFailedException e) {
567 final RpcError error = RpcResultBuilder.newError(
568 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
569 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
570 .withRpcError(error).build());
576 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
577 LOG.debug("create-prefix-shard, input: {}", input);
579 return prefixShardHandler.onCreatePrefixShard(input);
/**
 * RPC handler that accepts a {@code DeconfigureIdIntsShardInput} and returns a future of
 * {@code RpcResult<DeconfigureIdIntsShardOutput>}.
 *
 * <p>NOTE(review): no body statements are visible for this method - confirm what it returns and
 * whether callers can cope with that value.
 *
 * @param input RPC input
 */
583 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
584 final DeconfigureIdIntsShardInput input) {
/**
 * Handles the unsubscribe-ynl RPC: removes the listener registration stored under the requested id
 * and returns the output collected by that listener.
 *
 * @param input RPC input; {@code input.getId()} is the key into {@code ynlRegistrations}
 * @return a completed future holding the listener's {@code UnsubscribeYnlOutput}, or a failed result
 *         with a "missing-registration" application error when no registration exists for the id
 */
589 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
590 LOG.debug("Received unsubscribe-ynl, input: {}", input);
// Reject ids that were never registered (or were already removed).
592 if (!ynlRegistrations.containsKey(input.getId())) {
593 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
594 "No ynl listener with this id registered.");
595 final RpcResult<UnsubscribeYnlOutput> result =
596 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
597 return Futures.immediateFuture(result);
// Take the registration out of the map and ask its listener instance for the accumulated output.
600 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
601 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
// NOTE(review): no call that closes reg is visible before this return - confirm the listener
// registration is actually released once it has been removed from the map.
605 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
/**
 * Handles the check-publish-notifications RPC: reports the state of the publish task stored under
 * the requested id.
 *
 * <p>The regular result carries {@code active} (the negation of {@code task.isFinished()}),
 * {@code publishCount} (from {@code task.getCurrentNotif()}) and, when the task recorded one,
 * {@code lastError} as the error's {@code toString()}.
 *
 * @param input RPC input; {@code input.getId()} is the key into {@code publishNotificationsTasks}
 * @return a completed future with the task's status
 */
609 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
610 final CheckPublishNotificationsInput input) {
612 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
// NOTE(review): the condition guarding this early "active = false" return is not visible here;
// presumably it applies when no task exists for the id (task == null) - confirm, since the
// statements below dereference task unconditionally.
615 return Futures.immediateFuture(RpcResultBuilder.success(
616 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
// A task is considered active for as long as it has not finished.
619 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
620 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
// Surface the task's last recorded error both in the log and in the RPC output.
622 if (task.getLastError() != null) {
623 LOG.error("Last error for {}", task, task.getLastError());
624 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
627 final CheckPublishNotificationsOutput output =
628 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
630 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
634 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
635 final ProduceTransactionsInput input) {
636 LOG.debug("producer-transactions, input: {}", input);
637 return ProduceTransactionsHandler.start(domDataTreeService, input);
641 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
642 final ShutdownShardReplicaInput input) {
643 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
645 final String shardName = input.getShardName();
646 if (Strings.isNullOrEmpty(shardName)) {
647 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
648 "A valid shard name must be specified");
649 return Futures.immediateFuture(RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withRpcError(rpcError)
653 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
657 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
658 final ShutdownPrefixShardReplicaInput input) {
659 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
661 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
663 if (shardPrefix == null) {
664 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
665 "A valid shard prefix must be specified");
666 return Futures.immediateFuture(RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed()
667 .withRpcError(rpcError).build());
670 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
671 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
673 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
676 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
677 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
678 final ActorContext context = configDataStore.getActorContext();
680 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
681 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
682 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
683 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
685 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
687 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
688 if (throwable != null) {
689 shutdownShardAsk.failure(throwable);
691 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
694 }, context.getClientDispatcher());
696 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
698 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
699 if (throwable != null) {
700 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
701 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
702 rpcResult.set(failedResult);
704 // according to Patterns.gracefulStop API, we don't have to
705 // check value of gracefulStopResult
706 rpcResult.set(RpcResultBuilder.success(success).build());
709 }, context.getClientDispatcher());
714 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
716 LOG.debug("Received register-constant rpc, input: {}", input);
718 if (input.getConstant() == null) {
719 final RpcError error = RpcResultBuilder.newError(
720 ErrorType.RPC, "Invalid input.", "Constant value is null");
721 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
725 if (globalGetConstantRegistration != null) {
726 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
727 "There is already a get-constant rpc registered.");
728 return Futures.immediateFuture(RpcResultBuilder.<RegisterConstantOutput>failed().withRpcError(error)
732 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
733 return Futures.immediateFuture(RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).build());
/**
 * RPC handler that accepts an {@code UnregisterDefaultConstantInput} and returns a future of
 * {@code RpcResult<UnregisterDefaultConstantOutput>}.
 *
 * <p>NOTE(review): no body statements are visible for this method - confirm what it returns and
 * whether callers can cope with that value.
 *
 * @param input RPC input
 */
737 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
738 final UnregisterDefaultConstantInput input) {
743 @SuppressWarnings("checkstyle:IllegalCatch")
744 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
745 LOG.debug("Received unsubscribe-ddtl.");
747 if (idIntsDdtl == null || ddtlReg == null) {
748 final RpcError error = RpcResultBuilder.newError(
749 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
750 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
751 .withRpcError(error).build());
755 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
756 } catch (InterruptedException | ExecutionException | TimeoutException e) {
757 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
758 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
759 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
760 .withRpcError(error).build());
766 if (!idIntsDdtl.hasTriggered()) {
767 final RpcError error = RpcResultBuilder.newError(
768 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
769 + "any notifications.");
770 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
771 .withRpcError(error).build());
774 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
775 LOG.debug("Creating distributed datastore client for shard {}", shardName);
777 final ActorContext actorContext = configDataStore.getActorContext();
778 final Props distributedDataStoreClientProps =
779 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
780 "Shard-" + shardName, actorContext, shardName);
782 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
783 final DataStoreClient distributedDataStoreClient;
785 distributedDataStoreClient = SimpleDataStoreClientActor
786 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
787 } catch (RuntimeException e) {
788 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
789 clientActor.tell(PoisonPill.getInstance(), noSender());
790 final RpcError error = RpcResultBuilder.newError(
791 ErrorType.APPLICATION, "Unable to create ds client for read.",
792 "Unable to create ds client for read.");
793 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
794 .withRpcError(error).build());
797 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
798 final ClientTransaction tx = localHistory.createTransaction();
799 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
800 org.opendaylight.mdsal.common.api.ReadFailedException> read =
801 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
804 localHistory.close();
806 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
807 if (!optional.isPresent()) {
808 LOG.warn("Final read from client is empty.");
809 final RpcError error = RpcResultBuilder.newError(
810 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
811 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
812 .withRpcError(error).build());
815 return Futures.immediateFuture(
816 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
817 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
819 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
820 LOG.error("Unable to read data to verify ddtl data.", e);
821 final RpcError error = RpcResultBuilder.newError(
822 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
823 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
824 .withRpcError(error).build());
826 distributedDataStoreClient.close();
827 clientActor.tell(PoisonPill.getInstance(), noSender());