BUG-5903: do not rely on primary info on failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.access.concepts.MemberName;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
50 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
51 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.Await;
57 import scala.concurrent.ExecutionContext;
58 import scala.concurrent.Future;
59 import scala.concurrent.duration.Duration;
60 import scala.concurrent.duration.FiniteDuration;
61
62 /**
63  * The ActorContext class contains utility methods which could be used by
64  * non-actors (like DistributedDataStore) to work with actors a little more
65  * easily. An ActorContext can be freely passed around to local object instances
66  * but should not be passed to actors especially remote actors
67  */
68 public class ActorContext {
69     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
70     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71     private static final String METRIC_RATE = "rate";
72     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
73                                                               new Mapper<Throwable, Throwable>() {
74         @Override
75         public Throwable apply(Throwable failure) {
76             Throwable actualFailure = failure;
77             if(failure instanceof AskTimeoutException) {
78                 // A timeout exception most likely means the shard isn't initialized.
79                 actualFailure = new NotInitializedException(
80                         "Timed out trying to find the primary shard. Most likely cause is the " +
81                         "shard is not initialized yet.");
82             }
83
84             return actualFailure;
85         }
86     };
87     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
88     public static final String COMMIT = "commit";
89
90     private final ActorSystem actorSystem;
91     private final ActorRef shardManager;
92     private final ClusterWrapper clusterWrapper;
93     private final Configuration configuration;
94     private DatastoreContext datastoreContext;
95     private FiniteDuration operationDuration;
96     private Timeout operationTimeout;
97     private final String selfAddressHostPort;
98     private TransactionRateLimiter txRateLimiter;
99     private Timeout transactionCommitOperationTimeout;
100     private Timeout shardInitializationTimeout;
101     private final Dispatchers dispatchers;
102
103     private volatile SchemaContext schemaContext;
104     private volatile boolean updated;
105     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
106
107     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
108     private final ShardStrategyFactory shardStrategyFactory;
109
110     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
111             ClusterWrapper clusterWrapper, Configuration configuration) {
112         this(actorSystem, shardManager, clusterWrapper, configuration,
113                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
114     }
115
116     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
117             ClusterWrapper clusterWrapper, Configuration configuration,
118             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
119         this.actorSystem = actorSystem;
120         this.shardManager = shardManager;
121         this.clusterWrapper = clusterWrapper;
122         this.configuration = configuration;
123         this.datastoreContext = datastoreContext;
124         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
125         this.primaryShardInfoCache = primaryShardInfoCache;
126         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
127
128         setCachedProperties();
129
130         Address selfAddress = clusterWrapper.getSelfAddress();
131         if (selfAddress != null && !selfAddress.host().isEmpty()) {
132             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
133         } else {
134             selfAddressHostPort = null;
135         }
136
137     }
138
139     private void setCachedProperties() {
140         txRateLimiter = new TransactionRateLimiter(this);
141
142         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
143         operationTimeout = new Timeout(operationDuration);
144
145         transactionCommitOperationTimeout =  new Timeout(Duration.create(
146                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
147
148         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
149     }
150
151     public DatastoreContext getDatastoreContext() {
152         return datastoreContext;
153     }
154
155     public ActorSystem getActorSystem() {
156         return actorSystem;
157     }
158
159     public ActorRef getShardManager() {
160         return shardManager;
161     }
162
163     public ActorSelection actorSelection(String actorPath) {
164         return actorSystem.actorSelection(actorPath);
165     }
166
167     public ActorSelection actorSelection(ActorPath actorPath) {
168         return actorSystem.actorSelection(actorPath);
169     }
170
171     public void setSchemaContext(SchemaContext schemaContext) {
172         this.schemaContext = schemaContext;
173
174         if(shardManager != null) {
175             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
176         }
177     }
178
179     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
180         this.datastoreContext = contextFactory.getBaseDatastoreContext();
181         setCachedProperties();
182
183         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
184         // will be published immediately even though they may not be immediately visible to other
185         // threads due to unsynchronized reads. That's OK though - we're going for eventual
186         // consistency here as immediately visible updates to these members aren't critical. These
187         // members could've been made volatile but wanted to avoid volatile reads as these are
188         // accessed often and updates will be infrequent.
189
190         updated = true;
191
192         if(shardManager != null) {
193             shardManager.tell(contextFactory, ActorRef.noSender());
194         }
195     }
196
197     public SchemaContext getSchemaContext() {
198         return schemaContext;
199     }
200
201     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
202         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
203         if(ret != null){
204             return ret;
205         }
206         Future<Object> future = executeOperationAsync(shardManager,
207                 new FindPrimary(shardName, true), shardInitializationTimeout);
208
209         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
210             @Override
211             public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
212                 if(response instanceof RemotePrimaryShardFound) {
213                     LOG.debug("findPrimaryShardAsync received: {}", response);
214                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
215                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
216                 } else if(response instanceof LocalPrimaryShardFound) {
217                     LOG.debug("findPrimaryShardAsync received: {}", response);
218                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
219                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
220                             found.getLocalShardDataTree());
221                 } else if(response instanceof NotInitializedException) {
222                     throw (NotInitializedException)response;
223                 } else if(response instanceof PrimaryNotFoundException) {
224                     throw (PrimaryNotFoundException)response;
225                 } else if(response instanceof NoShardLeaderException) {
226                     throw (NoShardLeaderException)response;
227                 }
228
229                 throw new UnknownMessageException(String.format(
230                         "FindPrimary returned unkown response: %s", response));
231             }
232         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
233     }
234
235     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
236             short primaryVersion, DataTree localShardDataTree) {
237         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
238         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
239             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
240         primaryShardInfoCache.putSuccessful(shardName, info);
241         return info;
242     }
243
244     /**
245      * Finds a local shard given its shard name and return it's ActorRef
246      *
247      * @param shardName the name of the local shard that needs to be found
248      * @return a reference to a local shard actor which represents the shard
249      *         specified by the shardName
250      */
251     public Optional<ActorRef> findLocalShard(String shardName) {
252         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
253
254         if (result instanceof LocalShardFound) {
255             LocalShardFound found = (LocalShardFound) result;
256             LOG.debug("Local shard found {}", found.getPath());
257             return Optional.of(found.getPath());
258         }
259
260         return Optional.absent();
261     }
262
263     /**
264      * Finds a local shard async given its shard name and return a Future from which to obtain the
265      * ActorRef.
266      *
267      * @param shardName the name of the local shard that needs to be found
268      */
269     public Future<ActorRef> findLocalShardAsync( final String shardName) {
270         Future<Object> future = executeOperationAsync(shardManager,
271                 new FindLocalShard(shardName, true), shardInitializationTimeout);
272
273         return future.map(new Mapper<Object, ActorRef>() {
274             @Override
275             public ActorRef checkedApply(Object response) throws Throwable {
276                 if(response instanceof LocalShardFound) {
277                     LocalShardFound found = (LocalShardFound)response;
278                     LOG.debug("Local shard found {}", found.getPath());
279                     return found.getPath();
280                 } else if(response instanceof NotInitializedException) {
281                     throw (NotInitializedException)response;
282                 } else if(response instanceof LocalShardNotFound) {
283                     throw new LocalShardNotFoundException(
284                             String.format("Local shard for %s does not exist.", shardName));
285                 }
286
287                 throw new UnknownMessageException(String.format(
288                         "FindLocalShard returned unkown response: %s", response));
289             }
290         }, getClientDispatcher());
291     }
292
293     /**
294      * Executes an operation on a local actor and wait for it's response
295      *
296      * @param actor
297      * @param message
298      * @return The response of the operation
299      */
300     public Object executeOperation(ActorRef actor, Object message) {
301         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
302
303         try {
304             return Await.result(future, operationDuration);
305         } catch (Exception e) {
306             throw new TimeoutException("Sending message " + message.getClass().toString() +
307                     " to actor " + actor.toString() + " failed. Try again later.", e);
308         }
309     }
310
311     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
312         Preconditions.checkArgument(actor != null, "actor must not be null");
313         Preconditions.checkArgument(message != null, "message must not be null");
314
315         LOG.debug("Sending message {} to {}", message.getClass(), actor);
316         return doAsk(actor, message, timeout);
317     }
318
319     /**
320      * Execute an operation on a remote actor and wait for it's response
321      *
322      * @param actor
323      * @param message
324      * @return
325      */
326     public Object executeOperation(ActorSelection actor, Object message) {
327         Future<Object> future = executeOperationAsync(actor, message);
328
329         try {
330             return Await.result(future, operationDuration);
331         } catch (Exception e) {
332             throw new TimeoutException("Sending message " + message.getClass().toString() +
333                     " to actor " + actor.toString() + " failed. Try again later.", e);
334         }
335     }
336
337     /**
338      * Execute an operation on a remote actor asynchronously.
339      *
340      * @param actor the ActorSelection
341      * @param message the message to send
342      * @param timeout the operation timeout
343      * @return a Future containing the eventual result
344      */
345     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
346             Timeout timeout) {
347         Preconditions.checkArgument(actor != null, "actor must not be null");
348         Preconditions.checkArgument(message != null, "message must not be null");
349
350         LOG.debug("Sending message {} to {}", message.getClass(), actor);
351
352         return doAsk(actor, message, timeout);
353     }
354
355     /**
356      * Execute an operation on a remote actor asynchronously.
357      *
358      * @param actor the ActorSelection
359      * @param message the message to send
360      * @return a Future containing the eventual result
361      */
362     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
363         return executeOperationAsync(actor, message, operationTimeout);
364     }
365
366     /**
367      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
368      * reply (essentially set and forget).
369      *
370      * @param actor the ActorSelection
371      * @param message the message to send
372      */
373     public void sendOperationAsync(ActorSelection actor, Object message) {
374         Preconditions.checkArgument(actor != null, "actor must not be null");
375         Preconditions.checkArgument(message != null, "message must not be null");
376
377         LOG.debug("Sending message {} to {}", message.getClass(), actor);
378
379         actor.tell(message, ActorRef.noSender());
380     }
381
382     public void shutdown() {
383         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
384         try {
385             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
386         } catch(Exception e) {
387             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
388         }
389     }
390
391     public ClusterWrapper getClusterWrapper() {
392         return clusterWrapper;
393     }
394
395     public MemberName getCurrentMemberName(){
396         return clusterWrapper.getCurrentMemberName();
397     }
398
399     /**
400      * Send the message to each and every shard
401      */
402     public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass){
403         for(final String shardName : configuration.getAllShardNames()){
404
405             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
406             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
407                 @Override
408                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
409                     if(failure != null) {
410                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
411                             messageClass.getSimpleName(), shardName, failure);
412                     } else {
413                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
414                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
415                     }
416                 }
417             }, getClientDispatcher());
418         }
419     }
420
421     public FiniteDuration getOperationDuration() {
422         return operationDuration;
423     }
424
425     public Timeout getOperationTimeout() {
426         return operationTimeout;
427     }
428
429     public boolean isPathLocal(String path) {
430         if (Strings.isNullOrEmpty(path)) {
431             return false;
432         }
433
434         int pathAtIndex = path.indexOf('@');
435         if (pathAtIndex == -1) {
436             //if the path is of local format, then its local and is co-located
437             return true;
438
439         } else if (selfAddressHostPort != null) {
440             // self-address and tx actor path, both are of remote path format
441             int slashIndex = path.indexOf('/', pathAtIndex);
442
443             if (slashIndex == -1) {
444                 return false;
445             }
446
447             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
448             return hostPort.equals(selfAddressHostPort);
449
450         } else {
451             // self address is local format and tx actor path is remote format
452             return false;
453         }
454     }
455
456     /**
457      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
458      * us to create a timer for pretty much anything.
459      *
460      * @param operationName
461      * @return
462      */
463     public Timer getOperationTimer(String operationName){
464         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
465     }
466
467     public Timer getOperationTimer(String dataStoreType, String operationName){
468         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
469                 operationName, METRIC_RATE);
470         return metricRegistry.timer(rate);
471     }
472
473     /**
474      * Get the name of the data store to which this ActorContext belongs
475      *
476      * @return
477      */
478     public String getDataStoreName() {
479         return datastoreContext.getDataStoreName();
480     }
481
482     /**
483      * Get the current transaction creation rate limit
484      * @return
485      */
486     public double getTxCreationLimit(){
487         return txRateLimiter.getTxCreationLimit();
488     }
489
490     /**
491      * Try to acquire a transaction creation permit. Will block if no permits are available.
492      */
493     public void acquireTxCreationPermit(){
494         txRateLimiter.acquire();
495     }
496
497     /**
498      * Return the operation timeout to be used when committing transactions
499      * @return
500      */
501     public Timeout getTransactionCommitOperationTimeout(){
502         return transactionCommitOperationTimeout;
503     }
504
505     /**
506      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
507      * code on the datastore
508      * @return
509      */
510     public ExecutionContext getClientDispatcher() {
511         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
512     }
513
514     public String getNotificationDispatcherPath(){
515         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
516     }
517
518     public Configuration getConfiguration() {
519         return configuration;
520     }
521
522     public ShardStrategyFactory getShardStrategyFactory() {
523         return shardStrategyFactory;
524     }
525
526     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
527         return ask(actorRef, message, timeout);
528     }
529
530     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
531         return ask(actorRef, message, timeout);
532     }
533
534     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
535         return primaryShardInfoCache;
536     }
537 }