Bug 4105: Add EntityOwnerDataChangeListener
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
22 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
23 import org.opendaylight.controller.cluster.datastore.Shard;
24 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
25 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
27 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
29 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
30 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import scala.concurrent.Future;
34
35 /**
36  * Special Shard for EntityOwnership.
37  *
38  * @author Thomas Pantelis
39  */
40 class EntityOwnershipShard extends Shard {
41     private int transactionIDCounter = 0;
42     private final String localMemberName;
43     private final List<BatchedModifications> retryModifications = new ArrayList<>();
44
45     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
46         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
47     }
48
49     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
50             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
51         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
52         this.localMemberName = localMemberName;
53     }
54
55     @Override
56     protected void onDatastoreContext(DatastoreContext context) {
57         super.onDatastoreContext(noPersistenceDatastoreContext(context));
58     }
59
60     @Override
61     public void onReceiveCommand(final Object message) throws Exception {
62         if(message instanceof RegisterCandidateLocal) {
63             onRegisterCandidateLocal((RegisterCandidateLocal)message);
64         } else if(message instanceof UnregisterCandidateLocal) {
65             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
66         } else {
67             super.onReceiveCommand(message);
68         }
69     }
70
71     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
72         LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
73
74         // TODO - add the listener locally.
75
76         BatchedModifications modifications = new BatchedModifications(
77                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
78                 DataStoreVersions.CURRENT_VERSION, "");
79         modifications.setDoCommitOnReady(true);
80         modifications.setReady(true);
81         modifications.setTotalMessagesSent(1);
82
83         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
84                 registerCandidate.getEntity().getId(), localMemberName);
85         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
86
87         tryCommitModifications(modifications);
88
89         getSender().tell(SuccessReply.INSTANCE, getSelf());
90     }
91
92     private void tryCommitModifications(final BatchedModifications modifications) {
93         if(isLeader()) {
94             if(isIsolatedLeader()) {
95                 LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
96
97                 retryModifications.add(modifications);
98             } else {
99                 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
100
101                 // Note that it's possible the commit won't get consensus and will timeout and not be applied
102                 // to the state. However we don't need to retry it in that case b/c it will be committed to
103                 // the journal first and, once a majority of followers come back on line and it is replicated,
104                 // it will be applied at that point.
105                 handleBatchedModificationsLocal(modifications, self());
106             }
107         } else {
108             final ActorSelection leader = getLeader();
109             if (leader != null) {
110                 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
111
112                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
113                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
114                 future.onComplete(new OnComplete<Object>() {
115                     @Override
116                     public void onComplete(Throwable failure, Object response) {
117                         if(failure != null) {
118                             if(failure instanceof AskTimeoutException) {
119                                 LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
120                                         modifications.getTransactionID(), leader);
121                                 tryCommitModifications(modifications);
122                             } else {
123                                 LOG.error("BatchedModifications {} to leader {} failed",
124                                         modifications.getTransactionID(), leader, failure);
125                             }
126                         } else {
127                             LOG.debug("BatchedModifications {} to leader {} succeeded",
128                                     modifications.getTransactionID(), leader);
129                         }
130                     }
131                 }, getContext().dispatcher());
132             } else {
133                 LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
134
135                 retryModifications.add(modifications);
136             }
137         }
138     }
139
140     @Override
141     protected void onStateChanged() {
142         super.onStateChanged();
143
144         if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
145             LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
146
147             List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
148             retryModifications.clear();
149             for(BatchedModifications mods: retryModificationsCopy) {
150                 tryCommitModifications(mods);
151             }
152         }
153     }
154
155     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
156         // TODO - implement
157         getSender().tell(SuccessReply.INSTANCE, getSelf());
158     }
159
160     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
161             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
162         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
163     }
164
165     private static class Creator extends AbstractShardCreator {
166         private static final long serialVersionUID = 1L;
167
168         private final String localMemberName;
169
170         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
171                 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
172                 final String localMemberName) {
173             super(name, peerAddresses, datastoreContext, schemaContext);
174             this.localMemberName = localMemberName;
175         }
176
177         @Override
178         public Shard create() throws Exception {
179             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
180         }
181     }
182 }