Merge "BUG 2412 - restconf RestconfDocumentedExceptionMapper class migration"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.codahale.metrics.JmxReporter;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.util.concurrent.RateLimiter;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.ExecutionContext;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.Duration;
53 import scala.concurrent.duration.FiniteDuration;
54
55 /**
56  * The ActorContext class contains utility methods which could be used by
57  * non-actors (like DistributedDataStore) to work with actors a little more
58  * easily. An ActorContext can be freely passed around to local object instances
59  * but should not be passed to actors especially remote actors
60  */
61 public class ActorContext {
62     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
63     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
64     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
65     private static final String METRIC_RATE = "rate";
66     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
67     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
68                                                               new Mapper<Throwable, Throwable>() {
69         @Override
70         public Throwable apply(Throwable failure) {
71             Throwable actualFailure = failure;
72             if(failure instanceof AskTimeoutException) {
73                 // A timeout exception most likely means the shard isn't initialized.
74                 actualFailure = new NotInitializedException(
75                         "Timed out trying to find the primary shard. Most likely cause is the " +
76                         "shard is not initialized yet.");
77             }
78
79             return actualFailure;
80         }
81     };
82     public static final String MAILBOX = "bounded-mailbox";
83
84     private final ActorSystem actorSystem;
85     private final ActorRef shardManager;
86     private final ClusterWrapper clusterWrapper;
87     private final Configuration configuration;
88     private DatastoreContext datastoreContext;
89     private FiniteDuration operationDuration;
90     private Timeout operationTimeout;
91     private final String selfAddressHostPort;
92     private RateLimiter txRateLimiter;
93     private final MetricRegistry metricRegistry = new MetricRegistry();
94     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
95     private final int transactionOutstandingOperationLimit;
96     private Timeout transactionCommitOperationTimeout;
97     private final Dispatchers dispatchers;
98
99     private volatile SchemaContext schemaContext;
100     private volatile boolean updated;
101
102     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
103             ClusterWrapper clusterWrapper, Configuration configuration) {
104         this(actorSystem, shardManager, clusterWrapper, configuration,
105                 DatastoreContext.newBuilder().build());
106     }
107
108     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
109             ClusterWrapper clusterWrapper, Configuration configuration,
110             DatastoreContext datastoreContext) {
111         this.actorSystem = actorSystem;
112         this.shardManager = shardManager;
113         this.clusterWrapper = clusterWrapper;
114         this.configuration = configuration;
115         this.datastoreContext = datastoreContext;
116         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
117
118         setCachedProperties();
119
120         Address selfAddress = clusterWrapper.getSelfAddress();
121         if (selfAddress != null && !selfAddress.host().isEmpty()) {
122             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
123         } else {
124             selfAddressHostPort = null;
125         }
126
127         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
128         jmxReporter.start();
129
130     }
131
132     private void setCachedProperties() {
133         txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
134
135         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
136         operationTimeout = new Timeout(operationDuration);
137
138         transactionCommitOperationTimeout =  new Timeout(Duration.create(
139                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
140     }
141
142     public DatastoreContext getDatastoreContext() {
143         return datastoreContext;
144     }
145
146     public ActorSystem getActorSystem() {
147         return actorSystem;
148     }
149
150     public ActorRef getShardManager() {
151         return shardManager;
152     }
153
154     public ActorSelection actorSelection(String actorPath) {
155         return actorSystem.actorSelection(actorPath);
156     }
157
158     public ActorSelection actorSelection(ActorPath actorPath) {
159         return actorSystem.actorSelection(actorPath);
160     }
161
162     public void setSchemaContext(SchemaContext schemaContext) {
163         this.schemaContext = schemaContext;
164
165         if(shardManager != null) {
166             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
167         }
168     }
169
170     public void setDatastoreContext(DatastoreContext context) {
171         this.datastoreContext = context;
172         setCachedProperties();
173
174         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
175         // will be published immediately even though they may not be immediately visible to other
176         // threads due to unsynchronized reads. That's OK though - we're going for eventual
177         // consistency here as immediately visible updates to these members aren't critical. These
178         // members could've been made volatile but wanted to avoid volatile reads as these are
179         // accessed often and updates will be infrequent.
180
181         updated = true;
182
183         if(shardManager != null) {
184             shardManager.tell(context, ActorRef.noSender());
185         }
186     }
187
188     public SchemaContext getSchemaContext() {
189         return schemaContext;
190     }
191
192     /**
193      * Finds the primary shard for the given shard name
194      *
195      * @param shardName
196      * @return
197      */
198     public Optional<ActorSelection> findPrimaryShard(String shardName) {
199         String path = findPrimaryPathOrNull(shardName);
200         if (path == null){
201             return Optional.absent();
202         }
203         return Optional.of(actorSystem.actorSelection(path));
204     }
205
206     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
207         Future<Object> future = executeOperationAsync(shardManager,
208                 new FindPrimary(shardName, true).toSerializable(),
209                 datastoreContext.getShardInitializationTimeout());
210
211         return future.transform(new Mapper<Object, ActorSelection>() {
212             @Override
213             public ActorSelection checkedApply(Object response) throws Exception {
214                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
215                     PrimaryFound found = PrimaryFound.fromSerializable(response);
216
217                     LOG.debug("Primary found {}", found.getPrimaryPath());
218                     return actorSystem.actorSelection(found.getPrimaryPath());
219                 } else if(response instanceof ActorNotInitialized) {
220                     throw new NotInitializedException(
221                             String.format("Found primary shard %s but it's not initialized yet. " +
222                                           "Please try again later", shardName));
223                 } else if(response instanceof PrimaryNotFound) {
224                     throw new PrimaryNotFoundException(
225                             String.format("No primary shard found for %S.", shardName));
226                 }
227
228                 throw new UnknownMessageException(String.format(
229                         "FindPrimary returned unkown response: %s", response));
230             }
231         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
232     }
233
234     /**
235      * Finds a local shard given its shard name and return it's ActorRef
236      *
237      * @param shardName the name of the local shard that needs to be found
238      * @return a reference to a local shard actor which represents the shard
239      *         specified by the shardName
240      */
241     public Optional<ActorRef> findLocalShard(String shardName) {
242         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
243
244         if (result instanceof LocalShardFound) {
245             LocalShardFound found = (LocalShardFound) result;
246             LOG.debug("Local shard found {}", found.getPath());
247             return Optional.of(found.getPath());
248         }
249
250         return Optional.absent();
251     }
252
253     /**
254      * Finds a local shard async given its shard name and return a Future from which to obtain the
255      * ActorRef.
256      *
257      * @param shardName the name of the local shard that needs to be found
258      */
259     public Future<ActorRef> findLocalShardAsync( final String shardName) {
260         Future<Object> future = executeOperationAsync(shardManager,
261                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
262
263         return future.map(new Mapper<Object, ActorRef>() {
264             @Override
265             public ActorRef checkedApply(Object response) throws Throwable {
266                 if(response instanceof LocalShardFound) {
267                     LocalShardFound found = (LocalShardFound)response;
268                     LOG.debug("Local shard found {}", found.getPath());
269                     return found.getPath();
270                 } else if(response instanceof ActorNotInitialized) {
271                     throw new NotInitializedException(
272                             String.format("Found local shard for %s but it's not initialized yet.",
273                                     shardName));
274                 } else if(response instanceof LocalShardNotFound) {
275                     throw new LocalShardNotFoundException(
276                             String.format("Local shard for %s does not exist.", shardName));
277                 }
278
279                 throw new UnknownMessageException(String.format(
280                         "FindLocalShard returned unkown response: %s", response));
281             }
282         }, getClientDispatcher());
283     }
284
285     private String findPrimaryPathOrNull(String shardName) {
286         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
287
288         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
289             PrimaryFound found = PrimaryFound.fromSerializable(result);
290
291             LOG.debug("Primary found {}", found.getPrimaryPath());
292             return found.getPrimaryPath();
293
294         } else if (result.getClass().equals(ActorNotInitialized.class)){
295             throw new NotInitializedException(
296                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
297             );
298
299         } else {
300             return null;
301         }
302     }
303
304
305     /**
306      * Executes an operation on a local actor and wait for it's response
307      *
308      * @param actor
309      * @param message
310      * @return The response of the operation
311      */
312     public Object executeOperation(ActorRef actor, Object message) {
313         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
314
315         try {
316             return Await.result(future, operationDuration);
317         } catch (Exception e) {
318             throw new TimeoutException("Sending message " + message.getClass().toString() +
319                     " to actor " + actor.toString() + " failed. Try again later.", e);
320         }
321     }
322
323     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
324         Preconditions.checkArgument(actor != null, "actor must not be null");
325         Preconditions.checkArgument(message != null, "message must not be null");
326
327         LOG.debug("Sending message {} to {}", message.getClass(), actor);
328         return ask(actor, message, timeout);
329     }
330
331     /**
332      * Execute an operation on a remote actor and wait for it's response
333      *
334      * @param actor
335      * @param message
336      * @return
337      */
338     public Object executeOperation(ActorSelection actor, Object message) {
339         Future<Object> future = executeOperationAsync(actor, message);
340
341         try {
342             return Await.result(future, operationDuration);
343         } catch (Exception e) {
344             throw new TimeoutException("Sending message " + message.getClass().toString() +
345                     " to actor " + actor.toString() + " failed. Try again later.", e);
346         }
347     }
348
349     /**
350      * Execute an operation on a remote actor asynchronously.
351      *
352      * @param actor the ActorSelection
353      * @param message the message to send
354      * @param timeout the operation timeout
355      * @return a Future containing the eventual result
356      */
357     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
358             Timeout timeout) {
359         Preconditions.checkArgument(actor != null, "actor must not be null");
360         Preconditions.checkArgument(message != null, "message must not be null");
361
362         LOG.debug("Sending message {} to {}", message.getClass(), actor);
363
364         return ask(actor, message, timeout);
365     }
366
367     /**
368      * Execute an operation on a remote actor asynchronously.
369      *
370      * @param actor the ActorSelection
371      * @param message the message to send
372      * @return a Future containing the eventual result
373      */
374     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
375         return executeOperationAsync(actor, message, operationTimeout);
376     }
377
378     /**
379      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
380      * reply (essentially set and forget).
381      *
382      * @param actor the ActorSelection
383      * @param message the message to send
384      */
385     public void sendOperationAsync(ActorSelection actor, Object message) {
386         Preconditions.checkArgument(actor != null, "actor must not be null");
387         Preconditions.checkArgument(message != null, "message must not be null");
388
389         LOG.debug("Sending message {} to {}", message.getClass(), actor);
390
391         actor.tell(message, ActorRef.noSender());
392     }
393
394     public void shutdown() {
395         shardManager.tell(PoisonPill.getInstance(), null);
396         actorSystem.shutdown();
397     }
398
399     public ClusterWrapper getClusterWrapper() {
400         return clusterWrapper;
401     }
402
403     public String getCurrentMemberName(){
404         return clusterWrapper.getCurrentMemberName();
405     }
406
407     /**
408      * Send the message to each and every shard
409      *
410      * @param message
411      */
412     public void broadcast(Object message){
413         for(String shardName : configuration.getAllShardNames()){
414
415             Optional<ActorSelection> primary = findPrimaryShard(shardName);
416             if (primary.isPresent()) {
417                 primary.get().tell(message, ActorRef.noSender());
418             } else {
419                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
420                         message.getClass().getSimpleName(), shardName);
421             }
422         }
423     }
424
425     public FiniteDuration getOperationDuration() {
426         return operationDuration;
427     }
428
429     public boolean isPathLocal(String path) {
430         if (Strings.isNullOrEmpty(path)) {
431             return false;
432         }
433
434         int pathAtIndex = path.indexOf('@');
435         if (pathAtIndex == -1) {
436             //if the path is of local format, then its local and is co-located
437             return true;
438
439         } else if (selfAddressHostPort != null) {
440             // self-address and tx actor path, both are of remote path format
441             int slashIndex = path.indexOf('/', pathAtIndex);
442
443             if (slashIndex == -1) {
444                 return false;
445             }
446
447             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
448             return hostPort.equals(selfAddressHostPort);
449
450         } else {
451             // self address is local format and tx actor path is remote format
452             return false;
453         }
454     }
455
456     /**
457      * @deprecated This method is present only to support backward compatibility with Helium and should not be
458      * used any further
459      *
460      *
461      * @param primaryPath
462      * @param localPathOfRemoteActor
463      * @return
464     */
465     @Deprecated
466     public String resolvePath(final String primaryPath,
467                                             final String localPathOfRemoteActor) {
468         StringBuilder builder = new StringBuilder();
469         String[] primaryPathElements = primaryPath.split("/");
470         builder.append(primaryPathElements[0]).append("//")
471             .append(primaryPathElements[1]).append(primaryPathElements[2]);
472         String[] remotePathElements = localPathOfRemoteActor.split("/");
473         for (int i = 3; i < remotePathElements.length; i++) {
474                 builder.append("/").append(remotePathElements[i]);
475             }
476
477         return builder.toString();
478     }
479
480     /**
481      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
482      * should begin throttling the operations
483      *
484      * Parking reading this configuration here because we need to get to the actor system settings
485      *
486      * @return
487      */
488     public int getTransactionOutstandingOperationLimit(){
489         return transactionOutstandingOperationLimit;
490     }
491
492     /**
493      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
494      * us to create a timer for pretty much anything.
495      *
496      * @param operationName
497      * @return
498      */
499     public Timer getOperationTimer(String operationName){
500         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
501         return metricRegistry.timer(rate);
502     }
503
504     /**
505      * Get the type of the data store to which this ActorContext belongs
506      *
507      * @return
508      */
509     public String getDataStoreType() {
510         return datastoreContext.getDataStoreType();
511     }
512
513     /**
514      * Set the number of transaction creation permits that are to be allowed
515      *
516      * @param permitsPerSecond
517      */
518     public void setTxCreationLimit(double permitsPerSecond){
519         txRateLimiter.setRate(permitsPerSecond);
520     }
521
522     /**
523      * Get the current transaction creation rate limit
524      * @return
525      */
526     public double getTxCreationLimit(){
527         return txRateLimiter.getRate();
528     }
529
530     /**
531      * Try to acquire a transaction creation permit. Will block if no permits are available.
532      */
533     public void acquireTxCreationPermit(){
534         txRateLimiter.acquire();
535     }
536
537     /**
538      * Return the operation timeout to be used when committing transactions
539      * @return
540      */
541     public Timeout getTransactionCommitOperationTimeout(){
542         return transactionCommitOperationTimeout;
543     }
544
545     /**
546      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
547      * code on the datastore
548      * @return
549      */
550     public ExecutionContext getClientDispatcher() {
551         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
552     }
553
554     public String getNotificationDispatcherPath(){
555         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
556     }
557
558 }