Bug 4105: Integrate EntityOwnerChangeListener with EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
13 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.Props;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import java.util.Collection;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
23 import org.opendaylight.controller.cluster.datastore.Shard;
24 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
25 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
26 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
27 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
31 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import scala.concurrent.Future;
39
40 /**
41  * Special Shard for EntityOwnership.
42  *
43  * @author Thomas Pantelis
44  */
45 class EntityOwnershipShard extends Shard {
46     private final String localMemberName;
47     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
48     private final EntityOwnershipListenerSupport listenerSupport;
49
50     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
51         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
52     }
53
54     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
55             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
56         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
57         this.localMemberName = localMemberName;
58         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
59         this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
60     }
61
62     @Override
63     protected void onDatastoreContext(DatastoreContext context) {
64         super.onDatastoreContext(noPersistenceDatastoreContext(context));
65     }
66
67     @Override
68     protected void onRecoveryComplete() {
69         super.onRecoveryComplete();
70
71         new CandidateListChangeListener(getSelf()).init(getDataStore());
72         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
73     }
74
75     @Override
76     public void onReceiveCommand(final Object message) throws Exception {
77         if(message instanceof RegisterCandidateLocal) {
78             onRegisterCandidateLocal((RegisterCandidateLocal)message);
79         } else if(message instanceof UnregisterCandidateLocal) {
80             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
81         } else if(message instanceof CandidateAdded){
82             onCandidateAdded((CandidateAdded) message);
83         } else if(message instanceof CandidateRemoved){
84             onCandidateRemoved((CandidateRemoved) message);
85         } else if(!commitCoordinator.handleMessage(message, this)) {
86             super.onReceiveCommand(message);
87         }
88     }
89
90     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
91         LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
92
93         listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
94
95         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
96                 registerCandidate.getEntity().getId(), localMemberName);
97         commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
98
99         getSender().tell(SuccessReply.INSTANCE, getSelf());
100     }
101
102     void tryCommitModifications(final BatchedModifications modifications) {
103         if(isLeader()) {
104             LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
105
106             // Note that it's possible the commit won't get consensus and will timeout and not be applied
107             // to the state. However we don't need to retry it in that case b/c it will be committed to
108             // the journal first and, once a majority of followers come back on line and it is replicated,
109             // it will be applied at that point.
110             handleBatchedModificationsLocal(modifications, self());
111         } else {
112             final ActorSelection leader = getLeader();
113             if (leader != null) {
114                 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
115
116                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
117                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
118
119                 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
120             }
121         }
122     }
123
124     boolean hasLeader() {
125         return getLeader() != null && !isIsolatedLeader();
126     }
127
128     @Override
129     protected void onStateChanged() {
130         super.onStateChanged();
131
132         commitCoordinator.onStateChanged(this, isLeader());
133     }
134
135     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
136         // TODO - implement
137         getSender().tell(SuccessReply.INSTANCE, getSelf());
138     }
139
140     private void onCandidateRemoved(CandidateRemoved message) {
141         if(!isLeader()){
142             return;
143         }
144
145         LOG.debug("onCandidateRemoved: {}", message);
146
147         String currentOwner = getCurrentOwner(message.getEntityPath());
148         if(message.getRemovedCandidate().equals(currentOwner)){
149             writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
150         }
151     }
152
153     private void onCandidateAdded(CandidateAdded message) {
154         if(!isLeader()){
155             return;
156         }
157
158         LOG.debug("onCandidateAdded: {}", message);
159
160         String currentOwner = getCurrentOwner(message.getEntityPath());
161         if(currentOwner == null){
162             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
163         }
164     }
165
166     private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
167         LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
168
169         commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
170                 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
171     }
172
173     private String newOwner(Collection<String> candidates) {
174         if(candidates.size() > 0){
175             return candidates.iterator().next();
176         }
177
178         return "";
179     }
180
181     private String getCurrentOwner(YangInstanceIdentifier entityId) {
182         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
183         Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
184         if(optionalEntityOwner.isPresent()){
185             return optionalEntityOwner.get().getValue().toString();
186         }
187         return null;
188     }
189
190     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
191             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
192         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
193     }
194
195     private static class Creator extends AbstractShardCreator {
196         private static final long serialVersionUID = 1L;
197
198         private final String localMemberName;
199
200         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
201                 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
202                 final String localMemberName) {
203             super(name, peerAddresses, datastoreContext, schemaContext);
204             this.localMemberName = localMemberName;
205         }
206
207         @Override
208         public Shard create() throws Exception {
209             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
210         }
211     }
212 }