cb06c898fd1940743c19b1763f2b173ec3dacbfc
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.codahale.metrics.JmxReporter;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.util.concurrent.RateLimiter;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.Duration;
52 import scala.concurrent.duration.FiniteDuration;
53
54 /**
55  * The ActorContext class contains utility methods which could be used by
56  * non-actors (like DistributedDataStore) to work with actors a little more
57  * easily. An ActorContext can be freely passed around to local object instances
58  * but should not be passed to actors especially remote actors
59  */
60 public class ActorContext {
61     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
62     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
63     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
64     private static final String METRIC_RATE = "rate";
65     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
66     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
67                                                               new Mapper<Throwable, Throwable>() {
68         @Override
69         public Throwable apply(Throwable failure) {
70             Throwable actualFailure = failure;
71             if(failure instanceof AskTimeoutException) {
72                 // A timeout exception most likely means the shard isn't initialized.
73                 actualFailure = new NotInitializedException(
74                         "Timed out trying to find the primary shard. Most likely cause is the " +
75                         "shard is not initialized yet.");
76             }
77
78             return actualFailure;
79         }
80     };
81     public static final String MAILBOX = "bounded-mailbox";
82
83     private final ActorSystem actorSystem;
84     private final ActorRef shardManager;
85     private final ClusterWrapper clusterWrapper;
86     private final Configuration configuration;
87     private final DatastoreContext datastoreContext;
88     private final FiniteDuration operationDuration;
89     private final Timeout operationTimeout;
90     private final String selfAddressHostPort;
91     private final RateLimiter txRateLimiter;
92     private final MetricRegistry metricRegistry = new MetricRegistry();
93     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
94     private final int transactionOutstandingOperationLimit;
95     private final Timeout transactionCommitOperationTimeout;
96
97     private volatile SchemaContext schemaContext;
98
99     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
100             ClusterWrapper clusterWrapper, Configuration configuration) {
101         this(actorSystem, shardManager, clusterWrapper, configuration,
102                 DatastoreContext.newBuilder().build());
103     }
104
105     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
106             ClusterWrapper clusterWrapper, Configuration configuration,
107             DatastoreContext datastoreContext) {
108         this.actorSystem = actorSystem;
109         this.shardManager = shardManager;
110         this.clusterWrapper = clusterWrapper;
111         this.configuration = configuration;
112         this.datastoreContext = datastoreContext;
113         this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
114
115         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
116         operationTimeout = new Timeout(operationDuration);
117         transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
118                 TimeUnit.SECONDS));
119
120
121         Address selfAddress = clusterWrapper.getSelfAddress();
122         if (selfAddress != null && !selfAddress.host().isEmpty()) {
123             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
124         } else {
125             selfAddressHostPort = null;
126         }
127
128         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
129         jmxReporter.start();
130     }
131
132     public DatastoreContext getDatastoreContext() {
133         return datastoreContext;
134     }
135
136     public ActorSystem getActorSystem() {
137         return actorSystem;
138     }
139
140     public ActorRef getShardManager() {
141         return shardManager;
142     }
143
144     public ActorSelection actorSelection(String actorPath) {
145         return actorSystem.actorSelection(actorPath);
146     }
147
148     public ActorSelection actorSelection(ActorPath actorPath) {
149         return actorSystem.actorSelection(actorPath);
150     }
151
152     public void setSchemaContext(SchemaContext schemaContext) {
153         this.schemaContext = schemaContext;
154
155         if(shardManager != null) {
156             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
157         }
158     }
159
160     public SchemaContext getSchemaContext() {
161         return schemaContext;
162     }
163
164     /**
165      * Finds the primary shard for the given shard name
166      *
167      * @param shardName
168      * @return
169      */
170     public Optional<ActorSelection> findPrimaryShard(String shardName) {
171         String path = findPrimaryPathOrNull(shardName);
172         if (path == null){
173             return Optional.absent();
174         }
175         return Optional.of(actorSystem.actorSelection(path));
176     }
177
178     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
179         Future<Object> future = executeOperationAsync(shardManager,
180                 new FindPrimary(shardName, true).toSerializable(),
181                 datastoreContext.getShardInitializationTimeout());
182
183         return future.transform(new Mapper<Object, ActorSelection>() {
184             @Override
185             public ActorSelection checkedApply(Object response) throws Exception {
186                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
187                     PrimaryFound found = PrimaryFound.fromSerializable(response);
188
189                     LOG.debug("Primary found {}", found.getPrimaryPath());
190                     return actorSystem.actorSelection(found.getPrimaryPath());
191                 } else if(response instanceof ActorNotInitialized) {
192                     throw new NotInitializedException(
193                             String.format("Found primary shard %s but it's not initialized yet. " +
194                                           "Please try again later", shardName));
195                 } else if(response instanceof PrimaryNotFound) {
196                     throw new PrimaryNotFoundException(
197                             String.format("No primary shard found for %S.", shardName));
198                 }
199
200                 throw new UnknownMessageException(String.format(
201                         "FindPrimary returned unkown response: %s", response));
202             }
203         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
204     }
205
206     /**
207      * Finds a local shard given its shard name and return it's ActorRef
208      *
209      * @param shardName the name of the local shard that needs to be found
210      * @return a reference to a local shard actor which represents the shard
211      *         specified by the shardName
212      */
213     public Optional<ActorRef> findLocalShard(String shardName) {
214         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
215
216         if (result instanceof LocalShardFound) {
217             LocalShardFound found = (LocalShardFound) result;
218             LOG.debug("Local shard found {}", found.getPath());
219             return Optional.of(found.getPath());
220         }
221
222         return Optional.absent();
223     }
224
225     /**
226      * Finds a local shard async given its shard name and return a Future from which to obtain the
227      * ActorRef.
228      *
229      * @param shardName the name of the local shard that needs to be found
230      */
231     public Future<ActorRef> findLocalShardAsync( final String shardName) {
232         Future<Object> future = executeOperationAsync(shardManager,
233                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
234
235         return future.map(new Mapper<Object, ActorRef>() {
236             @Override
237             public ActorRef checkedApply(Object response) throws Throwable {
238                 if(response instanceof LocalShardFound) {
239                     LocalShardFound found = (LocalShardFound)response;
240                     LOG.debug("Local shard found {}", found.getPath());
241                     return found.getPath();
242                 } else if(response instanceof ActorNotInitialized) {
243                     throw new NotInitializedException(
244                             String.format("Found local shard for %s but it's not initialized yet.",
245                                     shardName));
246                 } else if(response instanceof LocalShardNotFound) {
247                     throw new LocalShardNotFoundException(
248                             String.format("Local shard for %s does not exist.", shardName));
249                 }
250
251                 throw new UnknownMessageException(String.format(
252                         "FindLocalShard returned unkown response: %s", response));
253             }
254         }, getActorSystem().dispatcher());
255     }
256
257     private String findPrimaryPathOrNull(String shardName) {
258         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
259
260         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
261             PrimaryFound found = PrimaryFound.fromSerializable(result);
262
263             LOG.debug("Primary found {}", found.getPrimaryPath());
264             return found.getPrimaryPath();
265
266         } else if (result.getClass().equals(ActorNotInitialized.class)){
267             throw new NotInitializedException(
268                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
269             );
270
271         } else {
272             return null;
273         }
274     }
275
276
277     /**
278      * Executes an operation on a local actor and wait for it's response
279      *
280      * @param actor
281      * @param message
282      * @return The response of the operation
283      */
284     public Object executeOperation(ActorRef actor, Object message) {
285         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
286
287         try {
288             return Await.result(future, operationDuration);
289         } catch (Exception e) {
290             throw new TimeoutException("Sending message " + message.getClass().toString() +
291                     " to actor " + actor.toString() + " failed. Try again later.", e);
292         }
293     }
294
295     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
296         Preconditions.checkArgument(actor != null, "actor must not be null");
297         Preconditions.checkArgument(message != null, "message must not be null");
298
299         LOG.debug("Sending message {} to {}", message.getClass(), actor);
300         return ask(actor, message, timeout);
301     }
302
303     /**
304      * Execute an operation on a remote actor and wait for it's response
305      *
306      * @param actor
307      * @param message
308      * @return
309      */
310     public Object executeOperation(ActorSelection actor, Object message) {
311         Future<Object> future = executeOperationAsync(actor, message);
312
313         try {
314             return Await.result(future, operationDuration);
315         } catch (Exception e) {
316             throw new TimeoutException("Sending message " + message.getClass().toString() +
317                     " to actor " + actor.toString() + " failed. Try again later.", e);
318         }
319     }
320
321     /**
322      * Execute an operation on a remote actor asynchronously.
323      *
324      * @param actor the ActorSelection
325      * @param message the message to send
326      * @param timeout the operation timeout
327      * @return a Future containing the eventual result
328      */
329     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
330             Timeout timeout) {
331         Preconditions.checkArgument(actor != null, "actor must not be null");
332         Preconditions.checkArgument(message != null, "message must not be null");
333
334         LOG.debug("Sending message {} to {}", message.getClass(), actor);
335
336         return ask(actor, message, timeout);
337     }
338
339     /**
340      * Execute an operation on a remote actor asynchronously.
341      *
342      * @param actor the ActorSelection
343      * @param message the message to send
344      * @return a Future containing the eventual result
345      */
346     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
347         return executeOperationAsync(actor, message, operationTimeout);
348     }
349
350     /**
351      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
352      * reply (essentially set and forget).
353      *
354      * @param actor the ActorSelection
355      * @param message the message to send
356      */
357     public void sendOperationAsync(ActorSelection actor, Object message) {
358         Preconditions.checkArgument(actor != null, "actor must not be null");
359         Preconditions.checkArgument(message != null, "message must not be null");
360
361         LOG.debug("Sending message {} to {}", message.getClass(), actor);
362
363         actor.tell(message, ActorRef.noSender());
364     }
365
366     public void shutdown() {
367         shardManager.tell(PoisonPill.getInstance(), null);
368         actorSystem.shutdown();
369     }
370
371     public ClusterWrapper getClusterWrapper() {
372         return clusterWrapper;
373     }
374
375     public String getCurrentMemberName(){
376         return clusterWrapper.getCurrentMemberName();
377     }
378
379     /**
380      * Send the message to each and every shard
381      *
382      * @param message
383      */
384     public void broadcast(Object message){
385         for(String shardName : configuration.getAllShardNames()){
386
387             Optional<ActorSelection> primary = findPrimaryShard(shardName);
388             if (primary.isPresent()) {
389                 primary.get().tell(message, ActorRef.noSender());
390             } else {
391                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
392                         message.getClass().getSimpleName(), shardName);
393             }
394         }
395     }
396
397     public FiniteDuration getOperationDuration() {
398         return operationDuration;
399     }
400
401     public boolean isPathLocal(String path) {
402         if (Strings.isNullOrEmpty(path)) {
403             return false;
404         }
405
406         int pathAtIndex = path.indexOf('@');
407         if (pathAtIndex == -1) {
408             //if the path is of local format, then its local and is co-located
409             return true;
410
411         } else if (selfAddressHostPort != null) {
412             // self-address and tx actor path, both are of remote path format
413             int slashIndex = path.indexOf('/', pathAtIndex);
414
415             if (slashIndex == -1) {
416                 return false;
417             }
418
419             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
420             return hostPort.equals(selfAddressHostPort);
421
422         } else {
423             // self address is local format and tx actor path is remote format
424             return false;
425         }
426     }
427
428     /**
429      * @deprecated This method is present only to support backward compatibility with Helium and should not be
430      * used any further
431      *
432      *
433      * @param primaryPath
434      * @param localPathOfRemoteActor
435      * @return
436     */
437     @Deprecated
438     public String resolvePath(final String primaryPath,
439                                             final String localPathOfRemoteActor) {
440         StringBuilder builder = new StringBuilder();
441         String[] primaryPathElements = primaryPath.split("/");
442         builder.append(primaryPathElements[0]).append("//")
443             .append(primaryPathElements[1]).append(primaryPathElements[2]);
444         String[] remotePathElements = localPathOfRemoteActor.split("/");
445         for (int i = 3; i < remotePathElements.length; i++) {
446                 builder.append("/").append(remotePathElements[i]);
447             }
448
449         return builder.toString();
450     }
451
452     /**
453      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
454      * should begin throttling the operations
455      *
456      * Parking reading this configuration here because we need to get to the actor system settings
457      *
458      * @return
459      */
460     public int getTransactionOutstandingOperationLimit(){
461         return transactionOutstandingOperationLimit;
462     }
463
464     /**
465      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
466      * us to create a timer for pretty much anything.
467      *
468      * @param operationName
469      * @return
470      */
471     public Timer getOperationTimer(String operationName){
472         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
473         return metricRegistry.timer(rate);
474     }
475
476     /**
477      * Get the type of the data store to which this ActorContext belongs
478      *
479      * @return
480      */
481     public String getDataStoreType() {
482         return datastoreContext.getDataStoreType();
483     }
484
485     /**
486      * Set the number of transaction creation permits that are to be allowed
487      *
488      * @param permitsPerSecond
489      */
490     public void setTxCreationLimit(double permitsPerSecond){
491         txRateLimiter.setRate(permitsPerSecond);
492     }
493
494     /**
495      * Get the current transaction creation rate limit
496      * @return
497      */
498     public double getTxCreationLimit(){
499         return txRateLimiter.getRate();
500     }
501
502     /**
503      * Try to acquire a transaction creation permit. Will block if no permits are available.
504      */
505     public void acquireTxCreationPermit(){
506         txRateLimiter.acquire();
507     }
508
509     /**
510      * Return the operation timeout to be used when committing transactions
511      * @return
512      */
513     public Timeout getTransactionCommitOperationTimeout(){
514         return transactionCommitOperationTimeout;
515     }
516
517
518 }