0fb09d8231903bbc9b530f488039adc6b8672b90
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.JmxReporter;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Strings;
29 import com.google.common.cache.Cache;
30 import com.google.common.cache.CacheBuilder;
31 import com.google.common.util.concurrent.RateLimiter;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56 import scala.concurrent.duration.Duration;
57 import scala.concurrent.duration.FiniteDuration;
58
59 /**
60  * The ActorContext class contains utility methods which could be used by
61  * non-actors (like DistributedDataStore) to work with actors a little more
62  * easily. An ActorContext can be freely passed around to local object instances
63  * but should not be passed to actors especially remote actors
64  */
65 public class ActorContext {
66     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
67     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
68     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
69     private static final String METRIC_RATE = "rate";
70     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
71     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
72                                                               new Mapper<Throwable, Throwable>() {
73         @Override
74         public Throwable apply(Throwable failure) {
75             Throwable actualFailure = failure;
76             if(failure instanceof AskTimeoutException) {
77                 // A timeout exception most likely means the shard isn't initialized.
78                 actualFailure = new NotInitializedException(
79                         "Timed out trying to find the primary shard. Most likely cause is the " +
80                         "shard is not initialized yet.");
81             }
82
83             return actualFailure;
84         }
85     };
86     public static final String MAILBOX = "bounded-mailbox";
87
88     private final ActorSystem actorSystem;
89     private final ActorRef shardManager;
90     private final ClusterWrapper clusterWrapper;
91     private final Configuration configuration;
92     private DatastoreContext datastoreContext;
93     private FiniteDuration operationDuration;
94     private Timeout operationTimeout;
95     private final String selfAddressHostPort;
96     private RateLimiter txRateLimiter;
97     private final MetricRegistry metricRegistry = new MetricRegistry();
98     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
99     private final int transactionOutstandingOperationLimit;
100     private Timeout transactionCommitOperationTimeout;
101     private final Dispatchers dispatchers;
102     private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
103
104     private volatile SchemaContext schemaContext;
105     private volatile boolean updated;
106
107     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
108             ClusterWrapper clusterWrapper, Configuration configuration) {
109         this(actorSystem, shardManager, clusterWrapper, configuration,
110                 DatastoreContext.newBuilder().build());
111     }
112
113     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
114             ClusterWrapper clusterWrapper, Configuration configuration,
115             DatastoreContext datastoreContext) {
116         this.actorSystem = actorSystem;
117         this.shardManager = shardManager;
118         this.clusterWrapper = clusterWrapper;
119         this.configuration = configuration;
120         this.datastoreContext = datastoreContext;
121         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
122
123         setCachedProperties();
124         primaryShardActorSelectionCache = CacheBuilder.newBuilder()
125                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
126                 .build();
127
128         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
129         operationTimeout = new Timeout(operationDuration);
130         transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
131                 TimeUnit.SECONDS));
132
133         Address selfAddress = clusterWrapper.getSelfAddress();
134         if (selfAddress != null && !selfAddress.host().isEmpty()) {
135             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
136         } else {
137             selfAddressHostPort = null;
138         }
139
140         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
141         jmxReporter.start();
142
143     }
144
145     private void setCachedProperties() {
146         txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
147
148         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
149         operationTimeout = new Timeout(operationDuration);
150
151         transactionCommitOperationTimeout =  new Timeout(Duration.create(
152                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
153     }
154
155     public DatastoreContext getDatastoreContext() {
156         return datastoreContext;
157     }
158
159     public ActorSystem getActorSystem() {
160         return actorSystem;
161     }
162
163     public ActorRef getShardManager() {
164         return shardManager;
165     }
166
167     public ActorSelection actorSelection(String actorPath) {
168         return actorSystem.actorSelection(actorPath);
169     }
170
171     public ActorSelection actorSelection(ActorPath actorPath) {
172         return actorSystem.actorSelection(actorPath);
173     }
174
175     public void setSchemaContext(SchemaContext schemaContext) {
176         this.schemaContext = schemaContext;
177
178         if(shardManager != null) {
179             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
180         }
181     }
182
183     public void setDatastoreContext(DatastoreContext context) {
184         this.datastoreContext = context;
185         setCachedProperties();
186
187         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
188         // will be published immediately even though they may not be immediately visible to other
189         // threads due to unsynchronized reads. That's OK though - we're going for eventual
190         // consistency here as immediately visible updates to these members aren't critical. These
191         // members could've been made volatile but wanted to avoid volatile reads as these are
192         // accessed often and updates will be infrequent.
193
194         updated = true;
195
196         if(shardManager != null) {
197             shardManager.tell(context, ActorRef.noSender());
198         }
199     }
200
201     public SchemaContext getSchemaContext() {
202         return schemaContext;
203     }
204
205     /**
206      * Finds the primary shard for the given shard name
207      *
208      * @param shardName
209      * @return
210      */
211     public Optional<ActorSelection> findPrimaryShard(String shardName) {
212         String path = findPrimaryPathOrNull(shardName);
213         if (path == null){
214             return Optional.absent();
215         }
216         return Optional.of(actorSystem.actorSelection(path));
217     }
218
219     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
220         Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
221         if(ret != null){
222             return ret;
223         }
224         Future<Object> future = executeOperationAsync(shardManager,
225                 new FindPrimary(shardName, true).toSerializable(),
226                 datastoreContext.getShardInitializationTimeout());
227
228         return future.transform(new Mapper<Object, ActorSelection>() {
229             @Override
230             public ActorSelection checkedApply(Object response) throws Exception {
231                 if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
232                     PrimaryFound found = PrimaryFound.fromSerializable(response);
233
234                     LOG.debug("Primary found {}", found.getPrimaryPath());
235                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
236                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
237                     return actorSelection;
238                 } else if(response instanceof ActorNotInitialized) {
239                     throw new NotInitializedException(
240                             String.format("Found primary shard %s but it's not initialized yet. " +
241                                           "Please try again later", shardName));
242                 } else if(response instanceof PrimaryNotFound) {
243                     throw new PrimaryNotFoundException(
244                             String.format("No primary shard found for %S.", shardName));
245                 }
246
247                 throw new UnknownMessageException(String.format(
248                         "FindPrimary returned unkown response: %s", response));
249             }
250         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
251     }
252
253     /**
254      * Finds a local shard given its shard name and return it's ActorRef
255      *
256      * @param shardName the name of the local shard that needs to be found
257      * @return a reference to a local shard actor which represents the shard
258      *         specified by the shardName
259      */
260     public Optional<ActorRef> findLocalShard(String shardName) {
261         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
262
263         if (result instanceof LocalShardFound) {
264             LocalShardFound found = (LocalShardFound) result;
265             LOG.debug("Local shard found {}", found.getPath());
266             return Optional.of(found.getPath());
267         }
268
269         return Optional.absent();
270     }
271
272     /**
273      * Finds a local shard async given its shard name and return a Future from which to obtain the
274      * ActorRef.
275      *
276      * @param shardName the name of the local shard that needs to be found
277      */
278     public Future<ActorRef> findLocalShardAsync( final String shardName) {
279         Future<Object> future = executeOperationAsync(shardManager,
280                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
281
282         return future.map(new Mapper<Object, ActorRef>() {
283             @Override
284             public ActorRef checkedApply(Object response) throws Throwable {
285                 if(response instanceof LocalShardFound) {
286                     LocalShardFound found = (LocalShardFound)response;
287                     LOG.debug("Local shard found {}", found.getPath());
288                     return found.getPath();
289                 } else if(response instanceof ActorNotInitialized) {
290                     throw new NotInitializedException(
291                             String.format("Found local shard for %s but it's not initialized yet.",
292                                     shardName));
293                 } else if(response instanceof LocalShardNotFound) {
294                     throw new LocalShardNotFoundException(
295                             String.format("Local shard for %s does not exist.", shardName));
296                 }
297
298                 throw new UnknownMessageException(String.format(
299                         "FindLocalShard returned unkown response: %s", response));
300             }
301         }, getClientDispatcher());
302     }
303
304     private String findPrimaryPathOrNull(String shardName) {
305         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
306
307         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
308             PrimaryFound found = PrimaryFound.fromSerializable(result);
309
310             LOG.debug("Primary found {}", found.getPrimaryPath());
311             return found.getPrimaryPath();
312
313         } else if (result.getClass().equals(ActorNotInitialized.class)){
314             throw new NotInitializedException(
315                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
316             );
317
318         } else {
319             return null;
320         }
321     }
322
323
324     /**
325      * Executes an operation on a local actor and wait for it's response
326      *
327      * @param actor
328      * @param message
329      * @return The response of the operation
330      */
331     public Object executeOperation(ActorRef actor, Object message) {
332         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
333
334         try {
335             return Await.result(future, operationDuration);
336         } catch (Exception e) {
337             throw new TimeoutException("Sending message " + message.getClass().toString() +
338                     " to actor " + actor.toString() + " failed. Try again later.", e);
339         }
340     }
341
342     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
343         Preconditions.checkArgument(actor != null, "actor must not be null");
344         Preconditions.checkArgument(message != null, "message must not be null");
345
346         LOG.debug("Sending message {} to {}", message.getClass(), actor);
347         return doAsk(actor, message, timeout);
348     }
349
350     /**
351      * Execute an operation on a remote actor and wait for it's response
352      *
353      * @param actor
354      * @param message
355      * @return
356      */
357     public Object executeOperation(ActorSelection actor, Object message) {
358         Future<Object> future = executeOperationAsync(actor, message);
359
360         try {
361             return Await.result(future, operationDuration);
362         } catch (Exception e) {
363             throw new TimeoutException("Sending message " + message.getClass().toString() +
364                     " to actor " + actor.toString() + " failed. Try again later.", e);
365         }
366     }
367
368     /**
369      * Execute an operation on a remote actor asynchronously.
370      *
371      * @param actor the ActorSelection
372      * @param message the message to send
373      * @param timeout the operation timeout
374      * @return a Future containing the eventual result
375      */
376     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
377             Timeout timeout) {
378         Preconditions.checkArgument(actor != null, "actor must not be null");
379         Preconditions.checkArgument(message != null, "message must not be null");
380
381         LOG.debug("Sending message {} to {}", message.getClass(), actor);
382
383         return doAsk(actor, message, timeout);
384     }
385
386     /**
387      * Execute an operation on a remote actor asynchronously.
388      *
389      * @param actor the ActorSelection
390      * @param message the message to send
391      * @return a Future containing the eventual result
392      */
393     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
394         return executeOperationAsync(actor, message, operationTimeout);
395     }
396
397     /**
398      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
399      * reply (essentially set and forget).
400      *
401      * @param actor the ActorSelection
402      * @param message the message to send
403      */
404     public void sendOperationAsync(ActorSelection actor, Object message) {
405         Preconditions.checkArgument(actor != null, "actor must not be null");
406         Preconditions.checkArgument(message != null, "message must not be null");
407
408         LOG.debug("Sending message {} to {}", message.getClass(), actor);
409
410         actor.tell(message, ActorRef.noSender());
411     }
412
413     public void shutdown() {
414         shardManager.tell(PoisonPill.getInstance(), null);
415         actorSystem.shutdown();
416     }
417
418     public ClusterWrapper getClusterWrapper() {
419         return clusterWrapper;
420     }
421
422     public String getCurrentMemberName(){
423         return clusterWrapper.getCurrentMemberName();
424     }
425
426     /**
427      * Send the message to each and every shard
428      *
429      * @param message
430      */
431     public void broadcast(Object message){
432         for(String shardName : configuration.getAllShardNames()){
433
434             Optional<ActorSelection> primary = findPrimaryShard(shardName);
435             if (primary.isPresent()) {
436                 primary.get().tell(message, ActorRef.noSender());
437             } else {
438                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
439                         message.getClass().getSimpleName(), shardName);
440             }
441         }
442     }
443
444     public FiniteDuration getOperationDuration() {
445         return operationDuration;
446     }
447
448     public boolean isPathLocal(String path) {
449         if (Strings.isNullOrEmpty(path)) {
450             return false;
451         }
452
453         int pathAtIndex = path.indexOf('@');
454         if (pathAtIndex == -1) {
455             //if the path is of local format, then its local and is co-located
456             return true;
457
458         } else if (selfAddressHostPort != null) {
459             // self-address and tx actor path, both are of remote path format
460             int slashIndex = path.indexOf('/', pathAtIndex);
461
462             if (slashIndex == -1) {
463                 return false;
464             }
465
466             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
467             return hostPort.equals(selfAddressHostPort);
468
469         } else {
470             // self address is local format and tx actor path is remote format
471             return false;
472         }
473     }
474
475     /**
476      * @deprecated This method is present only to support backward compatibility with Helium and should not be
477      * used any further
478      *
479      *
480      * @param primaryPath
481      * @param localPathOfRemoteActor
482      * @return
483     */
484     @Deprecated
485     public String resolvePath(final String primaryPath,
486                                             final String localPathOfRemoteActor) {
487         StringBuilder builder = new StringBuilder();
488         String[] primaryPathElements = primaryPath.split("/");
489         builder.append(primaryPathElements[0]).append("//")
490             .append(primaryPathElements[1]).append(primaryPathElements[2]);
491         String[] remotePathElements = localPathOfRemoteActor.split("/");
492         for (int i = 3; i < remotePathElements.length; i++) {
493                 builder.append("/").append(remotePathElements[i]);
494             }
495
496         return builder.toString();
497     }
498
499     /**
500      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
501      * should begin throttling the operations
502      *
503      * Parking reading this configuration here because we need to get to the actor system settings
504      *
505      * @return
506      */
507     public int getTransactionOutstandingOperationLimit(){
508         return transactionOutstandingOperationLimit;
509     }
510
511     /**
512      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
513      * us to create a timer for pretty much anything.
514      *
515      * @param operationName
516      * @return
517      */
518     public Timer getOperationTimer(String operationName){
519         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
520         return metricRegistry.timer(rate);
521     }
522
523     /**
524      * Get the type of the data store to which this ActorContext belongs
525      *
526      * @return
527      */
528     public String getDataStoreType() {
529         return datastoreContext.getDataStoreType();
530     }
531
532     /**
533      * Set the number of transaction creation permits that are to be allowed
534      *
535      * @param permitsPerSecond
536      */
537     public void setTxCreationLimit(double permitsPerSecond){
538         txRateLimiter.setRate(permitsPerSecond);
539     }
540
541     /**
542      * Get the current transaction creation rate limit
543      * @return
544      */
545     public double getTxCreationLimit(){
546         return txRateLimiter.getRate();
547     }
548
549     /**
550      * Try to acquire a transaction creation permit. Will block if no permits are available.
551      */
552     public void acquireTxCreationPermit(){
553         txRateLimiter.acquire();
554     }
555
556     /**
557      * Return the operation timeout to be used when committing transactions
558      * @return
559      */
560     public Timeout getTransactionCommitOperationTimeout(){
561         return transactionCommitOperationTimeout;
562     }
563
564     /**
565      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
566      * code on the datastore
567      * @return
568      */
569     public ExecutionContext getClientDispatcher() {
570         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
571     }
572
573     public String getNotificationDispatcherPath(){
574         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
575     }
576
577     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
578         return ask(actorRef, message, timeout);
579     }
580
581     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
582         return ask(actorRef, message, timeout);
583     }
584
585     @VisibleForTesting
586     Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
587         return primaryShardActorSelectionCache;
588     }
589 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.