3dacec72bff48ace9aae63a0bec9182da8ec8b1b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
19 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
21 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.Await;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.Duration;
27 import scala.concurrent.duration.FiniteDuration;
28
29 import java.util.concurrent.TimeUnit;
30
31 import static akka.pattern.Patterns.ask;
32
33 /**
34  * The ActorContext class contains utility methods which could be used by
35  * non-actors (like DistributedDataStore) to work with actors a little more
36  * easily. An ActorContext can be freely passed around to local object instances
37  * but should not be passed to actors especially remote actors
38  */
39 public class ActorContext {
40     private static final Logger
41         LOG = LoggerFactory.getLogger(ActorContext.class);
42
43     public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
44     public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
45
46     private final ActorSystem actorSystem;
47     private final ActorRef shardManager;
48
49     private SchemaContext schemaContext = null;
50
51     public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
52         this.actorSystem = actorSystem;
53         this.shardManager = shardManager;
54     }
55
56     public ActorSystem getActorSystem() {
57         return actorSystem;
58     }
59
60     public ActorRef getShardManager() {
61         return shardManager;
62     }
63
64     public ActorSelection actorSelection(String actorPath){
65         return actorSystem.actorSelection(actorPath);
66     }
67
68     public ActorSelection actorSelection(ActorPath actorPath){
69         return actorSystem.actorSelection(actorPath);
70     }
71
72
73     /**
74      * Finds the primary for a given shard
75      *
76      * @param shardName
77      * @return
78      */
79     public ActorSelection findPrimary(String shardName) {
80         Object result = executeLocalOperation(shardManager,
81             new FindPrimary(shardName), ASK_DURATION);
82
83         if(result instanceof PrimaryFound){
84             PrimaryFound found = (PrimaryFound) result;
85
86             LOG.error("Primary found {}", found.getPrimaryPath());
87
88             return actorSystem.actorSelection(found.getPrimaryPath());
89         }
90         throw new PrimaryNotFoundException();
91     }
92
93     /**
94      * Executes an operation on a local actor and wait for it's response
95      * @param actor
96      * @param message
97      * @param duration
98      * @return The response of the operation
99      */
100     public Object executeLocalOperation(ActorRef actor, Object message,
101         FiniteDuration duration){
102         Future<Object> future =
103             ask(actor, message, new Timeout(duration));
104
105         try {
106             return Await.result(future, AWAIT_DURATION);
107         } catch (Exception e) {
108             throw new TimeoutException(e);
109         }
110     }
111
112     /**
113      * Execute an operation on a remote actor and wait for it's response
114      * @param actor
115      * @param message
116      * @param duration
117      * @return
118      */
119     public Object executeRemoteOperation(ActorSelection actor, Object message,
120         FiniteDuration duration){
121         Future<Object> future =
122             ask(actor, message, new Timeout(duration));
123
124         try {
125             return Await.result(future, AWAIT_DURATION);
126         } catch (Exception e) {
127             throw new TimeoutException(e);
128         }
129     }
130
131     /**
132      * Execute an operation on the primary for a given shard
133      * <p>
134      *     This method first finds the primary for a given shard ,then sends
135      *     the message to the remote shard and waits for a response
136      * </p>
137      * @param shardName
138      * @param message
139      * @param duration
140      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
141      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
142      *
143      * @return
144      */
145     public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
146         ActorSelection primary = findPrimary(shardName);
147
148         return executeRemoteOperation(primary, message, duration);
149     }
150
151     public void shutdown() {
152         shardManager.tell(PoisonPill.getInstance(), null);
153         actorSystem.shutdown();
154     }
155 }