2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import java.util.ArrayList;
18 import java.util.List;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
22 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
23 import org.opendaylight.controller.cluster.datastore.Shard;
24 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
25 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
27 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
29 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
30 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import scala.concurrent.Future;
36 * Special Shard for EntityOwnership.
38 * @author Thomas Pantelis
40 class EntityOwnershipShard extends Shard {
41 private int transactionIDCounter = 0;
42 private final String localMemberName;
43 private final List<BatchedModifications> retryModifications = new ArrayList<>();
45 private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
46 return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
49 protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
50 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
51 super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
52 this.localMemberName = localMemberName;
56 protected void onDatastoreContext(DatastoreContext context) {
57 super.onDatastoreContext(noPersistenceDatastoreContext(context));
61 public void onReceiveCommand(final Object message) throws Exception {
62 if(message instanceof RegisterCandidateLocal) {
63 onRegisterCandidateLocal((RegisterCandidateLocal)message);
64 } else if(message instanceof UnregisterCandidateLocal) {
65 onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
67 super.onReceiveCommand(message);
71 private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
72 LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
74 // TODO - add the listener locally.
76 BatchedModifications modifications = new BatchedModifications(
77 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
78 DataStoreVersions.CURRENT_VERSION, "");
79 modifications.setDoCommitOnReady(true);
80 modifications.setReady(true);
81 modifications.setTotalMessagesSent(1);
83 NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
84 registerCandidate.getEntity().getId(), localMemberName);
85 modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
87 tryCommitModifications(modifications);
89 getSender().tell(SuccessReply.INSTANCE, getSelf());
92 private void tryCommitModifications(final BatchedModifications modifications) {
94 if(isIsolatedLeader()) {
95 LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
97 retryModifications.add(modifications);
99 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
101 // Note that it's possible the commit won't get consensus and will timeout and not be applied
102 // to the state. However we don't need to retry it in that case b/c it will be committed to
103 // the journal first and, once a majority of followers come back on line and it is replicated,
104 // it will be applied at that point.
105 handleBatchedModificationsLocal(modifications, self());
108 final ActorSelection leader = getLeader();
109 if (leader != null) {
110 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
112 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
113 getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
114 future.onComplete(new OnComplete<Object>() {
116 public void onComplete(Throwable failure, Object response) {
117 if(failure != null) {
118 if(failure instanceof AskTimeoutException) {
119 LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
120 modifications.getTransactionID(), leader);
121 tryCommitModifications(modifications);
123 LOG.error("BatchedModifications {} to leader {} failed",
124 modifications.getTransactionID(), leader, failure);
127 LOG.debug("BatchedModifications {} to leader {} succeeded",
128 modifications.getTransactionID(), leader);
131 }, getContext().dispatcher());
133 LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
135 retryModifications.add(modifications);
141 protected void onStateChanged() {
142 super.onStateChanged();
144 if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
145 LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
147 List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
148 retryModifications.clear();
149 for(BatchedModifications mods: retryModificationsCopy) {
150 tryCommitModifications(mods);
155 private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
157 getSender().tell(SuccessReply.INSTANCE, getSelf());
160 public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
161 final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
162 return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
165 private static class Creator extends AbstractShardCreator {
166 private static final long serialVersionUID = 1L;
168 private final String localMemberName;
170 Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
171 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
172 final String localMemberName) {
173 super(name, peerAddresses, datastoreContext, schemaContext);
174 this.localMemberName = localMemberName;
178 public Shard create() throws Exception {
179 return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);