Post "Clustering optimization" updates
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import javax.annotation.Nullable;
11 import java.util.concurrent.Future;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Verify;
16 import com.google.common.util.concurrent.AsyncFunction;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.JdkFutureAdapters;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import io.netty.util.Timeout;
22 import io.netty.util.TimerTask;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
28 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
31 import org.opendaylight.openflowplugin.api.OFConstants;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
37 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
38 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
45 import org.opendaylight.yangtools.yang.common.RpcResult;
46 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Created by kramesha on 9/12/15.
52  */
53 public class RoleContextImpl implements RoleContext {
54     private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
55
56     private final EntityOwnershipService entityOwnershipService;
57     private EntityOwnershipCandidateRegistration entityOwnershipCandidateRegistration;
58     private EntityOwnershipCandidateRegistration txEntityOwnershipCandidateRegistration;
59
60     private final DeviceContext deviceContext;
61
62     private final Entity entity;
63     private final Entity txEntity;
64
65     private SalRoleService salRoleService;
66
67     private final Semaphore mainCandidateGuard = new Semaphore(1, true);
68     private final Semaphore txCandidateGuard = new Semaphore(1, true);
69
70     public RoleContextImpl(final DeviceContext deviceContext, final EntityOwnershipService entityOwnershipService,
71                            final Entity entity, final Entity txEntity) {
72         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
73         this.deviceContext = Preconditions.checkNotNull(deviceContext);
74         this.entity = Preconditions.checkNotNull(entity);
75         this.txEntity = Preconditions.checkNotNull(txEntity);
76         salRoleService = new SalRoleServiceImpl(this, deviceContext);
77     }
78
79     @Override
80     public void initialization() throws CandidateAlreadyRegisteredException {
81         LOG.debug("Initialization RoleContext for Node {}", deviceContext.getDeviceState().getNodeId());
82         final AsyncFunction<RpcResult<SetRoleOutput>, Void> initFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
83             @Override
84             public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> input) throws Exception {
85                 LOG.debug("Initialization request OpenflowEntityOwnership for entity {}", entity);
86                 getDeviceState().setRole(OfpRole.BECOMESLAVE);
87                 entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
88                 LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
89                         .getPrimaryConnectionContext().getNodeId().getValue());
90                 return Futures.immediateFuture(null);
91             }
92         };
93         final ListenableFuture<Void> roleChange = sendRoleChangeToDevice(OfpRole.BECOMESLAVE, initFunction);
94         Futures.addCallback(roleChange, new FutureCallback<Void>() {
95
96             @Override
97             public void onSuccess(final Void result) {
98                 LOG.debug("Initial RoleContext for Node {} is successful", deviceContext.getDeviceState().getNodeId());
99             }
100
101             @Override
102             public void onFailure(final Throwable t) {
103                 LOG.warn("Initial RoleContext for Node {} fail", deviceContext.getDeviceState().getNodeId(), t);
104                 deviceContext.close();
105             }
106         });
107     }
108
109     @Override
110     public ListenableFuture<Void> onRoleChanged(final OfpRole oldRole, final OfpRole newRole) {
111         LOG.trace("onRoleChanged method call for Entity {}", entity);
112
113         if (!isDeviceConnected()) {
114             // this can happen as after the disconnect, we still get a last message from EntityOwnershipService.
115             LOG.info("Device {} is disconnected from this node. Hence not attempting a role change.",
116                     deviceContext.getPrimaryConnectionContext().getNodeId());
117             LOG.debug("SetRole cancelled for entity [{}], reason = device disconnected.", entity);
118             return Futures.immediateFailedFuture(new Exception(
119                     "Device disconnected - stopped by setRole: " + deviceContext
120                             .getPrimaryConnectionContext().getNodeId()));
121         }
122
123         LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
124                 deviceContext.getPrimaryConnectionContext().getNodeId());
125
126         final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
127             @Override
128             public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
129                 LOG.debug("Role change {} successful made on switch :{}", newRole, deviceContext.getDeviceState()
130                         .getNodeId());
131                 getDeviceState().setRole(newRole);
132                 return deviceContext.onClusterRoleChange(oldRole, newRole);
133             }
134         };
135         return sendRoleChangeToDevice(newRole, roleChangeFunction);
136     }
137
138     @Override
139     public void setupTxCandidate() throws CandidateAlreadyRegisteredException {
140         LOG.debug("setupTxCandidate for entity {} and Transaction entity {}", entity, txEntity);
141         Verify.verify(txEntity != null);
142
143         txEntityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(txEntity);
144     }
145
146     @Override
147     public void close() {
148         if (entityOwnershipCandidateRegistration != null) {
149             LOG.debug("Closing EntityOwnershipCandidateRegistration for {}", entity);
150             entityOwnershipCandidateRegistration.close();
151         }
152     }
153
154     @Override
155     public Entity getEntity() {
156         return entity;
157     }
158
159     @Override
160     public Entity getTxEntity() {
161         return txEntity;
162     }
163
164     private boolean isDeviceConnected() {
165         return ConnectionContext.CONNECTION_STATE.WORKING.equals(
166                 deviceContext.getPrimaryConnectionContext().getConnectionState());
167     }
168
169     @Nullable
170     @Override
171     public <T> RequestContext<T> createRequestContext() {
172         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
173             @Override
174             public void close() {
175             }
176         };
177         return ret;
178     }
179
180     @VisibleForTesting
181     void setSalRoleService(final SalRoleService salRoleService) {
182         this.salRoleService = salRoleService;
183     }
184
185     @Override
186     public DeviceState getDeviceState() {
187         return deviceContext.getDeviceState();
188     }
189
190     @Override
191     public void suspendTxCandidate() {
192         if (txEntityOwnershipCandidateRegistration != null) {
193             txEntityOwnershipCandidateRegistration.close();
194             txEntityOwnershipCandidateRegistration = null;
195         }
196     }
197
198     @Override
199     public DeviceContext getDeviceContext() {
200         return deviceContext;
201     }
202
203     @Override
204     public Semaphore getMainCandidateGuard() {
205         return mainCandidateGuard;
206     }
207
208     @Override
209     public Semaphore getTxCandidateGuard() {
210         return txCandidateGuard;
211     }
212
213     private ListenableFuture<Void> sendRoleChangeToDevice(final OfpRole newRole, final AsyncFunction<RpcResult<SetRoleOutput>, Void> function) {
214         LOG.debug("Send new role {} to device {}", newRole, deviceContext.getDeviceState().getNodeId());
215         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
216         if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
217             LOG.debug("Device OF version {} not support ROLE", deviceContext.getDeviceState().getFeatures().getVersion());
218             setRoleOutputFuture = Futures.immediateFuture(RpcResultBuilder.<SetRoleOutput> success().build());
219         } else {
220             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
221                     .setNode(new NodeRef(deviceContext.getDeviceState().getNodeInstanceIdentifier())).build();
222             setRoleOutputFuture = salRoleService.setRole(setRoleInput);
223             final TimerTask timerTask = new TimerTask() {
224
225                 @Override
226                 public void run(final Timeout timeout) throws Exception {
227                     if (!setRoleOutputFuture.isDone()) {
228                         LOG.info("New role {} was not propagated to device {} during 10 sec. Close connection immediately.",
229                                 newRole, deviceContext.getDeviceState().getNodeId());
230                         deviceContext.close();
231                     }
232                 }
233             };
234             deviceContext.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
235         }
236         return Futures.transform(JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture), function);
237     }
238 }