/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.pattern.Patterns;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
/**
 * Special Shard for EntityOwnership.
 *
 * @author Thomas Pantelis
 */
49 class EntityOwnershipShard extends Shard {
// Name of the local member; used as the candidate name when local candidates are
// registered and unregistered, and handed to the commit coordinator and the
// owner-change listener.
50 private final String localMemberName;
// Commits the modifications produced by this shard and gets first refusal on any
// message this shard does not handle itself.
51 private final EntityOwnershipShardCommitCoordinator commitCoordinator;
// Holds the ownership listeners added/removed for locally registered candidates.
52 private final EntityOwnershipListenerSupport listenerSupport;
54 private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
55 return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
/**
 * Creates the shard. The datastore context is passed to the superclass only after
 * persistence has been turned off in it.
 *
 * @param name the shard identifier
 * @param peerAddresses the peer addresses handed to the superclass
 * @param datastoreContext the configured context; a non-persistent copy is what the shard uses
 * @param schemaContext the schema context handed to the superclass
 * @param localMemberName the name of the local member
 */
protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
        DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
    super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);

    this.localMemberName = localMemberName;
    this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
    this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
}
67 protected void onDatastoreContext(DatastoreContext context) {
68 super.onDatastoreContext(noPersistenceDatastoreContext(context));
72 protected void onRecoveryComplete() {
73 super.onRecoveryComplete();
75 new CandidateListChangeListener(getSelf()).init(getDataStore());
76 new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
80 public void onReceiveCommand(final Object message) throws Exception {
81 if(message instanceof RegisterCandidateLocal) {
82 onRegisterCandidateLocal((RegisterCandidateLocal)message);
83 } else if(message instanceof UnregisterCandidateLocal) {
84 onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
85 } else if(message instanceof CandidateAdded){
86 onCandidateAdded((CandidateAdded) message);
87 } else if(message instanceof CandidateRemoved){
88 onCandidateRemoved((CandidateRemoved) message);
89 } else if(!commitCoordinator.handleMessage(message, this)) {
90 super.onReceiveCommand(message);
94 private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
95 LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
97 listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
99 NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
100 registerCandidate.getEntity().getId(), localMemberName);
101 commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
103 getSender().tell(SuccessReply.INSTANCE, getSelf());
106 private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
107 LOG.debug("onUnregisterCandidateLocal: {}", unregisterCandidate);
109 Entity entity = unregisterCandidate.getEntity();
110 listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
112 YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
113 commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
115 getSender().tell(SuccessReply.INSTANCE, getSelf());
118 void tryCommitModifications(final BatchedModifications modifications) {
120 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
122 // Note that it's possible the commit won't get consensus and will timeout and not be applied
123 // to the state. However we don't need to retry it in that case b/c it will be committed to
124 // the journal first and, once a majority of followers come back on line and it is replicated,
125 // it will be applied at that point.
126 handleBatchedModificationsLocal(modifications, self());
128 final ActorSelection leader = getLeader();
129 if (leader != null) {
130 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
132 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
133 getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
135 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
140 boolean hasLeader() {
141 return getLeader() != null && !isIsolatedLeader();
145 protected void onStateChanged() {
146 super.onStateChanged();
148 commitCoordinator.onStateChanged(this, isLeader());
151 private void onCandidateRemoved(CandidateRemoved message) {
156 LOG.debug("onCandidateRemoved: {}", message);
158 String currentOwner = getCurrentOwner(message.getEntityPath());
159 if(message.getRemovedCandidate().equals(currentOwner)){
160 writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
164 private void onCandidateAdded(CandidateAdded message) {
169 LOG.debug("onCandidateAdded: {}", message);
171 String currentOwner = getCurrentOwner(message.getEntityPath());
172 if(Strings.isNullOrEmpty(currentOwner)){
173 writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
177 private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
178 LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
180 commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
181 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
184 private String newOwner(Collection<String> candidates) {
185 if(candidates.size() > 0){
186 return candidates.iterator().next();
192 private String getCurrentOwner(YangInstanceIdentifier entityId) {
193 DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
194 Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
195 if(optionalEntityOwner.isPresent()){
196 return optionalEntityOwner.get().getValue().toString();
201 public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
202 final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
203 return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
206 private static class Creator extends AbstractShardCreator {
207 private static final long serialVersionUID = 1L;
209 private final String localMemberName;
211 Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
212 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
213 final String localMemberName) {
214 super(name, peerAddresses, datastoreContext, schemaContext);
215 this.localMemberName = localMemberName;
219 public Shard create() throws Exception {
220 return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);