Bug 4149: Implement per-shard DatastoreContext settings
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
32 import org.opendaylight.controller.cluster.datastore.config.Configuration;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
48 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56 import scala.concurrent.duration.Duration;
57 import scala.concurrent.duration.FiniteDuration;
58
59 /**
60  * The ActorContext class contains utility methods which could be used by
61  * non-actors (like DistributedDataStore) to work with actors a little more
62  * easily. An ActorContext can be freely passed around to local object instances
63  * but should not be passed to actors especially remote actors
64  */
65 public class ActorContext {
66     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
67     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
68     private static final String METRIC_RATE = "rate";
69     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
70                                                               new Mapper<Throwable, Throwable>() {
71         @Override
72         public Throwable apply(Throwable failure) {
73             Throwable actualFailure = failure;
74             if(failure instanceof AskTimeoutException) {
75                 // A timeout exception most likely means the shard isn't initialized.
76                 actualFailure = new NotInitializedException(
77                         "Timed out trying to find the primary shard. Most likely cause is the " +
78                         "shard is not initialized yet.");
79             }
80
81             return actualFailure;
82         }
83     };
84     public static final String MAILBOX = "bounded-mailbox";
85     public static final String COMMIT = "commit";
86
87     private final ActorSystem actorSystem;
88     private final ActorRef shardManager;
89     private final ClusterWrapper clusterWrapper;
90     private final Configuration configuration;
91     private DatastoreContext datastoreContext;
92     private FiniteDuration operationDuration;
93     private Timeout operationTimeout;
94     private final String selfAddressHostPort;
95     private TransactionRateLimiter txRateLimiter;
96     private Timeout transactionCommitOperationTimeout;
97     private Timeout shardInitializationTimeout;
98     private final Dispatchers dispatchers;
99
100     private volatile SchemaContext schemaContext;
101     private volatile boolean updated;
102     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
103
104     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
105     private final ShardStrategyFactory shardStrategyFactory;
106
107     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
108             ClusterWrapper clusterWrapper, Configuration configuration) {
109         this(actorSystem, shardManager, clusterWrapper, configuration,
110                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
111     }
112
113     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
114             ClusterWrapper clusterWrapper, Configuration configuration,
115             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
116         this.actorSystem = actorSystem;
117         this.shardManager = shardManager;
118         this.clusterWrapper = clusterWrapper;
119         this.configuration = configuration;
120         this.datastoreContext = datastoreContext;
121         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
122         this.primaryShardInfoCache = primaryShardInfoCache;
123         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
124
125         setCachedProperties();
126
127         Address selfAddress = clusterWrapper.getSelfAddress();
128         if (selfAddress != null && !selfAddress.host().isEmpty()) {
129             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
130         } else {
131             selfAddressHostPort = null;
132         }
133
134     }
135
136     private void setCachedProperties() {
137         txRateLimiter = new TransactionRateLimiter(this);
138
139         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
140         operationTimeout = new Timeout(operationDuration);
141
142         transactionCommitOperationTimeout =  new Timeout(Duration.create(
143                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
144
145         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
146     }
147
148     public DatastoreContext getDatastoreContext() {
149         return datastoreContext;
150     }
151
152     public ActorSystem getActorSystem() {
153         return actorSystem;
154     }
155
156     public ActorRef getShardManager() {
157         return shardManager;
158     }
159
160     public ActorSelection actorSelection(String actorPath) {
161         return actorSystem.actorSelection(actorPath);
162     }
163
164     public ActorSelection actorSelection(ActorPath actorPath) {
165         return actorSystem.actorSelection(actorPath);
166     }
167
168     public void setSchemaContext(SchemaContext schemaContext) {
169         this.schemaContext = schemaContext;
170
171         if(shardManager != null) {
172             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
173         }
174     }
175
176     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
177         this.datastoreContext = contextFactory.getBaseDatastoreContext();
178         setCachedProperties();
179
180         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
181         // will be published immediately even though they may not be immediately visible to other
182         // threads due to unsynchronized reads. That's OK though - we're going for eventual
183         // consistency here as immediately visible updates to these members aren't critical. These
184         // members could've been made volatile but wanted to avoid volatile reads as these are
185         // accessed often and updates will be infrequent.
186
187         updated = true;
188
189         if(shardManager != null) {
190             shardManager.tell(contextFactory, ActorRef.noSender());
191         }
192     }
193
194     public SchemaContext getSchemaContext() {
195         return schemaContext;
196     }
197
198     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
199         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
200         if(ret != null){
201             return ret;
202         }
203         Future<Object> future = executeOperationAsync(shardManager,
204                 new FindPrimary(shardName, true), shardInitializationTimeout);
205
206         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
207             @Override
208             public PrimaryShardInfo checkedApply(Object response) throws Exception {
209                 if(response instanceof RemotePrimaryShardFound) {
210                     LOG.debug("findPrimaryShardAsync received: {}", response);
211                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
212                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
213                 } else if(response instanceof LocalPrimaryShardFound) {
214                     LOG.debug("findPrimaryShardAsync received: {}", response);
215                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
216                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
217                             found.getLocalShardDataTree());
218                 } else if(response instanceof NotInitializedException) {
219                     throw (NotInitializedException)response;
220                 } else if(response instanceof PrimaryNotFoundException) {
221                     throw (PrimaryNotFoundException)response;
222                 } else if(response instanceof NoShardLeaderException) {
223                     throw (NoShardLeaderException)response;
224                 }
225
226                 throw new UnknownMessageException(String.format(
227                         "FindPrimary returned unkown response: %s", response));
228             }
229         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
230     }
231
232     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
233             short primaryVersion, DataTree localShardDataTree) {
234         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
235         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
236                 Optional.fromNullable(localShardDataTree));
237         primaryShardInfoCache.putSuccessful(shardName, info);
238         return info;
239     }
240
241     /**
242      * Finds a local shard given its shard name and return it's ActorRef
243      *
244      * @param shardName the name of the local shard that needs to be found
245      * @return a reference to a local shard actor which represents the shard
246      *         specified by the shardName
247      */
248     public Optional<ActorRef> findLocalShard(String shardName) {
249         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
250
251         if (result instanceof LocalShardFound) {
252             LocalShardFound found = (LocalShardFound) result;
253             LOG.debug("Local shard found {}", found.getPath());
254             return Optional.of(found.getPath());
255         }
256
257         return Optional.absent();
258     }
259
260     /**
261      * Finds a local shard async given its shard name and return a Future from which to obtain the
262      * ActorRef.
263      *
264      * @param shardName the name of the local shard that needs to be found
265      */
266     public Future<ActorRef> findLocalShardAsync( final String shardName) {
267         Future<Object> future = executeOperationAsync(shardManager,
268                 new FindLocalShard(shardName, true), shardInitializationTimeout);
269
270         return future.map(new Mapper<Object, ActorRef>() {
271             @Override
272             public ActorRef checkedApply(Object response) throws Throwable {
273                 if(response instanceof LocalShardFound) {
274                     LocalShardFound found = (LocalShardFound)response;
275                     LOG.debug("Local shard found {}", found.getPath());
276                     return found.getPath();
277                 } else if(response instanceof NotInitializedException) {
278                     throw (NotInitializedException)response;
279                 } else if(response instanceof LocalShardNotFound) {
280                     throw new LocalShardNotFoundException(
281                             String.format("Local shard for %s does not exist.", shardName));
282                 }
283
284                 throw new UnknownMessageException(String.format(
285                         "FindLocalShard returned unkown response: %s", response));
286             }
287         }, getClientDispatcher());
288     }
289
290     /**
291      * Executes an operation on a local actor and wait for it's response
292      *
293      * @param actor
294      * @param message
295      * @return The response of the operation
296      */
297     public Object executeOperation(ActorRef actor, Object message) {
298         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
299
300         try {
301             return Await.result(future, operationDuration);
302         } catch (Exception e) {
303             throw new TimeoutException("Sending message " + message.getClass().toString() +
304                     " to actor " + actor.toString() + " failed. Try again later.", e);
305         }
306     }
307
308     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
309         Preconditions.checkArgument(actor != null, "actor must not be null");
310         Preconditions.checkArgument(message != null, "message must not be null");
311
312         LOG.debug("Sending message {} to {}", message.getClass(), actor);
313         return doAsk(actor, message, timeout);
314     }
315
316     /**
317      * Execute an operation on a remote actor and wait for it's response
318      *
319      * @param actor
320      * @param message
321      * @return
322      */
323     public Object executeOperation(ActorSelection actor, Object message) {
324         Future<Object> future = executeOperationAsync(actor, message);
325
326         try {
327             return Await.result(future, operationDuration);
328         } catch (Exception e) {
329             throw new TimeoutException("Sending message " + message.getClass().toString() +
330                     " to actor " + actor.toString() + " failed. Try again later.", e);
331         }
332     }
333
334     /**
335      * Execute an operation on a remote actor asynchronously.
336      *
337      * @param actor the ActorSelection
338      * @param message the message to send
339      * @param timeout the operation timeout
340      * @return a Future containing the eventual result
341      */
342     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
343             Timeout timeout) {
344         Preconditions.checkArgument(actor != null, "actor must not be null");
345         Preconditions.checkArgument(message != null, "message must not be null");
346
347         LOG.debug("Sending message {} to {}", message.getClass(), actor);
348
349         return doAsk(actor, message, timeout);
350     }
351
352     /**
353      * Execute an operation on a remote actor asynchronously.
354      *
355      * @param actor the ActorSelection
356      * @param message the message to send
357      * @return a Future containing the eventual result
358      */
359     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
360         return executeOperationAsync(actor, message, operationTimeout);
361     }
362
363     /**
364      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
365      * reply (essentially set and forget).
366      *
367      * @param actor the ActorSelection
368      * @param message the message to send
369      */
370     public void sendOperationAsync(ActorSelection actor, Object message) {
371         Preconditions.checkArgument(actor != null, "actor must not be null");
372         Preconditions.checkArgument(message != null, "message must not be null");
373
374         LOG.debug("Sending message {} to {}", message.getClass(), actor);
375
376         actor.tell(message, ActorRef.noSender());
377     }
378
379     public void shutdown() {
380         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
381     }
382
383     public ClusterWrapper getClusterWrapper() {
384         return clusterWrapper;
385     }
386
387     public String getCurrentMemberName(){
388         return clusterWrapper.getCurrentMemberName();
389     }
390
391     /**
392      * Send the message to each and every shard
393      *
394      * @param message
395      */
396     public void broadcast(final Object message){
397         for(final String shardName : configuration.getAllShardNames()){
398
399             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
400             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
401                 @Override
402                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
403                     if(failure != null) {
404                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
405                                 message.getClass().getSimpleName(), shardName, failure);
406                     } else {
407                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
408                     }
409                 }
410             }, getClientDispatcher());
411         }
412     }
413
414     public FiniteDuration getOperationDuration() {
415         return operationDuration;
416     }
417
418     public Timeout getOperationTimeout() {
419         return operationTimeout;
420     }
421
422     public boolean isPathLocal(String path) {
423         if (Strings.isNullOrEmpty(path)) {
424             return false;
425         }
426
427         int pathAtIndex = path.indexOf('@');
428         if (pathAtIndex == -1) {
429             //if the path is of local format, then its local and is co-located
430             return true;
431
432         } else if (selfAddressHostPort != null) {
433             // self-address and tx actor path, both are of remote path format
434             int slashIndex = path.indexOf('/', pathAtIndex);
435
436             if (slashIndex == -1) {
437                 return false;
438             }
439
440             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
441             return hostPort.equals(selfAddressHostPort);
442
443         } else {
444             // self address is local format and tx actor path is remote format
445             return false;
446         }
447     }
448
449     /**
450      * @deprecated This method is present only to support backward compatibility with Helium and should not be
451      * used any further
452      *
453      *
454      * @param primaryPath
455      * @param localPathOfRemoteActor
456      * @return
457     */
458     @Deprecated
459     public String resolvePath(final String primaryPath,
460                                             final String localPathOfRemoteActor) {
461         StringBuilder builder = new StringBuilder();
462         String[] primaryPathElements = primaryPath.split("/");
463         builder.append(primaryPathElements[0]).append("//")
464             .append(primaryPathElements[1]).append(primaryPathElements[2]);
465         String[] remotePathElements = localPathOfRemoteActor.split("/");
466         for (int i = 3; i < remotePathElements.length; i++) {
467                 builder.append("/").append(remotePathElements[i]);
468             }
469
470         return builder.toString();
471     }
472
473     /**
474      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
475      * us to create a timer for pretty much anything.
476      *
477      * @param operationName
478      * @return
479      */
480     public Timer getOperationTimer(String operationName){
481         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
482     }
483
484     public Timer getOperationTimer(String dataStoreType, String operationName){
485         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
486                 operationName, METRIC_RATE);
487         return metricRegistry.timer(rate);
488     }
489
490     /**
491      * Get the type of the data store to which this ActorContext belongs
492      *
493      * @return
494      */
495     public String getDataStoreType() {
496         return datastoreContext.getDataStoreType();
497     }
498
499     /**
500      * Get the current transaction creation rate limit
501      * @return
502      */
503     public double getTxCreationLimit(){
504         return txRateLimiter.getTxCreationLimit();
505     }
506
507     /**
508      * Try to acquire a transaction creation permit. Will block if no permits are available.
509      */
510     public void acquireTxCreationPermit(){
511         txRateLimiter.acquire();
512     }
513
514     /**
515      * Return the operation timeout to be used when committing transactions
516      * @return
517      */
518     public Timeout getTransactionCommitOperationTimeout(){
519         return transactionCommitOperationTimeout;
520     }
521
522     /**
523      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
524      * code on the datastore
525      * @return
526      */
527     public ExecutionContext getClientDispatcher() {
528         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
529     }
530
531     public String getNotificationDispatcherPath(){
532         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
533     }
534
535     public Configuration getConfiguration() {
536         return configuration;
537     }
538
539     public ShardStrategyFactory getShardStrategyFactory() {
540         return shardStrategyFactory;
541     }
542
543     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
544         return ask(actorRef, message, timeout);
545     }
546
547     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
548         return ask(actorRef, message, timeout);
549     }
550
551     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
552         return primaryShardInfoCache;
553     }
554 }