Merge "Fixed for bug 1197"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
18 import org.opendaylight.controller.cluster.datastore.Configuration;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Await;
27 import scala.concurrent.Future;
28 import scala.concurrent.duration.Duration;
29 import scala.concurrent.duration.FiniteDuration;
30
31 import java.util.concurrent.TimeUnit;
32
33 import static akka.pattern.Patterns.ask;
34
35 /**
36  * The ActorContext class contains utility methods which could be used by
37  * non-actors (like DistributedDataStore) to work with actors a little more
38  * easily. An ActorContext can be freely passed around to local object instances
39  * but should not be passed to actors especially remote actors
40  */
41 public class ActorContext {
42     private static final Logger
43         LOG = LoggerFactory.getLogger(ActorContext.class);
44
45     public static final FiniteDuration ASK_DURATION =
46         Duration.create(5, TimeUnit.SECONDS);
47     public static final Duration AWAIT_DURATION =
48         Duration.create(5, TimeUnit.SECONDS);
49
50     private final ActorSystem actorSystem;
51     private final ActorRef shardManager;
52     private final ClusterWrapper clusterWrapper;
53     private final Configuration configuration;
54
55     private SchemaContext schemaContext = null;
56
57     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
58         ClusterWrapper clusterWrapper,
59         Configuration configuration) {
60         this.actorSystem = actorSystem;
61         this.shardManager = shardManager;
62         this.clusterWrapper = clusterWrapper;
63         this.configuration = configuration;
64     }
65
66     public ActorSystem getActorSystem() {
67         return actorSystem;
68     }
69
70     public ActorRef getShardManager() {
71         return shardManager;
72     }
73
74     public ActorSelection actorSelection(String actorPath) {
75         return actorSystem.actorSelection(actorPath);
76     }
77
78     public ActorSelection actorSelection(ActorPath actorPath) {
79         return actorSystem.actorSelection(actorPath);
80     }
81
82
83     /**
84      * Finds the primary for a given shard
85      *
86      * @param shardName
87      * @return
88      */
89     public ActorSelection findPrimary(String shardName) {
90         String path = findPrimaryPath(shardName);
91         return actorSystem.actorSelection(path);
92     }
93
94     public String findPrimaryPath(String shardName) {
95         Object result = executeLocalOperation(shardManager,
96             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
97
98         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
99             PrimaryFound found = PrimaryFound.fromSerializable(result);
100
101             LOG.debug("Primary found {}", found.getPrimaryPath());
102
103             return found.getPrimaryPath();
104         }
105         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
106     }
107
108
109     /**
110      * Executes an operation on a local actor and wait for it's response
111      *
112      * @param actor
113      * @param message
114      * @param duration
115      * @return The response of the operation
116      */
117     public Object executeLocalOperation(ActorRef actor, Object message,
118         FiniteDuration duration) {
119         Future<Object> future =
120             ask(actor, message, new Timeout(duration));
121
122         try {
123             return Await.result(future, AWAIT_DURATION);
124         } catch (Exception e) {
125             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
126         }
127     }
128
129     /**
130      * Execute an operation on a remote actor and wait for it's response
131      *
132      * @param actor
133      * @param message
134      * @param duration
135      * @return
136      */
137     public Object executeRemoteOperation(ActorSelection actor, Object message,
138         FiniteDuration duration) {
139
140         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
141
142         Future<Object> future =
143             ask(actor, message, new Timeout(duration));
144
145         try {
146             return Await.result(future, AWAIT_DURATION);
147         } catch (Exception e) {
148             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
149         }
150     }
151
152     /**
153      * Execute an operation on the primary for a given shard
154      * <p>
155      * This method first finds the primary for a given shard ,then sends
156      * the message to the remote shard and waits for a response
157      * </p>
158      *
159      * @param shardName
160      * @param message
161      * @param duration
162      * @return
163      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
164      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
165      */
166     public Object executeShardOperation(String shardName, Object message,
167         FiniteDuration duration) {
168         ActorSelection primary = findPrimary(shardName);
169
170         return executeRemoteOperation(primary, message, duration);
171     }
172
173     public void shutdown() {
174         shardManager.tell(PoisonPill.getInstance(), null);
175         actorSystem.shutdown();
176     }
177
178     /**
179      * @deprecated Need to stop using this method. There are ways to send a
180      * remote ActorRef as a string which should be used instead of this hack
181      *
182      * @param primaryPath
183      * @param localPathOfRemoteActor
184      * @return
185      */
186     @Deprecated
187     public String resolvePath(final String primaryPath,
188         final String localPathOfRemoteActor) {
189         StringBuilder builder = new StringBuilder();
190         String[] primaryPathElements = primaryPath.split("/");
191         builder.append(primaryPathElements[0]).append("//")
192             .append(primaryPathElements[1]).append(primaryPathElements[2]);
193         String[] remotePathElements = localPathOfRemoteActor.split("/");
194         for (int i = 3; i < remotePathElements.length; i++) {
195             builder.append("/").append(remotePathElements[i]);
196         }
197
198         return builder.toString();
199
200     }
201
202     public ActorPath actorFor(String path){
203         return actorSystem.actorFor(path).path();
204     }
205
206     public String getCurrentMemberName(){
207         return clusterWrapper.getCurrentMemberName();
208     }
209
210 }