From a507f19b518f36065c74f4f88c9327ede28ff640 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Sun, 9 Aug 2015 00:22:02 -0400
Subject: [PATCH 1/1] Bug 4105: Implement RegisterCandidate in
EntityOwnershipShard
Change-Id: Idab615399d81a8451e22bfabd30aed9a98e4b037
Signed-off-by: Tom Pantelis
---
opendaylight/md-sal/sal-common-api/pom.xml | 10 +
.../md/sal/common/api/clustering/Entity.java | 8 +-
.../sal/common/api/clustering/EntityTest.java | 59 ++++
.../controller/cluster/datastore/Shard.java | 28 +-
.../DistributedEntityOwnershipService.java | 8 +-
.../entityownership/EntityOwnershipShard.java | 145 +++++++-
.../EntityOwnershipShardPropsCreator.java | 7 +-
.../src/main/yang/entity-owners.yang | 10 +-
.../cluster/datastore/AbstractShardTest.java | 3 +-
.../cluster/datastore/ShardTestKit.java | 8 +-
.../AbstractEntityOwnershipTest.java | 68 ++++
...DistributedEntityOwnershipServiceTest.java | 77 ++++-
.../EntityOwnershipShardTest.java | 324 ++++++++++++++++++
.../datastore/model/SchemaContextHelper.java | 14 +
14 files changed, 727 insertions(+), 42 deletions(-)
create mode 100644 opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java
create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
diff --git a/opendaylight/md-sal/sal-common-api/pom.xml b/opendaylight/md-sal/sal-common-api/pom.xml
index 9b30c72d4c..438067b4d3 100644
--- a/opendaylight/md-sal/sal-common-api/pom.xml
+++ b/opendaylight/md-sal/sal-common-api/pom.xml
@@ -35,6 +35,16 @@
org.opendaylight.yangtools
yang-data-api
+
+ org.apache.commons
+ commons-lang3
+ test
+
+
+ junit
+ junit
+ test
+
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java
index 3b9f8a425e..93d001216a 100644
--- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java
+++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java
@@ -9,6 +9,7 @@
package org.opendaylight.controller.md.sal.common.api.clustering;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -31,7 +32,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* referenced by the YangInstanceIdentifier if the inventory node stored in the data store.
*
*/
-public final class Entity {
+public final class Entity implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String type;
private final YangInstanceIdentifier id;
@@ -90,9 +92,7 @@ public final class Entity {
@Override
public int hashCode() {
- int result = type != null ? type.hashCode() : 0;
- result = 31 * result + (id != null ? id.hashCode() : 0);
- return result;
+ return 31 * type.hashCode() + id.hashCode();
}
@Override
diff --git a/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java b/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java
new file mode 100644
index 0000000000..8636de2be0
--- /dev/null
+++ b/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.clustering;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for Entity.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityTest {
+ static String ENTITY_TYPE1 = "type1";
+ static String ENTITY_TYPE2 = "type2";
+ static final QName QNAME1 = QName.create("test", "2015-08-14", "1");
+ static final QName QNAME2 = QName.create("test", "2015-08-14", "2");
+ static final YangInstanceIdentifier YANGID1 = YangInstanceIdentifier.of(QNAME1);
+ static final YangInstanceIdentifier YANGID2 = YangInstanceIdentifier.of(QNAME2);
+
+ @Test
+ public void testHashCode() {
+ Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+ assertEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE1, YANGID1).hashCode());
+ assertNotEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE2, YANGID2).hashCode());
+ }
+
+ @Test
+ public void testEquals() {
+ Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+ assertEquals("Same", true, entity1.equals(entity1));
+ assertEquals("Same", true, entity1.equals(new Entity(ENTITY_TYPE1, YANGID1)));
+ assertEquals("Different entity type", false, entity1.equals(new Entity(ENTITY_TYPE2, YANGID1)));
+ assertEquals("Different yang ID", false, entity1.equals(new Entity(ENTITY_TYPE1, YANGID2)));
+ assertEquals("Different Object", false, entity1.equals(new Object()));
+ assertEquals("Equals null", false, entity1.equals(null));
+ }
+
+ @Test
+ public void testSerialization() {
+ Entity entity = new Entity(ENTITY_TYPE1, YANGID1);
+
+ Entity clone = SerializationUtils.clone(entity);
+
+ assertEquals("getType", entity.getType(), clone.getType());
+ assertEquals("getId", entity.getId(), clone.getId());
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 7b34f5df60..6aee29dd40 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -408,6 +408,16 @@ public class Shard extends RaftActor {
getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
}
+ protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+ try {
+ commitCoordinator.handleBatchedModifications(batched, sender, this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
// This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -424,13 +434,7 @@ public class Shard extends RaftActor {
if(isLeader()) {
failIfIsolatedLeader(getSender());
- try {
- commitCoordinator.handleBatchedModifications(batched, getSender(), this);
- } catch (Exception e) {
- LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
- batched.getTransactionID(), e);
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- }
+ handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
if(leader != null) {
@@ -446,7 +450,7 @@ public class Shard extends RaftActor {
}
private boolean failIfIsolatedLeader(ActorRef sender) {
- if(getRaftState() == RaftState.IsolatedLeader) {
+ if(isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all" +
" other follower nodes are down or this node is isolated by a network partition.",
@@ -457,6 +461,10 @@ public class Shard extends RaftActor {
return false;
}
+ protected boolean isIsolatedLeader() {
+ return getRaftState() == RaftState.IsolatedLeader;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
failIfIsolatedLeader(getSender());
@@ -684,6 +692,10 @@ public class Shard extends RaftActor {
return commitCoordinator;
}
+ protected DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
protected abstract static class AbstractShardCreator implements Creator {
private static final long serialVersionUID = 1L;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
index 13ecfffefe..f51f579443 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -138,6 +139,11 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
}
protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
- return new EntityOwnershipShardPropsCreator();
+ return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName());
+ }
+
+ @VisibleForTesting
+ ActorRef getLocalEntityOwnershipShard() {
+ return localEntityOwnershipShard;
}
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
index 52f85f481e..20a69022db 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
@@ -7,30 +7,65 @@
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
/**
* Special Shard for EntityOwnership.
*
* @author Thomas Pantelis
*/
-public class EntityOwnershipShard extends Shard {
+class EntityOwnershipShard extends Shard {
+ static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
+ static final QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.
+ md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME;
+ static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name");
+ static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id");
+ static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type");
+
+ private int transactionIDCounter = 0;
+ private final String localMemberName;
+ private final List retryModifications = new ArrayList<>();
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
}
protected EntityOwnershipShard(ShardIdentifier name, Map peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
+ this.localMemberName = localMemberName;
}
@Override
@@ -50,31 +85,127 @@ public class EntityOwnershipShard extends Shard {
}
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
- // TODO - implement
+ LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+
+ // TODO - add the listener locally.
+
+ BatchedModifications modifications = new BatchedModifications(
+ TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ modifications.setDoCommitOnReady(true);
+ modifications.setReady(true);
+ modifications.setTotalMessagesSent(1);
+
+ NormalizedNode, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
+ modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
+
+ tryCommitModifications(modifications);
+
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private NormalizedNode, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
+ MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
+ ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
+
+ MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
+ addChild(candidateNode).build();
+
+ MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
+ addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
+
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
+ addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
+ }
+
+ private void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ if(isIsolatedLeader()) {
+ LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ } else {
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ }
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+ Future