Merge "Create transaction on the backend datastore only when neccessary"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Strings;
29 import com.google.common.cache.Cache;
30 import com.google.common.cache.CacheBuilder;
31 import com.google.common.util.concurrent.RateLimiter;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.concurrent.Await;
56 import scala.concurrent.ExecutionContext;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60
61 /**
62  * The ActorContext class contains utility methods which could be used by
63  * non-actors (like DistributedDataStore) to work with actors a little more
64  * easily. An ActorContext can be freely passed around to local object instances
65  * but should not be passed to actors especially remote actors
66  */
67 public class ActorContext {
68     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
69     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
70     private static final String METRIC_RATE = "rate";
71     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
72                                                               new Mapper<Throwable, Throwable>() {
73         @Override
74         public Throwable apply(Throwable failure) {
75             Throwable actualFailure = failure;
76             if(failure instanceof AskTimeoutException) {
77                 // A timeout exception most likely means the shard isn't initialized.
78                 actualFailure = new NotInitializedException(
79                         "Timed out trying to find the primary shard. Most likely cause is the " +
80                         "shard is not initialized yet.");
81             }
82
83             return actualFailure;
84         }
85     };
86     public static final String MAILBOX = "bounded-mailbox";
87
88     private final ActorSystem actorSystem;
89     private final ActorRef shardManager;
90     private final ClusterWrapper clusterWrapper;
91     private final Configuration configuration;
92     private DatastoreContext datastoreContext;
93     private FiniteDuration operationDuration;
94     private Timeout operationTimeout;
95     private final String selfAddressHostPort;
96     private RateLimiter txRateLimiter;
97     private final int transactionOutstandingOperationLimit;
98     private Timeout transactionCommitOperationTimeout;
99     private Timeout shardInitializationTimeout;
100     private final Dispatchers dispatchers;
101     private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
102
103     private volatile SchemaContext schemaContext;
104     private volatile boolean updated;
105     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
106
107     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
108             ClusterWrapper clusterWrapper, Configuration configuration) {
109         this(actorSystem, shardManager, clusterWrapper, configuration,
110                 DatastoreContext.newBuilder().build());
111     }
112
113     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
114             ClusterWrapper clusterWrapper, Configuration configuration,
115             DatastoreContext datastoreContext) {
116         this.actorSystem = actorSystem;
117         this.shardManager = shardManager;
118         this.clusterWrapper = clusterWrapper;
119         this.configuration = configuration;
120         this.datastoreContext = datastoreContext;
121         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
122
123         setCachedProperties();
124
125         Address selfAddress = clusterWrapper.getSelfAddress();
126         if (selfAddress != null && !selfAddress.host().isEmpty()) {
127             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
128         } else {
129             selfAddressHostPort = null;
130         }
131
132         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
133     }
134
135     private void setCachedProperties() {
136         txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
137
138         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
139         operationTimeout = new Timeout(operationDuration);
140
141         transactionCommitOperationTimeout =  new Timeout(Duration.create(
142                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
143
144         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
145
146         primaryShardInfoCache = CacheBuilder.newBuilder()
147                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
148                 .build();
149     }
150
151     public DatastoreContext getDatastoreContext() {
152         return datastoreContext;
153     }
154
155     public ActorSystem getActorSystem() {
156         return actorSystem;
157     }
158
159     public ActorRef getShardManager() {
160         return shardManager;
161     }
162
163     public ActorSelection actorSelection(String actorPath) {
164         return actorSystem.actorSelection(actorPath);
165     }
166
167     public ActorSelection actorSelection(ActorPath actorPath) {
168         return actorSystem.actorSelection(actorPath);
169     }
170
171     public void setSchemaContext(SchemaContext schemaContext) {
172         this.schemaContext = schemaContext;
173
174         if(shardManager != null) {
175             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
176         }
177     }
178
179     public void setDatastoreContext(DatastoreContext context) {
180         this.datastoreContext = context;
181         setCachedProperties();
182
183         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
184         // will be published immediately even though they may not be immediately visible to other
185         // threads due to unsynchronized reads. That's OK though - we're going for eventual
186         // consistency here as immediately visible updates to these members aren't critical. These
187         // members could've been made volatile but wanted to avoid volatile reads as these are
188         // accessed often and updates will be infrequent.
189
190         updated = true;
191
192         if(shardManager != null) {
193             shardManager.tell(context, ActorRef.noSender());
194         }
195     }
196
197     public SchemaContext getSchemaContext() {
198         return schemaContext;
199     }
200
201     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
202         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
203         if(ret != null){
204             return ret;
205         }
206         Future<Object> future = executeOperationAsync(shardManager,
207                 new FindPrimary(shardName, true), shardInitializationTimeout);
208
209         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
210             @Override
211             public PrimaryShardInfo checkedApply(Object response) throws Exception {
212                 if(response instanceof PrimaryFound) {
213                     PrimaryFound found = (PrimaryFound)response;
214
215                     LOG.debug("Primary found {}", found.getPrimaryPath());
216                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
217                     PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
218                     primaryShardInfoCache.put(shardName, Futures.successful(info));
219                     return info;
220                 } else if(response instanceof NotInitializedException) {
221                     throw (NotInitializedException)response;
222                 } else if(response instanceof PrimaryNotFoundException) {
223                     throw (PrimaryNotFoundException)response;
224                 } else if(response instanceof NoShardLeaderException) {
225                     throw (NoShardLeaderException)response;
226                 }
227
228                 throw new UnknownMessageException(String.format(
229                         "FindPrimary returned unkown response: %s", response));
230             }
231         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
232     }
233
234     /**
235      * Finds a local shard given its shard name and return it's ActorRef
236      *
237      * @param shardName the name of the local shard that needs to be found
238      * @return a reference to a local shard actor which represents the shard
239      *         specified by the shardName
240      */
241     public Optional<ActorRef> findLocalShard(String shardName) {
242         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
243
244         if (result instanceof LocalShardFound) {
245             LocalShardFound found = (LocalShardFound) result;
246             LOG.debug("Local shard found {}", found.getPath());
247             return Optional.of(found.getPath());
248         }
249
250         return Optional.absent();
251     }
252
253     /**
254      * Finds a local shard async given its shard name and return a Future from which to obtain the
255      * ActorRef.
256      *
257      * @param shardName the name of the local shard that needs to be found
258      */
259     public Future<ActorRef> findLocalShardAsync( final String shardName) {
260         Future<Object> future = executeOperationAsync(shardManager,
261                 new FindLocalShard(shardName, true), shardInitializationTimeout);
262
263         return future.map(new Mapper<Object, ActorRef>() {
264             @Override
265             public ActorRef checkedApply(Object response) throws Throwable {
266                 if(response instanceof LocalShardFound) {
267                     LocalShardFound found = (LocalShardFound)response;
268                     LOG.debug("Local shard found {}", found.getPath());
269                     return found.getPath();
270                 } else if(response instanceof NotInitializedException) {
271                     throw (NotInitializedException)response;
272                 } else if(response instanceof LocalShardNotFound) {
273                     throw new LocalShardNotFoundException(
274                             String.format("Local shard for %s does not exist.", shardName));
275                 }
276
277                 throw new UnknownMessageException(String.format(
278                         "FindLocalShard returned unkown response: %s", response));
279             }
280         }, getClientDispatcher());
281     }
282
283     /**
284      * Executes an operation on a local actor and wait for it's response
285      *
286      * @param actor
287      * @param message
288      * @return The response of the operation
289      */
290     public Object executeOperation(ActorRef actor, Object message) {
291         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
292
293         try {
294             return Await.result(future, operationDuration);
295         } catch (Exception e) {
296             throw new TimeoutException("Sending message " + message.getClass().toString() +
297                     " to actor " + actor.toString() + " failed. Try again later.", e);
298         }
299     }
300
301     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
302         Preconditions.checkArgument(actor != null, "actor must not be null");
303         Preconditions.checkArgument(message != null, "message must not be null");
304
305         LOG.debug("Sending message {} to {}", message.getClass(), actor);
306         return doAsk(actor, message, timeout);
307     }
308
309     /**
310      * Execute an operation on a remote actor and wait for it's response
311      *
312      * @param actor
313      * @param message
314      * @return
315      */
316     public Object executeOperation(ActorSelection actor, Object message) {
317         Future<Object> future = executeOperationAsync(actor, message);
318
319         try {
320             return Await.result(future, operationDuration);
321         } catch (Exception e) {
322             throw new TimeoutException("Sending message " + message.getClass().toString() +
323                     " to actor " + actor.toString() + " failed. Try again later.", e);
324         }
325     }
326
327     /**
328      * Execute an operation on a remote actor asynchronously.
329      *
330      * @param actor the ActorSelection
331      * @param message the message to send
332      * @param timeout the operation timeout
333      * @return a Future containing the eventual result
334      */
335     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
336             Timeout timeout) {
337         Preconditions.checkArgument(actor != null, "actor must not be null");
338         Preconditions.checkArgument(message != null, "message must not be null");
339
340         LOG.debug("Sending message {} to {}", message.getClass(), actor);
341
342         return doAsk(actor, message, timeout);
343     }
344
345     /**
346      * Execute an operation on a remote actor asynchronously.
347      *
348      * @param actor the ActorSelection
349      * @param message the message to send
350      * @return a Future containing the eventual result
351      */
352     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
353         return executeOperationAsync(actor, message, operationTimeout);
354     }
355
356     /**
357      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
358      * reply (essentially set and forget).
359      *
360      * @param actor the ActorSelection
361      * @param message the message to send
362      */
363     public void sendOperationAsync(ActorSelection actor, Object message) {
364         Preconditions.checkArgument(actor != null, "actor must not be null");
365         Preconditions.checkArgument(message != null, "message must not be null");
366
367         LOG.debug("Sending message {} to {}", message.getClass(), actor);
368
369         actor.tell(message, ActorRef.noSender());
370     }
371
372     public void shutdown() {
373         shardManager.tell(PoisonPill.getInstance(), null);
374         actorSystem.shutdown();
375     }
376
377     public ClusterWrapper getClusterWrapper() {
378         return clusterWrapper;
379     }
380
381     public String getCurrentMemberName(){
382         return clusterWrapper.getCurrentMemberName();
383     }
384
385     /**
386      * Send the message to each and every shard
387      *
388      * @param message
389      */
390     public void broadcast(final Object message){
391         for(final String shardName : configuration.getAllShardNames()){
392
393             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
394             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
395                 @Override
396                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
397                     if(failure != null) {
398                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
399                                 message.getClass().getSimpleName(), shardName, failure);
400                     } else {
401                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
402                     }
403                 }
404             }, getClientDispatcher());
405         }
406     }
407
408     public FiniteDuration getOperationDuration() {
409         return operationDuration;
410     }
411
412     public boolean isPathLocal(String path) {
413         if (Strings.isNullOrEmpty(path)) {
414             return false;
415         }
416
417         int pathAtIndex = path.indexOf('@');
418         if (pathAtIndex == -1) {
419             //if the path is of local format, then its local and is co-located
420             return true;
421
422         } else if (selfAddressHostPort != null) {
423             // self-address and tx actor path, both are of remote path format
424             int slashIndex = path.indexOf('/', pathAtIndex);
425
426             if (slashIndex == -1) {
427                 return false;
428             }
429
430             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
431             return hostPort.equals(selfAddressHostPort);
432
433         } else {
434             // self address is local format and tx actor path is remote format
435             return false;
436         }
437     }
438
439     /**
440      * @deprecated This method is present only to support backward compatibility with Helium and should not be
441      * used any further
442      *
443      *
444      * @param primaryPath
445      * @param localPathOfRemoteActor
446      * @return
447     */
448     @Deprecated
449     public String resolvePath(final String primaryPath,
450                                             final String localPathOfRemoteActor) {
451         StringBuilder builder = new StringBuilder();
452         String[] primaryPathElements = primaryPath.split("/");
453         builder.append(primaryPathElements[0]).append("//")
454             .append(primaryPathElements[1]).append(primaryPathElements[2]);
455         String[] remotePathElements = localPathOfRemoteActor.split("/");
456         for (int i = 3; i < remotePathElements.length; i++) {
457                 builder.append("/").append(remotePathElements[i]);
458             }
459
460         return builder.toString();
461     }
462
463     /**
464      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
465      * should begin throttling the operations
466      *
467      * Parking reading this configuration here because we need to get to the actor system settings
468      *
469      * @return
470      */
471     public int getTransactionOutstandingOperationLimit(){
472         return transactionOutstandingOperationLimit;
473     }
474
475     /**
476      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
477      * us to create a timer for pretty much anything.
478      *
479      * @param operationName
480      * @return
481      */
482     public Timer getOperationTimer(String operationName){
483         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
484     }
485
486     public Timer getOperationTimer(String dataStoreType, String operationName){
487         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
488                 operationName, METRIC_RATE);
489         return metricRegistry.timer(rate);
490     }
491
492     /**
493      * Get the type of the data store to which this ActorContext belongs
494      *
495      * @return
496      */
497     public String getDataStoreType() {
498         return datastoreContext.getDataStoreType();
499     }
500
501     /**
502      * Set the number of transaction creation permits that are to be allowed
503      *
504      * @param permitsPerSecond
505      */
506     public void setTxCreationLimit(double permitsPerSecond){
507         txRateLimiter.setRate(permitsPerSecond);
508     }
509
510     /**
511      * Get the current transaction creation rate limit
512      * @return
513      */
514     public double getTxCreationLimit(){
515         return txRateLimiter.getRate();
516     }
517
518     /**
519      * Try to acquire a transaction creation permit. Will block if no permits are available.
520      */
521     public void acquireTxCreationPermit(){
522         txRateLimiter.acquire();
523     }
524
525     /**
526      * Return the operation timeout to be used when committing transactions
527      * @return
528      */
529     public Timeout getTransactionCommitOperationTimeout(){
530         return transactionCommitOperationTimeout;
531     }
532
533     /**
534      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
535      * code on the datastore
536      * @return
537      */
538     public ExecutionContext getClientDispatcher() {
539         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
540     }
541
542     public String getNotificationDispatcherPath(){
543         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
544     }
545
546     public Configuration getConfiguration() {
547         return configuration;
548     }
549
550     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
551         return ask(actorRef, message, timeout);
552     }
553
554     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
555         return ask(actorRef, message, timeout);
556     }
557
558     @VisibleForTesting
559     Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
560         return primaryShardInfoCache;
561     }
562 }