7138a4a6e7b3bc05e610ad323c96ea13eae10281
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.codahale.metrics.JmxReporter;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.util.concurrent.RateLimiter;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.Duration;
52 import scala.concurrent.duration.FiniteDuration;
53
54 /**
55  * The ActorContext class contains utility methods which could be used by
56  * non-actors (like DistributedDataStore) to work with actors a little more
57  * easily. An ActorContext can be freely passed around to local object instances
58  * but should not be passed to actors especially remote actors
59  */
60 public class ActorContext {
61     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
62     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
63     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
64     private static final String METRIC_RATE = "rate";
65     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
66     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
67                                                               new Mapper<Throwable, Throwable>() {
68         @Override
69         public Throwable apply(Throwable failure) {
70             Throwable actualFailure = failure;
71             if(failure instanceof AskTimeoutException) {
72                 // A timeout exception most likely means the shard isn't initialized.
73                 actualFailure = new NotInitializedException(
74                         "Timed out trying to find the primary shard. Most likely cause is the " +
75                         "shard is not initialized yet.");
76             }
77
78             return actualFailure;
79         }
80     };
81     public static final String MAILBOX = "bounded-mailbox";
82
83     private final ActorSystem actorSystem;
84     private final ActorRef shardManager;
85     private final ClusterWrapper clusterWrapper;
86     private final Configuration configuration;
87     private final DatastoreContext datastoreContext;
88     private final String dataStoreType;
89     private final FiniteDuration operationDuration;
90     private final Timeout operationTimeout;
91     private final String selfAddressHostPort;
92     private final RateLimiter txRateLimiter;
93     private final MetricRegistry metricRegistry = new MetricRegistry();
94     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
95     private final int transactionOutstandingOperationLimit;
96     private final Timeout transactionCommitOperationTimeout;
97
98     private volatile SchemaContext schemaContext;
99
100     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
101             ClusterWrapper clusterWrapper, Configuration configuration) {
102         this(actorSystem, shardManager, clusterWrapper, configuration,
103                 DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE);
104     }
105
106     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
107             ClusterWrapper clusterWrapper, Configuration configuration,
108             DatastoreContext datastoreContext, String dataStoreType) {
109         this.actorSystem = actorSystem;
110         this.shardManager = shardManager;
111         this.clusterWrapper = clusterWrapper;
112         this.configuration = configuration;
113         this.datastoreContext = datastoreContext;
114         this.dataStoreType = dataStoreType;
115         this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
116
117         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
118         operationTimeout = new Timeout(operationDuration);
119         transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
120                 TimeUnit.SECONDS));
121
122
123         Address selfAddress = clusterWrapper.getSelfAddress();
124         if (selfAddress != null && !selfAddress.host().isEmpty()) {
125             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
126         } else {
127             selfAddressHostPort = null;
128         }
129
130         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
131         jmxReporter.start();
132     }
133
134     public DatastoreContext getDatastoreContext() {
135         return datastoreContext;
136     }
137
138     public ActorSystem getActorSystem() {
139         return actorSystem;
140     }
141
142     public ActorRef getShardManager() {
143         return shardManager;
144     }
145
146     public ActorSelection actorSelection(String actorPath) {
147         return actorSystem.actorSelection(actorPath);
148     }
149
150     public ActorSelection actorSelection(ActorPath actorPath) {
151         return actorSystem.actorSelection(actorPath);
152     }
153
154     public void setSchemaContext(SchemaContext schemaContext) {
155         this.schemaContext = schemaContext;
156
157         if(shardManager != null) {
158             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
159         }
160     }
161
162     public SchemaContext getSchemaContext() {
163         return schemaContext;
164     }
165
166     /**
167      * Finds the primary shard for the given shard name
168      *
169      * @param shardName
170      * @return
171      */
172     public Optional<ActorSelection> findPrimaryShard(String shardName) {
173         String path = findPrimaryPathOrNull(shardName);
174         if (path == null){
175             return Optional.absent();
176         }
177         return Optional.of(actorSystem.actorSelection(path));
178     }
179
180     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
181         Future<Object> future = executeOperationAsync(shardManager,
182                 new FindPrimary(shardName, true).toSerializable(),
183                 datastoreContext.getShardInitializationTimeout());
184
185         return future.transform(new Mapper<Object, ActorSelection>() {
186             @Override
187             public ActorSelection checkedApply(Object response) throws Exception {
188                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
189                     PrimaryFound found = PrimaryFound.fromSerializable(response);
190
191                     LOG.debug("Primary found {}", found.getPrimaryPath());
192                     return actorSystem.actorSelection(found.getPrimaryPath());
193                 } else if(response instanceof ActorNotInitialized) {
194                     throw new NotInitializedException(
195                             String.format("Found primary shard %s but it's not initialized yet. " +
196                                           "Please try again later", shardName));
197                 } else if(response instanceof PrimaryNotFound) {
198                     throw new PrimaryNotFoundException(
199                             String.format("No primary shard found for %S.", shardName));
200                 }
201
202                 throw new UnknownMessageException(String.format(
203                         "FindPrimary returned unkown response: %s", response));
204             }
205         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
206     }
207
208     /**
209      * Finds a local shard given its shard name and return it's ActorRef
210      *
211      * @param shardName the name of the local shard that needs to be found
212      * @return a reference to a local shard actor which represents the shard
213      *         specified by the shardName
214      */
215     public Optional<ActorRef> findLocalShard(String shardName) {
216         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
217
218         if (result instanceof LocalShardFound) {
219             LocalShardFound found = (LocalShardFound) result;
220             LOG.debug("Local shard found {}", found.getPath());
221             return Optional.of(found.getPath());
222         }
223
224         return Optional.absent();
225     }
226
227     /**
228      * Finds a local shard async given its shard name and return a Future from which to obtain the
229      * ActorRef.
230      *
231      * @param shardName the name of the local shard that needs to be found
232      */
233     public Future<ActorRef> findLocalShardAsync( final String shardName) {
234         Future<Object> future = executeOperationAsync(shardManager,
235                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
236
237         return future.map(new Mapper<Object, ActorRef>() {
238             @Override
239             public ActorRef checkedApply(Object response) throws Throwable {
240                 if(response instanceof LocalShardFound) {
241                     LocalShardFound found = (LocalShardFound)response;
242                     LOG.debug("Local shard found {}", found.getPath());
243                     return found.getPath();
244                 } else if(response instanceof ActorNotInitialized) {
245                     throw new NotInitializedException(
246                             String.format("Found local shard for %s but it's not initialized yet.",
247                                     shardName));
248                 } else if(response instanceof LocalShardNotFound) {
249                     throw new LocalShardNotFoundException(
250                             String.format("Local shard for %s does not exist.", shardName));
251                 }
252
253                 throw new UnknownMessageException(String.format(
254                         "FindLocalShard returned unkown response: %s", response));
255             }
256         }, getActorSystem().dispatcher());
257     }
258
259     private String findPrimaryPathOrNull(String shardName) {
260         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
261
262         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
263             PrimaryFound found = PrimaryFound.fromSerializable(result);
264
265             LOG.debug("Primary found {}", found.getPrimaryPath());
266             return found.getPrimaryPath();
267
268         } else if (result.getClass().equals(ActorNotInitialized.class)){
269             throw new NotInitializedException(
270                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
271             );
272
273         } else {
274             return null;
275         }
276     }
277
278
279     /**
280      * Executes an operation on a local actor and wait for it's response
281      *
282      * @param actor
283      * @param message
284      * @return The response of the operation
285      */
286     public Object executeOperation(ActorRef actor, Object message) {
287         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
288
289         try {
290             return Await.result(future, operationDuration);
291         } catch (Exception e) {
292             throw new TimeoutException("Sending message " + message.getClass().toString() +
293                     " to actor " + actor.toString() + " failed. Try again later.", e);
294         }
295     }
296
297     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
298         Preconditions.checkArgument(actor != null, "actor must not be null");
299         Preconditions.checkArgument(message != null, "message must not be null");
300
301         LOG.debug("Sending message {} to {}", message.getClass(), actor);
302         return ask(actor, message, timeout);
303     }
304
305     /**
306      * Execute an operation on a remote actor and wait for it's response
307      *
308      * @param actor
309      * @param message
310      * @return
311      */
312     public Object executeOperation(ActorSelection actor, Object message) {
313         Future<Object> future = executeOperationAsync(actor, message);
314
315         try {
316             return Await.result(future, operationDuration);
317         } catch (Exception e) {
318             throw new TimeoutException("Sending message " + message.getClass().toString() +
319                     " to actor " + actor.toString() + " failed. Try again later.", e);
320         }
321     }
322
323     /**
324      * Execute an operation on a remote actor asynchronously.
325      *
326      * @param actor the ActorSelection
327      * @param message the message to send
328      * @param timeout the operation timeout
329      * @return a Future containing the eventual result
330      */
331     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
332             Timeout timeout) {
333         Preconditions.checkArgument(actor != null, "actor must not be null");
334         Preconditions.checkArgument(message != null, "message must not be null");
335
336         LOG.debug("Sending message {} to {}", message.getClass(), actor);
337
338         return ask(actor, message, timeout);
339     }
340
341     /**
342      * Execute an operation on a remote actor asynchronously.
343      *
344      * @param actor the ActorSelection
345      * @param message the message to send
346      * @return a Future containing the eventual result
347      */
348     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
349         return executeOperationAsync(actor, message, operationTimeout);
350     }
351
352     /**
353      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
354      * reply (essentially set and forget).
355      *
356      * @param actor the ActorSelection
357      * @param message the message to send
358      */
359     public void sendOperationAsync(ActorSelection actor, Object message) {
360         Preconditions.checkArgument(actor != null, "actor must not be null");
361         Preconditions.checkArgument(message != null, "message must not be null");
362
363         LOG.debug("Sending message {} to {}", message.getClass(), actor);
364
365         actor.tell(message, ActorRef.noSender());
366     }
367
368     public void shutdown() {
369         shardManager.tell(PoisonPill.getInstance(), null);
370         actorSystem.shutdown();
371     }
372
373     public ClusterWrapper getClusterWrapper() {
374         return clusterWrapper;
375     }
376
377     public String getCurrentMemberName(){
378         return clusterWrapper.getCurrentMemberName();
379     }
380
381     /**
382      * Send the message to each and every shard
383      *
384      * @param message
385      */
386     public void broadcast(Object message){
387         for(String shardName : configuration.getAllShardNames()){
388
389             Optional<ActorSelection> primary = findPrimaryShard(shardName);
390             if (primary.isPresent()) {
391                 primary.get().tell(message, ActorRef.noSender());
392             } else {
393                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
394                         message.getClass().getSimpleName(), shardName);
395             }
396         }
397     }
398
399     public FiniteDuration getOperationDuration() {
400         return operationDuration;
401     }
402
403     public boolean isPathLocal(String path) {
404         if (Strings.isNullOrEmpty(path)) {
405             return false;
406         }
407
408         int pathAtIndex = path.indexOf('@');
409         if (pathAtIndex == -1) {
410             //if the path is of local format, then its local and is co-located
411             return true;
412
413         } else if (selfAddressHostPort != null) {
414             // self-address and tx actor path, both are of remote path format
415             int slashIndex = path.indexOf('/', pathAtIndex);
416
417             if (slashIndex == -1) {
418                 return false;
419             }
420
421             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
422             return hostPort.equals(selfAddressHostPort);
423
424         } else {
425             // self address is local format and tx actor path is remote format
426             return false;
427         }
428     }
429
430     /**
431      * @deprecated This method is present only to support backward compatibility with Helium and should not be
432      * used any further
433      *
434      *
435      * @param primaryPath
436      * @param localPathOfRemoteActor
437      * @return
438     */
439     @Deprecated
440     public String resolvePath(final String primaryPath,
441                                             final String localPathOfRemoteActor) {
442         StringBuilder builder = new StringBuilder();
443         String[] primaryPathElements = primaryPath.split("/");
444         builder.append(primaryPathElements[0]).append("//")
445             .append(primaryPathElements[1]).append(primaryPathElements[2]);
446         String[] remotePathElements = localPathOfRemoteActor.split("/");
447         for (int i = 3; i < remotePathElements.length; i++) {
448                 builder.append("/").append(remotePathElements[i]);
449             }
450
451         return builder.toString();
452     }
453
454     /**
455      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
456      * should begin throttling the operations
457      *
458      * Parking reading this configuration here because we need to get to the actor system settings
459      *
460      * @return
461      */
462     public int getTransactionOutstandingOperationLimit(){
463         return transactionOutstandingOperationLimit;
464     }
465
466     /**
467      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
468      * us to create a timer for pretty much anything.
469      *
470      * @param operationName
471      * @return
472      */
473     public Timer getOperationTimer(String operationName){
474         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE);
475         return metricRegistry.timer(rate);
476     }
477
478     /**
479      * Get the type of the data store to which this ActorContext belongs
480      *
481      * @return
482      */
483     public String getDataStoreType() {
484         return dataStoreType;
485     }
486
487     /**
488      * Set the number of transaction creation permits that are to be allowed
489      *
490      * @param permitsPerSecond
491      */
492     public void setTxCreationLimit(double permitsPerSecond){
493         txRateLimiter.setRate(permitsPerSecond);
494     }
495
496     /**
497      * Get the current transaction creation rate limit
498      * @return
499      */
500     public double getTxCreationLimit(){
501         return txRateLimiter.getRate();
502     }
503
504     /**
505      * Try to acquire a transaction creation permit. Will block if no permits are available.
506      */
507     public void acquireTxCreationPermit(){
508         txRateLimiter.acquire();
509     }
510
511     /**
512      * Return the operation timeout to be used when committing transactions
513      * @return
514      */
515     public Timeout getTransactionCommitOperationTimeout(){
516         return transactionCommitOperationTimeout;
517     }
518
519
520 }