<artifactId>org.osgi.core</artifactId>
</dependency>
- <!-- AKKA Dependencies -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_2.11</artifactId>
+ <version>2.3.2</version>
+ </dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Address;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The ShardManager has the following jobs,
+ *
+ * - Create all the local shard replicas that belong on this cluster member
+ * - Find the primary replica for any given shard
+ * - Engage in shard replica elections which decide which replica should be the primary
+ *
+ * Creation of Shard replicas
+ * ==========================
+ * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
+ * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
+ *
+ * Replica Elections
+ * =================
+ * The Shard Manager uses multiple cues to initiate election.
+ * - When a member of the cluster dies
+ * - When a local shard replica dies
+ * - When a local shard replica comes alive
+ */
+public class ShardManager extends UntypedActor {
+
+ // Stores a mapping between a shard name and the address of the current primary
+ private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
+
+ // Stores a mapping between a member name and the address of the member
+ private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+ // Stores a mapping between the shard name and all the members on which a replica of that shard are available
+ private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
+
+ LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if(message instanceof FindPrimary ){
+ FindPrimary msg = ((FindPrimary) message);
+ getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
+ }
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The FindPrimary message is used to locate the primary of any given shard
+ *
+ * TODO : Make this serializable
+ */
+public class FindPrimary{
+ private final String shardName;
+
+ public FindPrimary(String shardName){
+
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+
+ this.shardName = shardName;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PrimaryFound {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+
+public class PrimaryNotFound {
+
+ private final String shardName;
+
+ public PrimaryNotFound(String shardName){
+
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+
+ this.shardName = shardName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PrimaryNotFound that = (PrimaryNotFound) o;
+
+ if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return shardName != null ? shardName.hashCode() : 0;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import scala.concurrent.duration.Duration;
+
+public class ShardManagerTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUp(){
+ system = ActorSystem.create("test");
+ }
+
+ @AfterClass
+ public static void tearDown(){
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ @Test
+ public void testOnReceiveFindPrimary() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = Props.create(ShardManager.class);
+ final TestActorRef<ShardManager> subject = TestActorRef.create(system, props, "test");
+
+ // can also use JavaTestKit “from the outside”
+ final JavaTestKit probe = new JavaTestKit(system);
+
+ // the run() method needs to finish within 3 seconds
+ new Within(duration("3 seconds")) {
+ protected void run() {
+
+ subject.tell(new FindPrimary("inventory"), getRef());
+
+ expectMsgEquals(Duration.Zero(), new PrimaryNotFound("inventory"));
+
+ // Will wait for the rest of the 3 seconds
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.junit.Test;
+
+public class FindPrimaryTest {
+
+ @Test
+ public void testNewBuilderForType() throws Exception {
+
+ }
+
+ @Test
+ public void testToBuilder() throws Exception {
+
+ }
+
+ @Test
+ public void testGetDefaultInstanceForType() throws Exception {
+
+ }
+}
\ No newline at end of file