2b2487f77ea496954a627eabd63372e7a3da3648
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.config.Configuration;
32 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
33 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
44 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.ExecutionContext;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
56
57 /**
58  * The ActorContext class contains utility methods which could be used by
59  * non-actors (like DistributedDataStore) to work with actors a little more
60  * easily. An ActorContext can be freely passed around to local object instances
61  * but should not be passed to actors especially remote actors
62  */
63 public class ActorContext {
64     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
65     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
66     private static final String METRIC_RATE = "rate";
67     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
68                                                               new Mapper<Throwable, Throwable>() {
69         @Override
70         public Throwable apply(Throwable failure) {
71             Throwable actualFailure = failure;
72             if(failure instanceof AskTimeoutException) {
73                 // A timeout exception most likely means the shard isn't initialized.
74                 actualFailure = new NotInitializedException(
75                         "Timed out trying to find the primary shard. Most likely cause is the " +
76                         "shard is not initialized yet.");
77             }
78
79             return actualFailure;
80         }
81     };
82     public static final String MAILBOX = "bounded-mailbox";
83     public static final String COMMIT = "commit";
84
85     private final ActorSystem actorSystem;
86     private final ActorRef shardManager;
87     private final ClusterWrapper clusterWrapper;
88     private final Configuration configuration;
89     private DatastoreContext datastoreContext;
90     private FiniteDuration operationDuration;
91     private Timeout operationTimeout;
92     private final String selfAddressHostPort;
93     private TransactionRateLimiter txRateLimiter;
94     private Timeout transactionCommitOperationTimeout;
95     private Timeout shardInitializationTimeout;
96     private final Dispatchers dispatchers;
97
98     private volatile SchemaContext schemaContext;
99     private volatile boolean updated;
100     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
101
102     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
103
104     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
105             ClusterWrapper clusterWrapper, Configuration configuration) {
106         this(actorSystem, shardManager, clusterWrapper, configuration,
107                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
108     }
109
110     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
111             ClusterWrapper clusterWrapper, Configuration configuration,
112             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
113         this.actorSystem = actorSystem;
114         this.shardManager = shardManager;
115         this.clusterWrapper = clusterWrapper;
116         this.configuration = configuration;
117         this.datastoreContext = datastoreContext;
118         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
119         this.primaryShardInfoCache = primaryShardInfoCache;
120
121         setCachedProperties();
122
123         Address selfAddress = clusterWrapper.getSelfAddress();
124         if (selfAddress != null && !selfAddress.host().isEmpty()) {
125             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
126         } else {
127             selfAddressHostPort = null;
128         }
129
130     }
131
132     private void setCachedProperties() {
133         txRateLimiter = new TransactionRateLimiter(this);
134
135         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
136         operationTimeout = new Timeout(operationDuration);
137
138         transactionCommitOperationTimeout =  new Timeout(Duration.create(
139                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
140
141         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
142     }
143
144     public DatastoreContext getDatastoreContext() {
145         return datastoreContext;
146     }
147
148     public ActorSystem getActorSystem() {
149         return actorSystem;
150     }
151
152     public ActorRef getShardManager() {
153         return shardManager;
154     }
155
156     public ActorSelection actorSelection(String actorPath) {
157         return actorSystem.actorSelection(actorPath);
158     }
159
160     public ActorSelection actorSelection(ActorPath actorPath) {
161         return actorSystem.actorSelection(actorPath);
162     }
163
164     public void setSchemaContext(SchemaContext schemaContext) {
165         this.schemaContext = schemaContext;
166
167         if(shardManager != null) {
168             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
169         }
170     }
171
172     public void setDatastoreContext(DatastoreContext context) {
173         this.datastoreContext = context;
174         setCachedProperties();
175
176         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
177         // will be published immediately even though they may not be immediately visible to other
178         // threads due to unsynchronized reads. That's OK though - we're going for eventual
179         // consistency here as immediately visible updates to these members aren't critical. These
180         // members could've been made volatile but wanted to avoid volatile reads as these are
181         // accessed often and updates will be infrequent.
182
183         updated = true;
184
185         if(shardManager != null) {
186             shardManager.tell(context, ActorRef.noSender());
187         }
188     }
189
190     public SchemaContext getSchemaContext() {
191         return schemaContext;
192     }
193
194     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
195         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
196         if(ret != null){
197             return ret;
198         }
199         Future<Object> future = executeOperationAsync(shardManager,
200                 new FindPrimary(shardName, true), shardInitializationTimeout);
201
202         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
203             @Override
204             public PrimaryShardInfo checkedApply(Object response) throws Exception {
205                 if(response instanceof RemotePrimaryShardFound) {
206                     LOG.debug("findPrimaryShardAsync received: {}", response);
207                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
208                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
209                 } else if(response instanceof LocalPrimaryShardFound) {
210                     LOG.debug("findPrimaryShardAsync received: {}", response);
211                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
212                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
213                             found.getLocalShardDataTree());
214                 } else if(response instanceof NotInitializedException) {
215                     throw (NotInitializedException)response;
216                 } else if(response instanceof PrimaryNotFoundException) {
217                     throw (PrimaryNotFoundException)response;
218                 } else if(response instanceof NoShardLeaderException) {
219                     throw (NoShardLeaderException)response;
220                 }
221
222                 throw new UnknownMessageException(String.format(
223                         "FindPrimary returned unkown response: %s", response));
224             }
225         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
226     }
227
228     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
229             short primaryVersion, DataTree localShardDataTree) {
230         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
231         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
232                 Optional.fromNullable(localShardDataTree));
233         primaryShardInfoCache.putSuccessful(shardName, info);
234         return info;
235     }
236
237     /**
238      * Finds a local shard given its shard name and return it's ActorRef
239      *
240      * @param shardName the name of the local shard that needs to be found
241      * @return a reference to a local shard actor which represents the shard
242      *         specified by the shardName
243      */
244     public Optional<ActorRef> findLocalShard(String shardName) {
245         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
246
247         if (result instanceof LocalShardFound) {
248             LocalShardFound found = (LocalShardFound) result;
249             LOG.debug("Local shard found {}", found.getPath());
250             return Optional.of(found.getPath());
251         }
252
253         return Optional.absent();
254     }
255
256     /**
257      * Finds a local shard async given its shard name and return a Future from which to obtain the
258      * ActorRef.
259      *
260      * @param shardName the name of the local shard that needs to be found
261      */
262     public Future<ActorRef> findLocalShardAsync( final String shardName) {
263         Future<Object> future = executeOperationAsync(shardManager,
264                 new FindLocalShard(shardName, true), shardInitializationTimeout);
265
266         return future.map(new Mapper<Object, ActorRef>() {
267             @Override
268             public ActorRef checkedApply(Object response) throws Throwable {
269                 if(response instanceof LocalShardFound) {
270                     LocalShardFound found = (LocalShardFound)response;
271                     LOG.debug("Local shard found {}", found.getPath());
272                     return found.getPath();
273                 } else if(response instanceof NotInitializedException) {
274                     throw (NotInitializedException)response;
275                 } else if(response instanceof LocalShardNotFound) {
276                     throw new LocalShardNotFoundException(
277                             String.format("Local shard for %s does not exist.", shardName));
278                 }
279
280                 throw new UnknownMessageException(String.format(
281                         "FindLocalShard returned unkown response: %s", response));
282             }
283         }, getClientDispatcher());
284     }
285
286     /**
287      * Executes an operation on a local actor and wait for it's response
288      *
289      * @param actor
290      * @param message
291      * @return The response of the operation
292      */
293     public Object executeOperation(ActorRef actor, Object message) {
294         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
295
296         try {
297             return Await.result(future, operationDuration);
298         } catch (Exception e) {
299             throw new TimeoutException("Sending message " + message.getClass().toString() +
300                     " to actor " + actor.toString() + " failed. Try again later.", e);
301         }
302     }
303
304     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
305         Preconditions.checkArgument(actor != null, "actor must not be null");
306         Preconditions.checkArgument(message != null, "message must not be null");
307
308         LOG.debug("Sending message {} to {}", message.getClass(), actor);
309         return doAsk(actor, message, timeout);
310     }
311
312     /**
313      * Execute an operation on a remote actor and wait for it's response
314      *
315      * @param actor
316      * @param message
317      * @return
318      */
319     public Object executeOperation(ActorSelection actor, Object message) {
320         Future<Object> future = executeOperationAsync(actor, message);
321
322         try {
323             return Await.result(future, operationDuration);
324         } catch (Exception e) {
325             throw new TimeoutException("Sending message " + message.getClass().toString() +
326                     " to actor " + actor.toString() + " failed. Try again later.", e);
327         }
328     }
329
330     /**
331      * Execute an operation on a remote actor asynchronously.
332      *
333      * @param actor the ActorSelection
334      * @param message the message to send
335      * @param timeout the operation timeout
336      * @return a Future containing the eventual result
337      */
338     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
339             Timeout timeout) {
340         Preconditions.checkArgument(actor != null, "actor must not be null");
341         Preconditions.checkArgument(message != null, "message must not be null");
342
343         LOG.debug("Sending message {} to {}", message.getClass(), actor);
344
345         return doAsk(actor, message, timeout);
346     }
347
348     /**
349      * Execute an operation on a remote actor asynchronously.
350      *
351      * @param actor the ActorSelection
352      * @param message the message to send
353      * @return a Future containing the eventual result
354      */
355     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
356         return executeOperationAsync(actor, message, operationTimeout);
357     }
358
359     /**
360      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
361      * reply (essentially set and forget).
362      *
363      * @param actor the ActorSelection
364      * @param message the message to send
365      */
366     public void sendOperationAsync(ActorSelection actor, Object message) {
367         Preconditions.checkArgument(actor != null, "actor must not be null");
368         Preconditions.checkArgument(message != null, "message must not be null");
369
370         LOG.debug("Sending message {} to {}", message.getClass(), actor);
371
372         actor.tell(message, ActorRef.noSender());
373     }
374
375     public void shutdown() {
376         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
377     }
378
379     public ClusterWrapper getClusterWrapper() {
380         return clusterWrapper;
381     }
382
383     public String getCurrentMemberName(){
384         return clusterWrapper.getCurrentMemberName();
385     }
386
387     /**
388      * Send the message to each and every shard
389      *
390      * @param message
391      */
392     public void broadcast(final Object message){
393         for(final String shardName : configuration.getAllShardNames()){
394
395             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
396             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
397                 @Override
398                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
399                     if(failure != null) {
400                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
401                                 message.getClass().getSimpleName(), shardName, failure);
402                     } else {
403                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
404                     }
405                 }
406             }, getClientDispatcher());
407         }
408     }
409
410     public FiniteDuration getOperationDuration() {
411         return operationDuration;
412     }
413
414     public Timeout getOperationTimeout() {
415         return operationTimeout;
416     }
417
418     public boolean isPathLocal(String path) {
419         if (Strings.isNullOrEmpty(path)) {
420             return false;
421         }
422
423         int pathAtIndex = path.indexOf('@');
424         if (pathAtIndex == -1) {
425             //if the path is of local format, then its local and is co-located
426             return true;
427
428         } else if (selfAddressHostPort != null) {
429             // self-address and tx actor path, both are of remote path format
430             int slashIndex = path.indexOf('/', pathAtIndex);
431
432             if (slashIndex == -1) {
433                 return false;
434             }
435
436             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
437             return hostPort.equals(selfAddressHostPort);
438
439         } else {
440             // self address is local format and tx actor path is remote format
441             return false;
442         }
443     }
444
445     /**
446      * @deprecated This method is present only to support backward compatibility with Helium and should not be
447      * used any further
448      *
449      *
450      * @param primaryPath
451      * @param localPathOfRemoteActor
452      * @return
453     */
454     @Deprecated
455     public String resolvePath(final String primaryPath,
456                                             final String localPathOfRemoteActor) {
457         StringBuilder builder = new StringBuilder();
458         String[] primaryPathElements = primaryPath.split("/");
459         builder.append(primaryPathElements[0]).append("//")
460             .append(primaryPathElements[1]).append(primaryPathElements[2]);
461         String[] remotePathElements = localPathOfRemoteActor.split("/");
462         for (int i = 3; i < remotePathElements.length; i++) {
463                 builder.append("/").append(remotePathElements[i]);
464             }
465
466         return builder.toString();
467     }
468
469     /**
470      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
471      * us to create a timer for pretty much anything.
472      *
473      * @param operationName
474      * @return
475      */
476     public Timer getOperationTimer(String operationName){
477         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
478     }
479
480     public Timer getOperationTimer(String dataStoreType, String operationName){
481         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
482                 operationName, METRIC_RATE);
483         return metricRegistry.timer(rate);
484     }
485
486     /**
487      * Get the type of the data store to which this ActorContext belongs
488      *
489      * @return
490      */
491     public String getDataStoreType() {
492         return datastoreContext.getDataStoreType();
493     }
494
495     /**
496      * Get the current transaction creation rate limit
497      * @return
498      */
499     public double getTxCreationLimit(){
500         return txRateLimiter.getTxCreationLimit();
501     }
502
503     /**
504      * Try to acquire a transaction creation permit. Will block if no permits are available.
505      */
506     public void acquireTxCreationPermit(){
507         txRateLimiter.acquire();
508     }
509
510     /**
511      * Return the operation timeout to be used when committing transactions
512      * @return
513      */
514     public Timeout getTransactionCommitOperationTimeout(){
515         return transactionCommitOperationTimeout;
516     }
517
518     /**
519      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
520      * code on the datastore
521      * @return
522      */
523     public ExecutionContext getClientDispatcher() {
524         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
525     }
526
527     public String getNotificationDispatcherPath(){
528         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
529     }
530
531     public Configuration getConfiguration() {
532         return configuration;
533     }
534
535     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
536         return ask(actorRef, message, timeout);
537     }
538
539     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
540         return ask(actorRef, message, timeout);
541     }
542
543     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
544         return primaryShardInfoCache;
545     }
546 }