2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
13 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.Props;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import java.util.Collection;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
23 import org.opendaylight.controller.cluster.datastore.Shard;
24 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
25 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
26 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
27 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
31 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import scala.concurrent.Future;
41 * Special Shard for EntityOwnership.
43 * @author Thomas Pantelis
45 class EntityOwnershipShard extends Shard {
46 private final String localMemberName;
47 private final EntityOwnershipShardCommitCoordinator commitCoordinator;
49 private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
50 return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
53 protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
54 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
55 super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
56 this.localMemberName = localMemberName;
57 this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
61 protected void onDatastoreContext(DatastoreContext context) {
62 super.onDatastoreContext(noPersistenceDatastoreContext(context));
66 protected void onRecoveryComplete() {
67 super.onRecoveryComplete();
69 new CandidateListChangeListener(getSelf(), getDataStore());
73 public void onReceiveCommand(final Object message) throws Exception {
74 if(message instanceof RegisterCandidateLocal) {
75 onRegisterCandidateLocal((RegisterCandidateLocal)message);
76 } else if(message instanceof UnregisterCandidateLocal) {
77 onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
78 } else if(message instanceof CandidateAdded){
79 onCandidateAdded((CandidateAdded) message);
80 } else if(message instanceof CandidateRemoved){
81 onCandidateRemoved((CandidateRemoved) message);
82 } else if(!commitCoordinator.handleMessage(message, this)) {
83 super.onReceiveCommand(message);
87 private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
88 LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
90 // TODO - add the listener locally.
92 NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
93 registerCandidate.getEntity().getId(), localMemberName);
94 commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
96 getSender().tell(SuccessReply.INSTANCE, getSelf());
99 void tryCommitModifications(final BatchedModifications modifications) {
101 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
103 // Note that it's possible the commit won't get consensus and will timeout and not be applied
104 // to the state. However we don't need to retry it in that case b/c it will be committed to
105 // the journal first and, once a majority of followers come back on line and it is replicated,
106 // it will be applied at that point.
107 handleBatchedModificationsLocal(modifications, self());
109 final ActorSelection leader = getLeader();
110 if (leader != null) {
111 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
113 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
114 getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
116 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
121 boolean hasLeader() {
122 return getLeader() != null && !isIsolatedLeader();
126 protected void onStateChanged() {
127 super.onStateChanged();
129 commitCoordinator.onStateChanged(this, isLeader());
132 private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
134 getSender().tell(SuccessReply.INSTANCE, getSelf());
137 private void onCandidateRemoved(CandidateRemoved message) {
142 LOG.debug("onCandidateRemoved: {}", message);
144 String currentOwner = getCurrentOwner(message.getEntityPath());
145 if(message.getRemovedCandidate().equals(currentOwner)){
146 writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
150 private void onCandidateAdded(CandidateAdded message) {
155 LOG.debug("onCandidateAdded: {}", message);
157 String currentOwner = getCurrentOwner(message.getEntityPath());
158 if(currentOwner == null){
159 writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
163 private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
164 LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
166 commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
167 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
170 private String newOwner(Collection<String> candidates) {
171 if(candidates.size() > 0){
172 return candidates.iterator().next();
178 private String getCurrentOwner(YangInstanceIdentifier entityId) {
179 DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
180 Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
181 if(optionalEntityOwner.isPresent()){
182 return optionalEntityOwner.get().getValue().toString();
187 public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
188 final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
189 return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
192 private static class Creator extends AbstractShardCreator {
193 private static final long serialVersionUID = 1L;
195 private final String localMemberName;
197 Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
198 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
199 final String localMemberName) {
200 super(name, peerAddresses, datastoreContext, schemaContext);
201 this.localMemberName = localMemberName;
205 public Shard create() throws Exception {
206 return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);