Merge "Issue fix for config subsystem"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
20 import org.opendaylight.controller.cluster.datastore.Configuration;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
23 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
24 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
25 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
26 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
27 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
28 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
29 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.Duration;
35 import scala.concurrent.duration.FiniteDuration;
36
37 import java.util.concurrent.TimeUnit;
38
39 import static akka.pattern.Patterns.ask;
40
41 /**
42  * The ActorContext class contains utility methods which could be used by
43  * non-actors (like DistributedDataStore) to work with actors a little more
44  * easily. An ActorContext can be freely passed around to local object instances
45  * but should not be passed to actors especially remote actors
46  */
47 public class ActorContext {
48     private static final Logger
49         LOG = LoggerFactory.getLogger(ActorContext.class);
50
51     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
52
53     public static final String MAILBOX = "bounded-mailbox";
54
55     private final ActorSystem actorSystem;
56     private final ActorRef shardManager;
57     private final ClusterWrapper clusterWrapper;
58     private final Configuration configuration;
59     private volatile SchemaContext schemaContext;
60     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
61     private Timeout operationTimeout = new Timeout(operationDuration);
62
63     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
64         ClusterWrapper clusterWrapper,
65         Configuration configuration) {
66         this.actorSystem = actorSystem;
67         this.shardManager = shardManager;
68         this.clusterWrapper = clusterWrapper;
69         this.configuration = configuration;
70     }
71
72     public ActorSystem getActorSystem() {
73         return actorSystem;
74     }
75
76     public ActorRef getShardManager() {
77         return shardManager;
78     }
79
80     public ActorSelection actorSelection(String actorPath) {
81         return actorSystem.actorSelection(actorPath);
82     }
83
84     public ActorSelection actorSelection(ActorPath actorPath) {
85         return actorSystem.actorSelection(actorPath);
86     }
87
88     public void setSchemaContext(SchemaContext schemaContext) {
89         this.schemaContext = schemaContext;
90
91         if(shardManager != null) {
92             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
93         }
94     }
95
96     public void setOperationTimeout(int timeoutInSeconds) {
97         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
98         operationTimeout = new Timeout(operationDuration);
99     }
100
101     public SchemaContext getSchemaContext() {
102         return schemaContext;
103     }
104
105     /**
106      * Finds the primary shard for the given shard name
107      *
108      * @param shardName
109      * @return
110      */
111     public Optional<ActorSelection> findPrimaryShard(String shardName) {
112         String path = findPrimaryPathOrNull(shardName);
113         if (path == null){
114             return Optional.absent();
115         }
116         return Optional.of(actorSystem.actorSelection(path));
117     }
118
119     /**
120      * Finds a local shard given it's shard name and return it's ActorRef
121      *
122      * @param shardName the name of the local shard that needs to be found
123      * @return a reference to a local shard actor which represents the shard
124      *         specified by the shardName
125      */
126     public Optional<ActorRef> findLocalShard(String shardName) {
127         Object result = executeOperation(shardManager, new FindLocalShard(shardName));
128
129         if (result instanceof LocalShardFound) {
130             LocalShardFound found = (LocalShardFound) result;
131             LOG.debug("Local shard found {}", found.getPath());
132             return Optional.of(found.getPath());
133         }
134
135         return Optional.absent();
136     }
137
138
139     private String findPrimaryPathOrNull(String shardName) {
140         Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
141
142         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
143             PrimaryFound found = PrimaryFound.fromSerializable(result);
144
145             LOG.debug("Primary found {}", found.getPrimaryPath());
146             return found.getPrimaryPath();
147
148         } else if (result.getClass().equals(ActorNotInitialized.class)){
149             throw new NotInitializedException(
150                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
151             );
152
153         } else {
154             return null;
155         }
156     }
157
158
159     /**
160      * Executes an operation on a local actor and wait for it's response
161      *
162      * @param actor
163      * @param message
164      * @return The response of the operation
165      */
166     public Object executeOperation(ActorRef actor, Object message) {
167         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
168
169         try {
170             return Await.result(future, operationDuration);
171         } catch (Exception e) {
172             throw new TimeoutException("Sending message " + message.getClass().toString() +
173                     " to actor " + actor.toString() + " failed. Try again later.", e);
174         }
175     }
176
177     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
178         Preconditions.checkArgument(actor != null, "actor must not be null");
179         Preconditions.checkArgument(message != null, "message must not be null");
180
181         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
182         return ask(actor, message, timeout);
183     }
184
185     /**
186      * Execute an operation on a remote actor and wait for it's response
187      *
188      * @param actor
189      * @param message
190      * @return
191      */
192     public Object executeOperation(ActorSelection actor, Object message) {
193         Future<Object> future = executeOperationAsync(actor, message);
194
195         try {
196             return Await.result(future, operationDuration);
197         } catch (Exception e) {
198             throw new TimeoutException("Sending message " + message.getClass().toString() +
199                     " to actor " + actor.toString() + " failed. Try again later.", e);
200         }
201     }
202
203     /**
204      * Execute an operation on a remote actor asynchronously.
205      *
206      * @param actor the ActorSelection
207      * @param message the message to send
208      * @return a Future containing the eventual result
209      */
210     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
211         Preconditions.checkArgument(actor != null, "actor must not be null");
212         Preconditions.checkArgument(message != null, "message must not be null");
213
214         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
215
216         return ask(actor, message, operationTimeout);
217     }
218
219     /**
220      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
221      * reply (essentially set and forget).
222      *
223      * @param actor the ActorSelection
224      * @param message the message to send
225      */
226     public void sendOperationAsync(ActorSelection actor, Object message) {
227         Preconditions.checkArgument(actor != null, "actor must not be null");
228         Preconditions.checkArgument(message != null, "message must not be null");
229
230         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
231
232         actor.tell(message, ActorRef.noSender());
233     }
234
235     public void shutdown() {
236         shardManager.tell(PoisonPill.getInstance(), null);
237         actorSystem.shutdown();
238     }
239
240     public String getCurrentMemberName(){
241         return clusterWrapper.getCurrentMemberName();
242     }
243
244     /**
245      * Send the message to each and every shard
246      *
247      * @param message
248      */
249     public void broadcast(Object message){
250         for(String shardName : configuration.getAllShardNames()){
251
252             Optional<ActorSelection> primary = findPrimaryShard(shardName);
253             if (primary.isPresent()) {
254                 primary.get().tell(message, ActorRef.noSender());
255             } else {
256                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
257                         message.getClass().getSimpleName(), shardName);
258             }
259         }
260     }
261
262     public FiniteDuration getOperationDuration() {
263         return operationDuration;
264     }
265 }