Initial support for multiple-shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.Configuration;
18 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
19 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
20 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
21 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28 import scala.concurrent.duration.FiniteDuration;
29
30 import java.util.concurrent.TimeUnit;
31
32 import static akka.pattern.Patterns.ask;
33
34 /**
35  * The ActorContext class contains utility methods which could be used by
36  * non-actors (like DistributedDataStore) to work with actors a little more
37  * easily. An ActorContext can be freely passed around to local object instances
38  * but should not be passed to actors especially remote actors
39  */
40 public class ActorContext {
41     private static final Logger
42         LOG = LoggerFactory.getLogger(ActorContext.class);
43
44     public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
45     public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
46
47     private final ActorSystem actorSystem;
48     private final ActorRef shardManager;
49     private final Configuration configuration;
50
51     private SchemaContext schemaContext = null;
52
53     public ActorContext(ActorSystem actorSystem, ActorRef shardManager, Configuration configuration){
54         this.actorSystem = actorSystem;
55         this.shardManager = shardManager;
56         this.configuration = configuration;
57     }
58
59     public ActorSystem getActorSystem() {
60         return actorSystem;
61     }
62
63     public ActorRef getShardManager() {
64         return shardManager;
65     }
66
67     public ActorSelection actorSelection(String actorPath){
68         return actorSystem.actorSelection(actorPath);
69     }
70
71     public ActorSelection actorSelection(ActorPath actorPath){
72         return actorSystem.actorSelection(actorPath);
73     }
74
75
76     /**
77      * Finds the primary for a given shard
78      *
79      * @param shardName
80      * @return
81      */
82     public ActorSelection findPrimary(String shardName) {
83         Object result = executeLocalOperation(shardManager,
84             new FindPrimary(shardName), ASK_DURATION);
85
86         if(result instanceof PrimaryFound){
87             PrimaryFound found = (PrimaryFound) result;
88
89             LOG.error("Primary found {}", found.getPrimaryPath());
90
91             return actorSystem.actorSelection(found.getPrimaryPath());
92         }
93         throw new PrimaryNotFoundException();
94     }
95
96     /**
97      * Executes an operation on a local actor and wait for it's response
98      * @param actor
99      * @param message
100      * @param duration
101      * @return The response of the operation
102      */
103     public Object executeLocalOperation(ActorRef actor, Object message,
104         FiniteDuration duration){
105         Future<Object> future =
106             ask(actor, message, new Timeout(duration));
107
108         try {
109             return Await.result(future, AWAIT_DURATION);
110         } catch (Exception e) {
111             throw new TimeoutException(e);
112         }
113     }
114
115     /**
116      * Execute an operation on a remote actor and wait for it's response
117      * @param actor
118      * @param message
119      * @param duration
120      * @return
121      */
122     public Object executeRemoteOperation(ActorSelection actor, Object message,
123         FiniteDuration duration){
124         Future<Object> future =
125             ask(actor, message, new Timeout(duration));
126
127         try {
128             return Await.result(future, AWAIT_DURATION);
129         } catch (Exception e) {
130             throw new TimeoutException(e);
131         }
132     }
133
134     /**
135      * Execute an operation on the primary for a given shard
136      * <p>
137      *     This method first finds the primary for a given shard ,then sends
138      *     the message to the remote shard and waits for a response
139      * </p>
140      * @param shardName
141      * @param message
142      * @param duration
143      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
144      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
145      *
146      * @return
147      */
148     public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
149         ActorSelection primary = findPrimary(shardName);
150
151         return executeRemoteOperation(primary, message, duration);
152     }
153
154     public void shutdown() {
155         shardManager.tell(PoisonPill.getInstance(), null);
156         actorSystem.shutdown();
157     }
158 }