Merge "Bug 2697: Improvement wrong response handling, missing message"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.codahale.metrics.JmxReporter;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.util.concurrent.RateLimiter;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.ExecutionContext;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.Duration;
53 import scala.concurrent.duration.FiniteDuration;
54
55 /**
56  * The ActorContext class contains utility methods which could be used by
57  * non-actors (like DistributedDataStore) to work with actors a little more
58  * easily. An ActorContext can be freely passed around to local object instances
59  * but should not be passed to actors especially remote actors
60  */
61 public class ActorContext {
62     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
63     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
64     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
65     private static final String METRIC_RATE = "rate";
66     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
67     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
68                                                               new Mapper<Throwable, Throwable>() {
69         @Override
70         public Throwable apply(Throwable failure) {
71             Throwable actualFailure = failure;
72             if(failure instanceof AskTimeoutException) {
73                 // A timeout exception most likely means the shard isn't initialized.
74                 actualFailure = new NotInitializedException(
75                         "Timed out trying to find the primary shard. Most likely cause is the " +
76                         "shard is not initialized yet.");
77             }
78
79             return actualFailure;
80         }
81     };
82     public static final String MAILBOX = "bounded-mailbox";
83
84     private final ActorSystem actorSystem;
85     private final ActorRef shardManager;
86     private final ClusterWrapper clusterWrapper;
87     private final Configuration configuration;
88     private final DatastoreContext datastoreContext;
89     private final FiniteDuration operationDuration;
90     private final Timeout operationTimeout;
91     private final String selfAddressHostPort;
92     private final RateLimiter txRateLimiter;
93     private final MetricRegistry metricRegistry = new MetricRegistry();
94     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
95     private final int transactionOutstandingOperationLimit;
96     private final Timeout transactionCommitOperationTimeout;
97     private final Dispatchers dispatchers;
98
99     private volatile SchemaContext schemaContext;
100
101     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
102             ClusterWrapper clusterWrapper, Configuration configuration) {
103         this(actorSystem, shardManager, clusterWrapper, configuration,
104                 DatastoreContext.newBuilder().build());
105     }
106
107     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
108             ClusterWrapper clusterWrapper, Configuration configuration,
109             DatastoreContext datastoreContext) {
110         this.actorSystem = actorSystem;
111         this.shardManager = shardManager;
112         this.clusterWrapper = clusterWrapper;
113         this.configuration = configuration;
114         this.datastoreContext = datastoreContext;
115         this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
116         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
117
118         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
119         operationTimeout = new Timeout(operationDuration);
120         transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
121                 TimeUnit.SECONDS));
122
123
124         Address selfAddress = clusterWrapper.getSelfAddress();
125         if (selfAddress != null && !selfAddress.host().isEmpty()) {
126             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
127         } else {
128             selfAddressHostPort = null;
129         }
130
131         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
132         jmxReporter.start();
133
134     }
135
136     public DatastoreContext getDatastoreContext() {
137         return datastoreContext;
138     }
139
140     public ActorSystem getActorSystem() {
141         return actorSystem;
142     }
143
144     public ActorRef getShardManager() {
145         return shardManager;
146     }
147
148     public ActorSelection actorSelection(String actorPath) {
149         return actorSystem.actorSelection(actorPath);
150     }
151
152     public ActorSelection actorSelection(ActorPath actorPath) {
153         return actorSystem.actorSelection(actorPath);
154     }
155
156     public void setSchemaContext(SchemaContext schemaContext) {
157         this.schemaContext = schemaContext;
158
159         if(shardManager != null) {
160             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
161         }
162     }
163
164     public SchemaContext getSchemaContext() {
165         return schemaContext;
166     }
167
168     /**
169      * Finds the primary shard for the given shard name
170      *
171      * @param shardName
172      * @return
173      */
174     public Optional<ActorSelection> findPrimaryShard(String shardName) {
175         String path = findPrimaryPathOrNull(shardName);
176         if (path == null){
177             return Optional.absent();
178         }
179         return Optional.of(actorSystem.actorSelection(path));
180     }
181
182     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
183         Future<Object> future = executeOperationAsync(shardManager,
184                 new FindPrimary(shardName, true).toSerializable(),
185                 datastoreContext.getShardInitializationTimeout());
186
187         return future.transform(new Mapper<Object, ActorSelection>() {
188             @Override
189             public ActorSelection checkedApply(Object response) throws Exception {
190                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
191                     PrimaryFound found = PrimaryFound.fromSerializable(response);
192
193                     LOG.debug("Primary found {}", found.getPrimaryPath());
194                     return actorSystem.actorSelection(found.getPrimaryPath());
195                 } else if(response instanceof ActorNotInitialized) {
196                     throw new NotInitializedException(
197                             String.format("Found primary shard %s but it's not initialized yet. " +
198                                           "Please try again later", shardName));
199                 } else if(response instanceof PrimaryNotFound) {
200                     throw new PrimaryNotFoundException(
201                             String.format("No primary shard found for %S.", shardName));
202                 }
203
204                 throw new UnknownMessageException(String.format(
205                         "FindPrimary returned unkown response: %s", response));
206             }
207         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
208     }
209
210     /**
211      * Finds a local shard given its shard name and return it's ActorRef
212      *
213      * @param shardName the name of the local shard that needs to be found
214      * @return a reference to a local shard actor which represents the shard
215      *         specified by the shardName
216      */
217     public Optional<ActorRef> findLocalShard(String shardName) {
218         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
219
220         if (result instanceof LocalShardFound) {
221             LocalShardFound found = (LocalShardFound) result;
222             LOG.debug("Local shard found {}", found.getPath());
223             return Optional.of(found.getPath());
224         }
225
226         return Optional.absent();
227     }
228
229     /**
230      * Finds a local shard async given its shard name and return a Future from which to obtain the
231      * ActorRef.
232      *
233      * @param shardName the name of the local shard that needs to be found
234      */
235     public Future<ActorRef> findLocalShardAsync( final String shardName) {
236         Future<Object> future = executeOperationAsync(shardManager,
237                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
238
239         return future.map(new Mapper<Object, ActorRef>() {
240             @Override
241             public ActorRef checkedApply(Object response) throws Throwable {
242                 if(response instanceof LocalShardFound) {
243                     LocalShardFound found = (LocalShardFound)response;
244                     LOG.debug("Local shard found {}", found.getPath());
245                     return found.getPath();
246                 } else if(response instanceof ActorNotInitialized) {
247                     throw new NotInitializedException(
248                             String.format("Found local shard for %s but it's not initialized yet.",
249                                     shardName));
250                 } else if(response instanceof LocalShardNotFound) {
251                     throw new LocalShardNotFoundException(
252                             String.format("Local shard for %s does not exist.", shardName));
253                 }
254
255                 throw new UnknownMessageException(String.format(
256                         "FindLocalShard returned unkown response: %s", response));
257             }
258         }, getClientDispatcher());
259     }
260
261     private String findPrimaryPathOrNull(String shardName) {
262         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
263
264         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
265             PrimaryFound found = PrimaryFound.fromSerializable(result);
266
267             LOG.debug("Primary found {}", found.getPrimaryPath());
268             return found.getPrimaryPath();
269
270         } else if (result.getClass().equals(ActorNotInitialized.class)){
271             throw new NotInitializedException(
272                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
273             );
274
275         } else {
276             return null;
277         }
278     }
279
280
281     /**
282      * Executes an operation on a local actor and wait for it's response
283      *
284      * @param actor
285      * @param message
286      * @return The response of the operation
287      */
288     public Object executeOperation(ActorRef actor, Object message) {
289         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
290
291         try {
292             return Await.result(future, operationDuration);
293         } catch (Exception e) {
294             throw new TimeoutException("Sending message " + message.getClass().toString() +
295                     " to actor " + actor.toString() + " failed. Try again later.", e);
296         }
297     }
298
299     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
300         Preconditions.checkArgument(actor != null, "actor must not be null");
301         Preconditions.checkArgument(message != null, "message must not be null");
302
303         LOG.debug("Sending message {} to {}", message.getClass(), actor);
304         return ask(actor, message, timeout);
305     }
306
307     /**
308      * Execute an operation on a remote actor and wait for it's response
309      *
310      * @param actor
311      * @param message
312      * @return
313      */
314     public Object executeOperation(ActorSelection actor, Object message) {
315         Future<Object> future = executeOperationAsync(actor, message);
316
317         try {
318             return Await.result(future, operationDuration);
319         } catch (Exception e) {
320             throw new TimeoutException("Sending message " + message.getClass().toString() +
321                     " to actor " + actor.toString() + " failed. Try again later.", e);
322         }
323     }
324
325     /**
326      * Execute an operation on a remote actor asynchronously.
327      *
328      * @param actor the ActorSelection
329      * @param message the message to send
330      * @param timeout the operation timeout
331      * @return a Future containing the eventual result
332      */
333     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
334             Timeout timeout) {
335         Preconditions.checkArgument(actor != null, "actor must not be null");
336         Preconditions.checkArgument(message != null, "message must not be null");
337
338         LOG.debug("Sending message {} to {}", message.getClass(), actor);
339
340         return ask(actor, message, timeout);
341     }
342
343     /**
344      * Execute an operation on a remote actor asynchronously.
345      *
346      * @param actor the ActorSelection
347      * @param message the message to send
348      * @return a Future containing the eventual result
349      */
350     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
351         return executeOperationAsync(actor, message, operationTimeout);
352     }
353
354     /**
355      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
356      * reply (essentially set and forget).
357      *
358      * @param actor the ActorSelection
359      * @param message the message to send
360      */
361     public void sendOperationAsync(ActorSelection actor, Object message) {
362         Preconditions.checkArgument(actor != null, "actor must not be null");
363         Preconditions.checkArgument(message != null, "message must not be null");
364
365         LOG.debug("Sending message {} to {}", message.getClass(), actor);
366
367         actor.tell(message, ActorRef.noSender());
368     }
369
370     public void shutdown() {
371         shardManager.tell(PoisonPill.getInstance(), null);
372         actorSystem.shutdown();
373     }
374
375     public ClusterWrapper getClusterWrapper() {
376         return clusterWrapper;
377     }
378
379     public String getCurrentMemberName(){
380         return clusterWrapper.getCurrentMemberName();
381     }
382
383     /**
384      * Send the message to each and every shard
385      *
386      * @param message
387      */
388     public void broadcast(Object message){
389         for(String shardName : configuration.getAllShardNames()){
390
391             Optional<ActorSelection> primary = findPrimaryShard(shardName);
392             if (primary.isPresent()) {
393                 primary.get().tell(message, ActorRef.noSender());
394             } else {
395                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
396                         message.getClass().getSimpleName(), shardName);
397             }
398         }
399     }
400
401     public FiniteDuration getOperationDuration() {
402         return operationDuration;
403     }
404
405     public boolean isPathLocal(String path) {
406         if (Strings.isNullOrEmpty(path)) {
407             return false;
408         }
409
410         int pathAtIndex = path.indexOf('@');
411         if (pathAtIndex == -1) {
412             //if the path is of local format, then its local and is co-located
413             return true;
414
415         } else if (selfAddressHostPort != null) {
416             // self-address and tx actor path, both are of remote path format
417             int slashIndex = path.indexOf('/', pathAtIndex);
418
419             if (slashIndex == -1) {
420                 return false;
421             }
422
423             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
424             return hostPort.equals(selfAddressHostPort);
425
426         } else {
427             // self address is local format and tx actor path is remote format
428             return false;
429         }
430     }
431
432     /**
433      * @deprecated This method is present only to support backward compatibility with Helium and should not be
434      * used any further
435      *
436      *
437      * @param primaryPath
438      * @param localPathOfRemoteActor
439      * @return
440     */
441     @Deprecated
442     public String resolvePath(final String primaryPath,
443                                             final String localPathOfRemoteActor) {
444         StringBuilder builder = new StringBuilder();
445         String[] primaryPathElements = primaryPath.split("/");
446         builder.append(primaryPathElements[0]).append("//")
447             .append(primaryPathElements[1]).append(primaryPathElements[2]);
448         String[] remotePathElements = localPathOfRemoteActor.split("/");
449         for (int i = 3; i < remotePathElements.length; i++) {
450                 builder.append("/").append(remotePathElements[i]);
451             }
452
453         return builder.toString();
454     }
455
456     /**
457      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
458      * should begin throttling the operations
459      *
460      * Parking reading this configuration here because we need to get to the actor system settings
461      *
462      * @return
463      */
464     public int getTransactionOutstandingOperationLimit(){
465         return transactionOutstandingOperationLimit;
466     }
467
468     /**
469      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
470      * us to create a timer for pretty much anything.
471      *
472      * @param operationName
473      * @return
474      */
475     public Timer getOperationTimer(String operationName){
476         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
477         return metricRegistry.timer(rate);
478     }
479
480     /**
481      * Get the type of the data store to which this ActorContext belongs
482      *
483      * @return
484      */
485     public String getDataStoreType() {
486         return datastoreContext.getDataStoreType();
487     }
488
489     /**
490      * Set the number of transaction creation permits that are to be allowed
491      *
492      * @param permitsPerSecond
493      */
494     public void setTxCreationLimit(double permitsPerSecond){
495         txRateLimiter.setRate(permitsPerSecond);
496     }
497
498     /**
499      * Get the current transaction creation rate limit
500      * @return
501      */
502     public double getTxCreationLimit(){
503         return txRateLimiter.getRate();
504     }
505
506     /**
507      * Try to acquire a transaction creation permit. Will block if no permits are available.
508      */
509     public void acquireTxCreationPermit(){
510         txRateLimiter.acquire();
511     }
512
513     /**
514      * Return the operation timeout to be used when committing transactions
515      * @return
516      */
517     public Timeout getTransactionCommitOperationTimeout(){
518         return transactionCommitOperationTimeout;
519     }
520
521     /**
522      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
523      * code on the datastore
524      * @return
525      */
526     public ExecutionContext getClientDispatcher() {
527         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
528     }
529
530     public String getNotificationDispatcherPath(){
531         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
532     }
533
534 }