Merge "Bug-4957 Fix SalRoleService blocking call"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / SalRoleServiceImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.AsyncFunction;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.math.BigInteger;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.Semaphore;
20 import javax.annotation.concurrent.GuardedBy;
21 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
22 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
23 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
24 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
25 import org.opendaylight.openflowplugin.impl.role.RoleChangeException;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38
39 public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService  {
40
41     private static final Logger LOG = LoggerFactory.getLogger(SalRoleServiceImpl.class);
42
43     private static final BigInteger MAX_GENERATION_ID = new BigInteger("ffffffffffffffff", 16);
44
45     private static final int MAX_RETRIES = 42;
46
47     private final DeviceContext deviceContext;
48     private final RoleService roleService;
49     private final NodeId nodeId;
50     private final Short version;
51
52     private final Semaphore currentRoleGuard = new Semaphore(1, true);
53
54     @GuardedBy("currentRoleGuard")
55     private OfpRole currentRole = OfpRole.NOCHANGE;
56
57     public SalRoleServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
58         super(requestContextStack, deviceContext, SetRoleOutput.class);
59         this.deviceContext = Preconditions.checkNotNull(deviceContext);
60         this.roleService =  new RoleService(requestContextStack, deviceContext, RoleRequestOutput.class);
61         nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
62         version = deviceContext.getPrimaryConnectionContext().getFeatures().getVersion();
63     }
64
65     @Override
66     protected OfHeader buildRequest(final Xid xid, final SetRoleInput input) {
67         return null;
68     }
69
70     @Override
71     public Future<RpcResult<SetRoleOutput>> setRole(final SetRoleInput input) {
72         LOG.info("SetRole called with input:{}", input);
73         try {
74             currentRoleGuard.acquire();
75             LOG.trace("currentRole lock queue: " + currentRoleGuard.getQueueLength());
76         } catch (final InterruptedException e) {
77             LOG.warn("Unexpected exception for acquire semaphor for input {}", input);
78             return RpcResultBuilder.<SetRoleOutput> success().buildFuture();
79         }
80         // compare with last known role and set if different. If they are same, then return.
81         if (currentRole == input.getControllerRole()) {
82             LOG.info("Role to be set is same as the last known role for the device:{}. Hence ignoring.",
83                     input.getControllerRole());
84             currentRoleGuard.release();
85             return RpcResultBuilder.<SetRoleOutput> success().buildFuture();
86         }
87
88         final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture
89                 .<RpcResult<SetRoleOutput>> create();
90         repeaterForChangeRole(resultFuture, input, 0);
91         return resultFuture;
92     }
93
94     private void repeaterForChangeRole(final SettableFuture<RpcResult<SetRoleOutput>> future, final SetRoleInput input,
95             final int retryCounter) {
96         if (retryCounter >= MAX_RETRIES) {
97             currentRoleGuard.release();
98             future.setException(new RoleChangeException(String.format("Set Role failed after %s tries on device %s",
99                     MAX_RETRIES, input.getNode().getValue())));
100             return;
101         }
102         // Check current connection state
103         final CONNECTION_STATE state = deviceContext.getPrimaryConnectionContext().getConnectionState();
104         switch (state) {
105         case RIP:
106             LOG.info("Device {} has been disconnected", input.getNode());
107             currentRoleGuard.release();
108             future.setException(new Exception(String.format(
109                     "Device connection doesn't exist anymore. Primary connection status : %s", state)));
110             return;
111         case WORKING:
112             // We can proceed
113             break;
114         default:
115             LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state);
116             currentRoleGuard.release();
117             future.setException(new Exception(String.format("Unexcpected device connection status : %s", state)));
118             return;
119         }
120
121         LOG.info("Requesting state change to {}", input.getControllerRole());
122         final ListenableFuture<SetRoleOutput> changeRoleFuture = tryToChangeRole(input.getControllerRole());
123         Futures.addCallback(changeRoleFuture, new FutureCallback<SetRoleOutput>() {
124
125             @Override
126             public void onSuccess(final SetRoleOutput result) {
127                 LOG.info("setRoleOutput received after roleChangeTask execution:{}", result);
128                 currentRole = input.getControllerRole();
129                 currentRoleGuard.release();
130                 future.set(RpcResultBuilder.<SetRoleOutput> success().withResult(result).build());
131             }
132
133             @Override
134             public void onFailure(final Throwable t) {
135                 LOG.info("Exception in setRole(), will retry: {} times.", MAX_RETRIES - retryCounter, t);
136                 repeaterForChangeRole(future, input, (retryCounter + 1));
137             }
138         });
139     }
140
141     private ListenableFuture<SetRoleOutput> tryToChangeRole(final OfpRole role) {
142         LOG.info("RoleChangeTask called on device:{} OFPRole:{}", nodeId.getValue(), role);
143
144         final Future<BigInteger> generationFuture = roleService.getGenerationIdFromDevice(version);
145
146         return Futures.transform(JdkFutureAdapters.listenInPoolThread(generationFuture), new AsyncFunction<BigInteger, SetRoleOutput>() {
147
148             @Override
149             public ListenableFuture<SetRoleOutput> apply(final BigInteger generationId) throws Exception {
150                 LOG.debug("RoleChangeTask, GenerationIdFromDevice from device {} is {}", nodeId.getValue(), generationId);
151                 final BigInteger nextGenerationId = getNextGenerationId(generationId);
152                 LOG.debug("nextGenerationId received from device:{} is {}", nodeId.getValue(), nextGenerationId);
153                 final Future<SetRoleOutput> submitRoleFuture = roleService.submitRoleChange(role, version, nextGenerationId);
154                 return JdkFutureAdapters.listenInPoolThread(submitRoleFuture);
155             }
156         });
157     }
158
159     private static BigInteger getNextGenerationId(final BigInteger generationId) {
160         if (generationId.compareTo(MAX_GENERATION_ID) < 0) {
161             return generationId.add(BigInteger.ONE);
162         } else {
163             return BigInteger.ZERO;
164         }
165     }
166 }