BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorPath;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.pattern.Patterns;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.Duration;
62 import scala.concurrent.duration.FiniteDuration;
63
64 /**
65  * The ActorContext class contains utility methods which could be used by
66  * non-actors (like DistributedDataStore) to work with actors a little more
67  * easily. An ActorContext can be freely passed around to local object instances
68  * but should not be passed to actors especially remote actors
69  */
70 public class ActorContext {
71     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
72     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
73     private static final String METRIC_RATE = "rate";
74     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
75                                                               new Mapper<Throwable, Throwable>() {
76         @Override
77         public Throwable apply(Throwable failure) {
78             Throwable actualFailure = failure;
79             if (failure instanceof AskTimeoutException) {
80                 // A timeout exception most likely means the shard isn't initialized.
81                 actualFailure = new NotInitializedException(
82                         "Timed out trying to find the primary shard. Most likely cause is the "
83                         + "shard is not initialized yet.");
84             }
85
86             return actualFailure;
87         }
88     };
89     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
90     public static final String COMMIT = "commit";
91
92     private final ActorSystem actorSystem;
93     private final ActorRef shardManager;
94     private final ClusterWrapper clusterWrapper;
95     private final Configuration configuration;
96     private DatastoreContext datastoreContext;
97     private FiniteDuration operationDuration;
98     private Timeout operationTimeout;
99     private final String selfAddressHostPort;
100     private TransactionRateLimiter txRateLimiter;
101     private Timeout transactionCommitOperationTimeout;
102     private Timeout shardInitializationTimeout;
103     private final Dispatchers dispatchers;
104
105     private volatile SchemaContext schemaContext;
106
107     // Used as a write memory barrier.
108     @SuppressWarnings("unused")
109     private volatile boolean updated;
110
111     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
112             .getMetricsRegistry();
113
114     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
115     private final ShardStrategyFactory shardStrategyFactory;
116
117     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
118             ClusterWrapper clusterWrapper, Configuration configuration) {
119         this(actorSystem, shardManager, clusterWrapper, configuration,
120                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
121     }
122
123     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
124             ClusterWrapper clusterWrapper, Configuration configuration,
125             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
126         this.actorSystem = actorSystem;
127         this.shardManager = shardManager;
128         this.clusterWrapper = clusterWrapper;
129         this.configuration = configuration;
130         this.datastoreContext = datastoreContext;
131         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
132         this.primaryShardInfoCache = primaryShardInfoCache;
133
134         final LogicalDatastoreType convertedType =
135                 LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
136         this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
137
138         setCachedProperties();
139
140         Address selfAddress = clusterWrapper.getSelfAddress();
141         if (selfAddress != null && !selfAddress.host().isEmpty()) {
142             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
143         } else {
144             selfAddressHostPort = null;
145         }
146
147     }
148
149     private void setCachedProperties() {
150         txRateLimiter = new TransactionRateLimiter(this);
151
152         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
153         operationTimeout = new Timeout(operationDuration);
154
155         transactionCommitOperationTimeout =  new Timeout(Duration.create(
156                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
157
158         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
159     }
160
161     public DatastoreContext getDatastoreContext() {
162         return datastoreContext;
163     }
164
165     public ActorSystem getActorSystem() {
166         return actorSystem;
167     }
168
169     public ActorRef getShardManager() {
170         return shardManager;
171     }
172
173     public ActorSelection actorSelection(String actorPath) {
174         return actorSystem.actorSelection(actorPath);
175     }
176
177     public ActorSelection actorSelection(ActorPath actorPath) {
178         return actorSystem.actorSelection(actorPath);
179     }
180
181     public void setSchemaContext(SchemaContext schemaContext) {
182         this.schemaContext = schemaContext;
183
184         if (shardManager != null) {
185             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
186         }
187     }
188
189     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
190         this.datastoreContext = contextFactory.getBaseDatastoreContext();
191         setCachedProperties();
192
193         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
194         // will be published immediately even though they may not be immediately visible to other
195         // threads due to unsynchronized reads. That's OK though - we're going for eventual
196         // consistency here as immediately visible updates to these members aren't critical. These
197         // members could've been made volatile but wanted to avoid volatile reads as these are
198         // accessed often and updates will be infrequent.
199
200         updated = true;
201
202         if (shardManager != null) {
203             shardManager.tell(contextFactory, ActorRef.noSender());
204         }
205     }
206
207     public SchemaContext getSchemaContext() {
208         return schemaContext;
209     }
210
211     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
212         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
213         if (ret != null) {
214             return ret;
215         }
216         Future<Object> future = executeOperationAsync(shardManager,
217                 new FindPrimary(shardName, true), shardInitializationTimeout);
218
219         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
220             @Override
221             public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
222                 if (response instanceof RemotePrimaryShardFound) {
223                     LOG.debug("findPrimaryShardAsync received: {}", response);
224                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
225                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
226                 } else if (response instanceof LocalPrimaryShardFound) {
227                     LOG.debug("findPrimaryShardAsync received: {}", response);
228                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
229                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
230                             found.getLocalShardDataTree());
231                 } else if (response instanceof NotInitializedException) {
232                     throw (NotInitializedException)response;
233                 } else if (response instanceof PrimaryNotFoundException) {
234                     throw (PrimaryNotFoundException)response;
235                 } else if (response instanceof NoShardLeaderException) {
236                     throw (NoShardLeaderException)response;
237                 }
238
239                 throw new UnknownMessageException(String.format(
240                         "FindPrimary returned unkown response: %s", response));
241             }
242         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
243     }
244
245     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
246             short primaryVersion, DataTree localShardDataTree) {
247         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
248         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
249             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
250         primaryShardInfoCache.putSuccessful(shardName, info);
251         return info;
252     }
253
254     /**
255      * Finds a local shard given its shard name and return it's ActorRef.
256      *
257      * @param shardName the name of the local shard that needs to be found
258      * @return a reference to a local shard actor which represents the shard
259      *         specified by the shardName
260      */
261     public Optional<ActorRef> findLocalShard(String shardName) {
262         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
263
264         if (result instanceof LocalShardFound) {
265             LocalShardFound found = (LocalShardFound) result;
266             LOG.debug("Local shard found {}", found.getPath());
267             return Optional.of(found.getPath());
268         }
269
270         return Optional.absent();
271     }
272
273     /**
274      * Finds a local shard async given its shard name and return a Future from which to obtain the
275      * ActorRef.
276      *
277      * @param shardName the name of the local shard that needs to be found
278      */
279     public Future<ActorRef> findLocalShardAsync(final String shardName) {
280         Future<Object> future = executeOperationAsync(shardManager,
281                 new FindLocalShard(shardName, true), shardInitializationTimeout);
282
283         return future.map(new Mapper<Object, ActorRef>() {
284             @Override
285             public ActorRef checkedApply(Object response) throws Throwable {
286                 if (response instanceof LocalShardFound) {
287                     LocalShardFound found = (LocalShardFound)response;
288                     LOG.debug("Local shard found {}", found.getPath());
289                     return found.getPath();
290                 } else if (response instanceof NotInitializedException) {
291                     throw (NotInitializedException)response;
292                 } else if (response instanceof LocalShardNotFound) {
293                     throw new LocalShardNotFoundException(
294                             String.format("Local shard for %s does not exist.", shardName));
295                 }
296
297                 throw new UnknownMessageException(String.format(
298                         "FindLocalShard returned unkown response: %s", response));
299             }
300         }, getClientDispatcher());
301     }
302
303     /**
304      * Executes an operation on a local actor and wait for it's response.
305      *
306      * @param actor the actor
307      * @param message the message to send
308      * @return The response of the operation
309      */
310     @SuppressWarnings("checkstyle:IllegalCatch")
311     public Object executeOperation(ActorRef actor, Object message) {
312         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
313
314         try {
315             return Await.result(future, operationDuration);
316         } catch (Exception e) {
317             throw new TimeoutException("Sending message " + message.getClass().toString()
318                     + " to actor " + actor.toString() + " failed. Try again later.", e);
319         }
320     }
321
322     /**
323      * Execute an operation on a remote actor and wait for it's response.
324      *
325      * @param actor the actor
326      * @param message the message
327      * @return the response message
328      */
329     @SuppressWarnings("checkstyle:IllegalCatch")
330     public Object executeOperation(ActorSelection actor, Object message) {
331         Future<Object> future = executeOperationAsync(actor, message);
332
333         try {
334             return Await.result(future, operationDuration);
335         } catch (Exception e) {
336             throw new TimeoutException("Sending message " + message.getClass().toString()
337                     + " to actor " + actor.toString() + " failed. Try again later.", e);
338         }
339     }
340
341     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
342         Preconditions.checkArgument(actor != null, "actor must not be null");
343         Preconditions.checkArgument(message != null, "message must not be null");
344
345         LOG.debug("Sending message {} to {}", message.getClass(), actor);
346         return doAsk(actor, message, timeout);
347     }
348
349     /**
350      * Execute an operation on a remote actor asynchronously.
351      *
352      * @param actor the ActorSelection
353      * @param message the message to send
354      * @param timeout the operation timeout
355      * @return a Future containing the eventual result
356      */
357     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
358             Timeout timeout) {
359         Preconditions.checkArgument(actor != null, "actor must not be null");
360         Preconditions.checkArgument(message != null, "message must not be null");
361
362         LOG.debug("Sending message {} to {}", message.getClass(), actor);
363
364         return doAsk(actor, message, timeout);
365     }
366
367     /**
368      * Execute an operation on a remote actor asynchronously.
369      *
370      * @param actor the ActorSelection
371      * @param message the message to send
372      * @return a Future containing the eventual result
373      */
374     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
375         return executeOperationAsync(actor, message, operationTimeout);
376     }
377
378     /**
379      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
380      * reply (essentially set and forget).
381      *
382      * @param actor the ActorSelection
383      * @param message the message to send
384      */
385     public void sendOperationAsync(ActorSelection actor, Object message) {
386         Preconditions.checkArgument(actor != null, "actor must not be null");
387         Preconditions.checkArgument(message != null, "message must not be null");
388
389         LOG.debug("Sending message {} to {}", message.getClass(), actor);
390
391         actor.tell(message, ActorRef.noSender());
392     }
393
394     @SuppressWarnings("checkstyle:IllegalCatch")
395     public void shutdown() {
396         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
397         try {
398             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
399         } catch (Exception e) {
400             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
401         }
402     }
403
404     public ClusterWrapper getClusterWrapper() {
405         return clusterWrapper;
406     }
407
408     public MemberName getCurrentMemberName() {
409         return clusterWrapper.getCurrentMemberName();
410     }
411
412     /**
413      * Send the message to each and every shard.
414      */
415     public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
416         for (final String shardName : configuration.getAllShardNames()) {
417
418             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
419             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
420                 @Override
421                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
422                     if (failure != null) {
423                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
424                             messageClass.getSimpleName(), shardName, failure);
425                     } else {
426                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
427                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
428                     }
429                 }
430             }, getClientDispatcher());
431         }
432     }
433
434     public FiniteDuration getOperationDuration() {
435         return operationDuration;
436     }
437
438     public Timeout getOperationTimeout() {
439         return operationTimeout;
440     }
441
442     public boolean isPathLocal(String path) {
443         if (Strings.isNullOrEmpty(path)) {
444             return false;
445         }
446
447         int pathAtIndex = path.indexOf('@');
448         if (pathAtIndex == -1) {
449             //if the path is of local format, then its local and is co-located
450             return true;
451
452         } else if (selfAddressHostPort != null) {
453             // self-address and tx actor path, both are of remote path format
454             int slashIndex = path.indexOf('/', pathAtIndex);
455
456             if (slashIndex == -1) {
457                 return false;
458             }
459
460             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
461             return hostPort.equals(selfAddressHostPort);
462
463         } else {
464             // self address is local format and tx actor path is remote format
465             return false;
466         }
467     }
468
469     /**
470      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
471      * us to create a timer for pretty much anything.
472      *
473      * @param operationName the name of the operation
474      * @return the Timer instance
475      */
476     public Timer getOperationTimer(String operationName) {
477         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
478     }
479
480     public Timer getOperationTimer(String dataStoreType, String operationName) {
481         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
482                 operationName, METRIC_RATE);
483         return metricRegistry.timer(rate);
484     }
485
486     /**
487      * Get the name of the data store to which this ActorContext belongs.
488      *
489      * @return the data store name
490      */
491     public String getDataStoreName() {
492         return datastoreContext.getDataStoreName();
493     }
494
495     /**
496      * Get the current transaction creation rate limit.
497      *
498      * @return the rate limit
499      */
500     public double getTxCreationLimit() {
501         return txRateLimiter.getTxCreationLimit();
502     }
503
504     /**
505      * Try to acquire a transaction creation permit. Will block if no permits are available.
506      */
507     public void acquireTxCreationPermit() {
508         txRateLimiter.acquire();
509     }
510
511     /**
512      * Returns the operation timeout to be used when committing transactions.
513      *
514      * @return the operation timeout
515      */
516     public Timeout getTransactionCommitOperationTimeout() {
517         return transactionCommitOperationTimeout;
518     }
519
520     /**
521      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
522      * code on the datastore.
523      *
524      * @return the dispatcher
525      */
526     public ExecutionContext getClientDispatcher() {
527         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
528     }
529
530     public String getNotificationDispatcherPath() {
531         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
532     }
533
534     public Configuration getConfiguration() {
535         return configuration;
536     }
537
538     public ShardStrategyFactory getShardStrategyFactory() {
539         return shardStrategyFactory;
540     }
541
542     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
543         return ask(actorRef, message, timeout);
544     }
545
546     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
547         return ask(actorRef, message, timeout);
548     }
549
550     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
551         return primaryShardInfoCache;
552     }
553 }