Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
29 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.Configuration;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
32 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
33 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
44 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.ExecutionContext;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
56
57 /**
58  * The ActorContext class contains utility methods which could be used by
59  * non-actors (like DistributedDataStore) to work with actors a little more
60  * easily. An ActorContext can be freely passed around to local object instances
61  * but should not be passed to actors especially remote actors
62  */
63 public class ActorContext {
64     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
65     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
66     private static final String METRIC_RATE = "rate";
67     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
68                                                               new Mapper<Throwable, Throwable>() {
69         @Override
70         public Throwable apply(Throwable failure) {
71             Throwable actualFailure = failure;
72             if(failure instanceof AskTimeoutException) {
73                 // A timeout exception most likely means the shard isn't initialized.
74                 actualFailure = new NotInitializedException(
75                         "Timed out trying to find the primary shard. Most likely cause is the " +
76                         "shard is not initialized yet.");
77             }
78
79             return actualFailure;
80         }
81     };
82     public static final String MAILBOX = "bounded-mailbox";
83     public static final String COMMIT = "commit";
84
85     private final ActorSystem actorSystem;
86     private final ActorRef shardManager;
87     private final ClusterWrapper clusterWrapper;
88     private final Configuration configuration;
89     private DatastoreContext datastoreContext;
90     private FiniteDuration operationDuration;
91     private Timeout operationTimeout;
92     private final String selfAddressHostPort;
93     private TransactionRateLimiter txRateLimiter;
94     private final int transactionOutstandingOperationLimit;
95     private Timeout transactionCommitOperationTimeout;
96     private Timeout shardInitializationTimeout;
97     private final Dispatchers dispatchers;
98
99     private volatile SchemaContext schemaContext;
100     private volatile boolean updated;
101     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
102
103     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
104
105     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
106             ClusterWrapper clusterWrapper, Configuration configuration) {
107         this(actorSystem, shardManager, clusterWrapper, configuration,
108                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
109     }
110
111     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
112             ClusterWrapper clusterWrapper, Configuration configuration,
113             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
114         this.actorSystem = actorSystem;
115         this.shardManager = shardManager;
116         this.clusterWrapper = clusterWrapper;
117         this.configuration = configuration;
118         this.datastoreContext = datastoreContext;
119         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
120         this.primaryShardInfoCache = primaryShardInfoCache;
121
122         setCachedProperties();
123
124         Address selfAddress = clusterWrapper.getSelfAddress();
125         if (selfAddress != null && !selfAddress.host().isEmpty()) {
126             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
127         } else {
128             selfAddressHostPort = null;
129         }
130
131         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
132     }
133
134     private void setCachedProperties() {
135         txRateLimiter = new TransactionRateLimiter(this);
136
137         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
138         operationTimeout = new Timeout(operationDuration);
139
140         transactionCommitOperationTimeout =  new Timeout(Duration.create(
141                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
142
143         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
144     }
145
146     public DatastoreContext getDatastoreContext() {
147         return datastoreContext;
148     }
149
150     public ActorSystem getActorSystem() {
151         return actorSystem;
152     }
153
154     public ActorRef getShardManager() {
155         return shardManager;
156     }
157
158     public ActorSelection actorSelection(String actorPath) {
159         return actorSystem.actorSelection(actorPath);
160     }
161
162     public ActorSelection actorSelection(ActorPath actorPath) {
163         return actorSystem.actorSelection(actorPath);
164     }
165
166     public void setSchemaContext(SchemaContext schemaContext) {
167         this.schemaContext = schemaContext;
168
169         if(shardManager != null) {
170             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
171         }
172     }
173
174     public void setDatastoreContext(DatastoreContext context) {
175         this.datastoreContext = context;
176         setCachedProperties();
177
178         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
179         // will be published immediately even though they may not be immediately visible to other
180         // threads due to unsynchronized reads. That's OK though - we're going for eventual
181         // consistency here as immediately visible updates to these members aren't critical. These
182         // members could've been made volatile but wanted to avoid volatile reads as these are
183         // accessed often and updates will be infrequent.
184
185         updated = true;
186
187         if(shardManager != null) {
188             shardManager.tell(context, ActorRef.noSender());
189         }
190     }
191
192     public SchemaContext getSchemaContext() {
193         return schemaContext;
194     }
195
196     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
197         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
198         if(ret != null){
199             return ret;
200         }
201         Future<Object> future = executeOperationAsync(shardManager,
202                 new FindPrimary(shardName, true), shardInitializationTimeout);
203
204         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
205             @Override
206             public PrimaryShardInfo checkedApply(Object response) throws Exception {
207                 if(response instanceof RemotePrimaryShardFound) {
208                     LOG.debug("findPrimaryShardAsync received: {}", response);
209                     return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
210                 } else if(response instanceof LocalPrimaryShardFound) {
211                     LOG.debug("findPrimaryShardAsync received: {}", response);
212                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
213                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
214                 } else if(response instanceof NotInitializedException) {
215                     throw (NotInitializedException)response;
216                 } else if(response instanceof PrimaryNotFoundException) {
217                     throw (PrimaryNotFoundException)response;
218                 } else if(response instanceof NoShardLeaderException) {
219                     throw (NoShardLeaderException)response;
220                 }
221
222                 throw new UnknownMessageException(String.format(
223                         "FindPrimary returned unkown response: %s", response));
224             }
225         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
226     }
227
228     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
229             DataTree localShardDataTree) {
230         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
231         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
232         primaryShardInfoCache.putSuccessful(shardName, info);
233         return info;
234     }
235
236     /**
237      * Finds a local shard given its shard name and return it's ActorRef
238      *
239      * @param shardName the name of the local shard that needs to be found
240      * @return a reference to a local shard actor which represents the shard
241      *         specified by the shardName
242      */
243     public Optional<ActorRef> findLocalShard(String shardName) {
244         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
245
246         if (result instanceof LocalShardFound) {
247             LocalShardFound found = (LocalShardFound) result;
248             LOG.debug("Local shard found {}", found.getPath());
249             return Optional.of(found.getPath());
250         }
251
252         return Optional.absent();
253     }
254
255     /**
256      * Finds a local shard async given its shard name and return a Future from which to obtain the
257      * ActorRef.
258      *
259      * @param shardName the name of the local shard that needs to be found
260      */
261     public Future<ActorRef> findLocalShardAsync( final String shardName) {
262         Future<Object> future = executeOperationAsync(shardManager,
263                 new FindLocalShard(shardName, true), shardInitializationTimeout);
264
265         return future.map(new Mapper<Object, ActorRef>() {
266             @Override
267             public ActorRef checkedApply(Object response) throws Throwable {
268                 if(response instanceof LocalShardFound) {
269                     LocalShardFound found = (LocalShardFound)response;
270                     LOG.debug("Local shard found {}", found.getPath());
271                     return found.getPath();
272                 } else if(response instanceof NotInitializedException) {
273                     throw (NotInitializedException)response;
274                 } else if(response instanceof LocalShardNotFound) {
275                     throw new LocalShardNotFoundException(
276                             String.format("Local shard for %s does not exist.", shardName));
277                 }
278
279                 throw new UnknownMessageException(String.format(
280                         "FindLocalShard returned unkown response: %s", response));
281             }
282         }, getClientDispatcher());
283     }
284
285     /**
286      * Executes an operation on a local actor and wait for it's response
287      *
288      * @param actor
289      * @param message
290      * @return The response of the operation
291      */
292     public Object executeOperation(ActorRef actor, Object message) {
293         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
294
295         try {
296             return Await.result(future, operationDuration);
297         } catch (Exception e) {
298             throw new TimeoutException("Sending message " + message.getClass().toString() +
299                     " to actor " + actor.toString() + " failed. Try again later.", e);
300         }
301     }
302
303     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
304         Preconditions.checkArgument(actor != null, "actor must not be null");
305         Preconditions.checkArgument(message != null, "message must not be null");
306
307         LOG.debug("Sending message {} to {}", message.getClass(), actor);
308         return doAsk(actor, message, timeout);
309     }
310
311     /**
312      * Execute an operation on a remote actor and wait for it's response
313      *
314      * @param actor
315      * @param message
316      * @return
317      */
318     public Object executeOperation(ActorSelection actor, Object message) {
319         Future<Object> future = executeOperationAsync(actor, message);
320
321         try {
322             return Await.result(future, operationDuration);
323         } catch (Exception e) {
324             throw new TimeoutException("Sending message " + message.getClass().toString() +
325                     " to actor " + actor.toString() + " failed. Try again later.", e);
326         }
327     }
328
329     /**
330      * Execute an operation on a remote actor asynchronously.
331      *
332      * @param actor the ActorSelection
333      * @param message the message to send
334      * @param timeout the operation timeout
335      * @return a Future containing the eventual result
336      */
337     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
338             Timeout timeout) {
339         Preconditions.checkArgument(actor != null, "actor must not be null");
340         Preconditions.checkArgument(message != null, "message must not be null");
341
342         LOG.debug("Sending message {} to {}", message.getClass(), actor);
343
344         return doAsk(actor, message, timeout);
345     }
346
347     /**
348      * Execute an operation on a remote actor asynchronously.
349      *
350      * @param actor the ActorSelection
351      * @param message the message to send
352      * @return a Future containing the eventual result
353      */
354     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
355         return executeOperationAsync(actor, message, operationTimeout);
356     }
357
358     /**
359      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
360      * reply (essentially set and forget).
361      *
362      * @param actor the ActorSelection
363      * @param message the message to send
364      */
365     public void sendOperationAsync(ActorSelection actor, Object message) {
366         Preconditions.checkArgument(actor != null, "actor must not be null");
367         Preconditions.checkArgument(message != null, "message must not be null");
368
369         LOG.debug("Sending message {} to {}", message.getClass(), actor);
370
371         actor.tell(message, ActorRef.noSender());
372     }
373
374     public void shutdown() {
375         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
376     }
377
378     public ClusterWrapper getClusterWrapper() {
379         return clusterWrapper;
380     }
381
382     public String getCurrentMemberName(){
383         return clusterWrapper.getCurrentMemberName();
384     }
385
386     /**
387      * Send the message to each and every shard
388      *
389      * @param message
390      */
391     public void broadcast(final Object message){
392         for(final String shardName : configuration.getAllShardNames()){
393
394             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
395             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
396                 @Override
397                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
398                     if(failure != null) {
399                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
400                                 message.getClass().getSimpleName(), shardName, failure);
401                     } else {
402                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
403                     }
404                 }
405             }, getClientDispatcher());
406         }
407     }
408
409     public FiniteDuration getOperationDuration() {
410         return operationDuration;
411     }
412
413     public boolean isPathLocal(String path) {
414         if (Strings.isNullOrEmpty(path)) {
415             return false;
416         }
417
418         int pathAtIndex = path.indexOf('@');
419         if (pathAtIndex == -1) {
420             //if the path is of local format, then its local and is co-located
421             return true;
422
423         } else if (selfAddressHostPort != null) {
424             // self-address and tx actor path, both are of remote path format
425             int slashIndex = path.indexOf('/', pathAtIndex);
426
427             if (slashIndex == -1) {
428                 return false;
429             }
430
431             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
432             return hostPort.equals(selfAddressHostPort);
433
434         } else {
435             // self address is local format and tx actor path is remote format
436             return false;
437         }
438     }
439
440     /**
441      * @deprecated This method is present only to support backward compatibility with Helium and should not be
442      * used any further
443      *
444      *
445      * @param primaryPath
446      * @param localPathOfRemoteActor
447      * @return
448     */
449     @Deprecated
450     public String resolvePath(final String primaryPath,
451                                             final String localPathOfRemoteActor) {
452         StringBuilder builder = new StringBuilder();
453         String[] primaryPathElements = primaryPath.split("/");
454         builder.append(primaryPathElements[0]).append("//")
455             .append(primaryPathElements[1]).append(primaryPathElements[2]);
456         String[] remotePathElements = localPathOfRemoteActor.split("/");
457         for (int i = 3; i < remotePathElements.length; i++) {
458                 builder.append("/").append(remotePathElements[i]);
459             }
460
461         return builder.toString();
462     }
463
464     /**
465      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
466      * should begin throttling the operations
467      *
468      * Parking reading this configuration here because we need to get to the actor system settings
469      *
470      * @return
471      */
472     public int getTransactionOutstandingOperationLimit(){
473         return transactionOutstandingOperationLimit;
474     }
475
476     /**
477      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
478      * us to create a timer for pretty much anything.
479      *
480      * @param operationName
481      * @return
482      */
483     public Timer getOperationTimer(String operationName){
484         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
485     }
486
487     public Timer getOperationTimer(String dataStoreType, String operationName){
488         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
489                 operationName, METRIC_RATE);
490         return metricRegistry.timer(rate);
491     }
492
493     /**
494      * Get the type of the data store to which this ActorContext belongs
495      *
496      * @return
497      */
498     public String getDataStoreType() {
499         return datastoreContext.getDataStoreType();
500     }
501
502     /**
503      * Get the current transaction creation rate limit
504      * @return
505      */
506     public double getTxCreationLimit(){
507         return txRateLimiter.getTxCreationLimit();
508     }
509
510     /**
511      * Try to acquire a transaction creation permit. Will block if no permits are available.
512      */
513     public void acquireTxCreationPermit(){
514         txRateLimiter.acquire();
515     }
516
517     /**
518      * Return the operation timeout to be used when committing transactions
519      * @return
520      */
521     public Timeout getTransactionCommitOperationTimeout(){
522         return transactionCommitOperationTimeout;
523     }
524
525     /**
526      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
527      * code on the datastore
528      * @return
529      */
530     public ExecutionContext getClientDispatcher() {
531         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
532     }
533
534     public String getNotificationDispatcherPath(){
535         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
536     }
537
538     public Configuration getConfiguration() {
539         return configuration;
540     }
541
542     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
543         return ask(actorRef, message, timeout);
544     }
545
546     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
547         return ask(actorRef, message, timeout);
548     }
549
550     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
551         return primaryShardInfoCache;
552     }
553 }