Bug 4105: Change commit retry mechanism in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.pattern.Patterns;
16 import java.util.Map;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
19 import org.opendaylight.controller.cluster.datastore.Shard;
20 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
21 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
25 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
26 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import scala.concurrent.Future;
29
30 /**
31  * Special Shard for EntityOwnership.
32  *
33  * @author Thomas Pantelis
34  */
35 class EntityOwnershipShard extends Shard {
36     private final String localMemberName;
37     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
38
39     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
40         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
41     }
42
43     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
44             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
45         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
46         this.localMemberName = localMemberName;
47         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
48     }
49
50     @Override
51     protected void onDatastoreContext(DatastoreContext context) {
52         super.onDatastoreContext(noPersistenceDatastoreContext(context));
53     }
54
55     @Override
56     public void onReceiveCommand(final Object message) throws Exception {
57         if(message instanceof RegisterCandidateLocal) {
58             onRegisterCandidateLocal((RegisterCandidateLocal)message);
59         } else if(message instanceof UnregisterCandidateLocal) {
60             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
61         } else if(!commitCoordinator.handleMessage(message, this)) {
62             super.onReceiveCommand(message);
63         }
64     }
65
66     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
67         LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
68
69         // TODO - add the listener locally.
70
71         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
72                 registerCandidate.getEntity().getId(), localMemberName);
73         commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
74
75         getSender().tell(SuccessReply.INSTANCE, getSelf());
76     }
77
78     void tryCommitModifications(final BatchedModifications modifications) {
79         if(isLeader()) {
80             LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
81
82             // Note that it's possible the commit won't get consensus and will timeout and not be applied
83             // to the state. However we don't need to retry it in that case b/c it will be committed to
84             // the journal first and, once a majority of followers come back on line and it is replicated,
85             // it will be applied at that point.
86             handleBatchedModificationsLocal(modifications, self());
87         } else {
88             final ActorSelection leader = getLeader();
89             if (leader != null) {
90                 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
91
92                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
93                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
94
95                 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
96             }
97         }
98     }
99
100     boolean hasLeader() {
101         return getLeader() != null && !isIsolatedLeader();
102     }
103
104     @Override
105     protected void onStateChanged() {
106         super.onStateChanged();
107
108         commitCoordinator.onStateChanged(this, isLeader());
109     }
110
111     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
112         // TODO - implement
113         getSender().tell(SuccessReply.INSTANCE, getSelf());
114     }
115
116     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
117             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
118         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
119     }
120
121     private static class Creator extends AbstractShardCreator {
122         private static final long serialVersionUID = 1L;
123
124         private final String localMemberName;
125
126         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
127                 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
128                 final String localMemberName) {
129             super(name, peerAddresses, datastoreContext, schemaContext);
130             this.localMemberName = localMemberName;
131         }
132
133         @Override
134         public Shard create() throws Exception {
135             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
136         }
137     }
138 }