e7880ac1bfbc17438f0f32c674f63516398dcd62
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Strings;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
52 import org.opendaylight.mdsal.binding.api.NotificationService;
53 import org.opendaylight.mdsal.binding.api.RpcProviderService;
54 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
63 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
64 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
65 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
144 import org.opendaylight.yangtools.concepts.ListenerRegistration;
145 import org.opendaylight.yangtools.concepts.ObjectRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
148 import org.opendaylight.yangtools.yang.common.RpcResult;
149 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
150 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
151 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
152 import org.slf4j.Logger;
153 import org.slf4j.LoggerFactory;
154 import scala.concurrent.duration.FiniteDuration;
155
156 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
157     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
158
159     private final RpcProviderService rpcRegistry;
160     private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
161     private final DistributedShardFactory distributedShardFactory;
162     private final DistributedDataStoreInterface configDataStore;
163     private final DOMDataTreeService domDataTreeService;
164     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
165     private final DOMDataBroker domDataBroker;
166     private final NotificationPublishService notificationPublishService;
167     private final NotificationService notificationService;
168     private final DOMSchemaService schemaService;
169     private final ClusterSingletonServiceProvider singletonService;
170     private final DOMRpcProviderService domRpcService;
171     private final PrefixLeaderHandler prefixLeaderHandler;
172     private final PrefixShardHandler prefixShardHandler;
173     private final DOMDataTreeChangeService domDataTreeChangeService;
174     private final ActorSystem actorSystem;
175
176     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
177             routedRegistrations = new HashMap<>();
178
179     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
180
181     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
182     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
183     private FlappingSingletonService flappingSingletonService;
184     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
185     private IdIntsListener idIntsListener;
186     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
187     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
188     private IdIntsDOMDataTreeLIstener idIntsDdtl;
189
190
191
192     public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
193                                      final DOMRpcProviderService domRpcService,
194                                      final ClusterSingletonServiceProvider singletonService,
195                                      final DOMSchemaService schemaService,
196                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
197                                      final NotificationPublishService notificationPublishService,
198                                      final NotificationService notificationService,
199                                      final DOMDataBroker domDataBroker,
200                                      final DOMDataTreeService domDataTreeService,
201                                      final DistributedShardFactory distributedShardFactory,
202                                      final DistributedDataStoreInterface configDataStore,
203                                      final ActorSystemProvider actorSystemProvider) {
204         this.rpcRegistry = rpcRegistry;
205         this.domRpcService = domRpcService;
206         this.singletonService = singletonService;
207         this.schemaService = schemaService;
208         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
209         this.notificationPublishService = notificationPublishService;
210         this.notificationService = notificationService;
211         this.domDataBroker = domDataBroker;
212         this.domDataTreeService = domDataTreeService;
213         this.distributedShardFactory = distributedShardFactory;
214         this.configDataStore = configDataStore;
215         this.actorSystem = actorSystemProvider.getActorSystem();
216
217         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
218         domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
219
220         registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
221
222         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
223                 bindingNormalizedNodeSerializer);
224     }
225
226     @Override
227     @SuppressWarnings("checkstyle:IllegalCatch")
228     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
229             final UnregisterSingletonConstantInput input) {
230         LOG.info("In unregisterSingletonConstant");
231
232         if (getSingletonConstantRegistration == null) {
233             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
234                     "No prior RPC was registered").buildFuture();
235         }
236
237         try {
238             getSingletonConstantRegistration.close();
239             getSingletonConstantRegistration = null;
240
241             return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
242         } catch (Exception e) {
243             String msg = "Error closing the singleton constant service";
244             LOG.error(msg, e);
245             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
246                     ErrorType.APPLICATION, msg, e).buildFuture();
247         }
248     }
249
250     @Override
251     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
252             final StartPublishNotificationsInput input) {
253         LOG.info("In startPublishNotifications - input: {}", input);
254
255         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
256                 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
257
258         publishNotificationsTasks.put(input.getId(), task);
259
260         task.start();
261
262         return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
263     }
264
265     @Override
266     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
267         LOG.info("In subscribeDtcl - input: {}", input);
268
269         if (dtclReg != null) {
270             return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
271                 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
272         }
273
274         idIntsListener = new IdIntsListener();
275
276         dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
277             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
278             idIntsListener);
279
280         return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
281     }
282
283     @Override
284     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
285         return WriteTransactionsHandler.start(domDataBroker, input);
286     }
287
288     @Override
289     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
290         return null;
291     }
292
293     @Override
294     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
295             final RemoveShardReplicaInput input) {
296         return null;
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
301         LOG.info("In subscribeYnl - input: {}", input);
302
303         if (ynlRegistrations.containsKey(input.getId())) {
304             return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
305                 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
306         }
307
308         ynlRegistrations.put(input.getId(),
309                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
310
311         return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
312     }
313
314     @Override
315     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
316         LOG.info("In removePrefixShard - input: {}", input);
317
318         return prefixShardHandler.onRemovePrefixShard(input);
319     }
320
321     @Override
322     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
323             final BecomePrefixLeaderInput input) {
324         LOG.info("n becomePrefixLeader - input: {}", input);
325
326         return prefixLeaderHandler.makeLeaderLocal(input);
327     }
328
329     @Override
330     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
331             final UnregisterBoundConstantInput input) {
332         LOG.info("In unregisterBoundConstant - {}", input);
333
334         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
335                 routedRegistrations.remove(input.getContext());
336
337         if (rpcRegistration == null) {
338             return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
339                 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
340         }
341
342         rpcRegistration.close();
343         return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
344     }
345
346     @Override
347     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
348             final RegisterSingletonConstantInput input) {
349         LOG.info("In registerSingletonConstant - input: {}", input);
350
351         if (input.getConstant() == null) {
352             return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
353                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
354         }
355
356         getSingletonConstantRegistration =
357                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
358
359         return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
360     }
361
362     @Override
363     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
364             final RegisterDefaultConstantInput input) {
365         return null;
366     }
367
368     @Override
369     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
370             final UnregisterConstantInput input) {
371         LOG.info("In unregisterConstant");
372
373         if (globalGetConstantRegistration == null) {
374             return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
375                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
376         }
377
378         globalGetConstantRegistration.close();
379         globalGetConstantRegistration = null;
380
381         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
382     }
383
384     @Override
385     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
386             final UnregisterFlappingSingletonInput input) {
387         LOG.info("In unregisterFlappingSingleton");
388
389         if (flappingSingletonService == null) {
390             return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
391                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
392         }
393
394         final long flapCount = flappingSingletonService.setInactive();
395         flappingSingletonService = null;
396
397         return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
398                 .buildFuture();
399     }
400
401     @Override
402     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
403         return null;
404     }
405
406     @Override
407     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
408         LOG.info("In subscribeDdtl");
409
410         if (ddtlReg != null) {
411             return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
412                 "data-exists", "There is already a listener registered for id-ints").buildFuture();
413         }
414
415         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
416
417         try {
418             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
419                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
420                             ProduceTransactionsHandler.ID_INT_YID)),
421                     true, Collections.emptyList());
422         } catch (DOMDataTreeLoopException e) {
423             LOG.error("Failed to register DOMDataTreeListener", e);
424             return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
425                 ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
426         }
427
428         return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
429     }
430
431     @Override
432     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
433             final RegisterBoundConstantInput input) {
434         LOG.info("In registerBoundConstant - input: {}", input);
435
436         if (input.getContext() == null) {
437             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
438                     ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
439         }
440
441         if (input.getConstant() == null) {
442             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
443                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
444         }
445
446         if (routedRegistrations.containsKey(input.getContext())) {
447             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
448                 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
449         }
450
451         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
452                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
453                         input.getConstant(), input.getContext());
454
455         routedRegistrations.put(input.getContext(), rpcRegistration);
456         return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
457     }
458
459     @Override
460     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
461             final RegisterFlappingSingletonInput input) {
462         LOG.info("In registerFlappingSingleton");
463
464         if (flappingSingletonService != null) {
465             return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
466                 "data-exists", "There is already an rpc registered").buildFuture();
467         }
468
469         flappingSingletonService = new FlappingSingletonService(singletonService);
470
471         return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
472     }
473
474     @Override
475     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
476         LOG.info("In unsubscribeDtcl");
477
478         if (idIntsListener == null || dtclReg == null) {
479             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
480                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
481         }
482
483         long timeout = 120L;
484         try {
485             idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
486         } catch (InterruptedException | ExecutionException | TimeoutException e) {
487             LOG.error("Unable to finish notification processing", e);
488             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
489                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
490         }
491
492         dtclReg.close();
493         dtclReg = null;
494
495         if (!idIntsListener.hasTriggered()) {
496             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
497                     "id-ints listener has not received any notifications.").buildFuture();
498         }
499
500         try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
501             final Optional<NormalizedNode<?, ?>> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
502                 WriteTransactionsHandler.ID_INT_YID).get();
503
504             if (!readResult.isPresent()) {
505                 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
506                         "No data read from id-ints list").buildFuture();
507             }
508
509             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
510             if (!nodesEqual) {
511                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
512                         idIntsListener.diffWithLocalCopy(readResult.get()));
513             }
514
515             return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
516                     .buildFuture();
517
518         } catch (final InterruptedException | ExecutionException e) {
519             LOG.error("Final read of id-ints failed", e);
520             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
521                     "Final read of id-ints failed", e).buildFuture();
522         }
523     }
524
525     @Override
526     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
527         LOG.info("In createPrefixShard - input: {}", input);
528
529         return prefixShardHandler.onCreatePrefixShard(input);
530     }
531
532     @Override
533     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
534             final DeconfigureIdIntsShardInput input) {
535         return null;
536     }
537
538     @Override
539     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
540         LOG.info("In unsubscribeYnl - input: {}", input);
541
542         if (!ynlRegistrations.containsKey(input.getId())) {
543             return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
544                 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
545         }
546
547         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
548         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
549
550         reg.close();
551
552         return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
553     }
554
555     @Override
556     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
557             final CheckPublishNotificationsInput input) {
558         LOG.info("In checkPublishNotifications - input: {}", input);
559
560         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
561
562         if (task == null) {
563             return Futures.immediateFuture(RpcResultBuilder.success(
564                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
565         }
566
567         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
568                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
569
570         if (task.getLastError() != null) {
571             LOG.error("Last error for {}", task, task.getLastError());
572             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
573         }
574
575         final CheckPublishNotificationsOutput output =
576                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
577
578         return RpcResultBuilder.success(output).buildFuture();
579     }
580
581     @Override
582     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
583             final ProduceTransactionsInput input) {
584         LOG.info("In produceTransactions - input: {}", input);
585         return ProduceTransactionsHandler.start(domDataTreeService, input);
586     }
587
588     @Override
589     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
590             final ShutdownShardReplicaInput input) {
591         LOG.info("In shutdownShardReplica - input: {}", input);
592
593         final String shardName = input.getShardName();
594         if (Strings.isNullOrEmpty(shardName)) {
595             return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
596                 shardName + "is not a valid shard name").buildFuture();
597         }
598
599         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
600     }
601
602     @Override
603     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
604             final ShutdownPrefixShardReplicaInput input) {
605         LOG.info("shutdownPrefixShardReplica - input: {}", input);
606
607         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
608
609         if (shardPrefix == null) {
610             return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
611                     "A valid shard prefix must be specified").buildFuture();
612         }
613
614         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
615         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
616
617         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
618     }
619
620     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
621         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
622         final ActorUtils context = configDataStore.getActorUtils();
623
624         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
625                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
626         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
627         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
628
629         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
630             @Override
631             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
632                 if (throwable != null) {
633                     shutdownShardAsk.failure(throwable);
634                 } else {
635                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
636                 }
637             }
638         }, context.getClientDispatcher());
639
640         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
641             @Override
642             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
643                 if (throwable != null) {
644                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
645                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
646                     rpcResult.set(failedResult);
647                 } else {
648                     // according to Patterns.gracefulStop API, we don't have to
649                     // check value of gracefulStopResult
650                     rpcResult.set(RpcResultBuilder.success(success).build());
651                 }
652             }
653         }, context.getClientDispatcher());
654         return rpcResult;
655     }
656
657     @Override
658     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
659         LOG.info("In registerConstant - input: {}", input);
660
661         if (input.getConstant() == null) {
662             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
663                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
664         }
665
666         if (globalGetConstantRegistration != null) {
667             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
668                     "data-exists", "There is already an rpc registered").buildFuture();
669         }
670
671         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
672         return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
673     }
674
675     @Override
676     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
677             final UnregisterDefaultConstantInput input) {
678         return null;
679     }
680
681     @Override
682     @SuppressWarnings("checkstyle:IllegalCatch")
683     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
684         LOG.info("In unsubscribeDdtl");
685
686         if (idIntsDdtl == null || ddtlReg == null) {
687             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
688                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
689         }
690
691         long timeout = 120L;
692         try {
693             idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
694         } catch (InterruptedException | ExecutionException | TimeoutException e) {
695             LOG.error("Unable to finish notification processing", e);
696             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
697                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
698         }
699
700         ddtlReg.close();
701         ddtlReg = null;
702
703         if (!idIntsDdtl.hasTriggered()) {
704             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
705                     "No notification received.", "id-ints listener has not received any notifications").buildFuture();
706         }
707
708         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
709         LOG.debug("Creating distributed datastore client for shard {}", shardName);
710
711         final ActorUtils actorUtils = configDataStore.getActorUtils();
712         final Props distributedDataStoreClientProps =
713                 SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
714                         "Shard-" + shardName, actorUtils, shardName);
715
716         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
717         final DataStoreClient distributedDataStoreClient;
718         try {
719             distributedDataStoreClient = SimpleDataStoreClientActor
720                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
721         } catch (RuntimeException e) {
722             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
723             clientActor.tell(PoisonPill.getInstance(), noSender());
724             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
725                     .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
726         }
727
728         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
729         final ClientTransaction tx = localHistory.createTransaction();
730         final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
731                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
732
733         tx.abort();
734         localHistory.close();
735         try {
736             final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
737             if (!optional.isPresent()) {
738                 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
739                         "data-missing", "Final read from id-ints is empty").buildFuture();
740             }
741
742             return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
743                     idIntsDdtl.checkEqual(optional.get()))).buildFuture();
744
745         } catch (InterruptedException | ExecutionException e) {
746             LOG.error("Unable to read data to verify ddtl data", e);
747             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
748                     .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
749         } finally {
750             distributedDataStoreClient.close();
751             clientActor.tell(PoisonPill.getInstance(), noSender());
752         }
753     }
754 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.