Bug-2827: role switch proposal
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / OFRoleManager.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
9
10 import com.google.common.base.Objects;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.math.BigInteger;
17 import java.util.Comparator;
18 import java.util.concurrent.ArrayBlockingQueue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.PriorityBlockingQueue;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
25 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
26 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * manage OF-role propagation to devices
33  */
34 public class OFRoleManager implements AutoCloseable {
35
36     /**
37      * starting value of generationId
38      */
39     public static final BigInteger MAX_GENERATION_ID = new BigInteger("ffffffffffffffff", 16);
40
41     private static final Logger LOG = LoggerFactory.getLogger(OFRoleManager.class);
42
43     private static final long TIMEOUT = 2000;
44
45     private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
46
47     private static final int RETRY_LIMIT = 42;
48
49     private final ListeningExecutorService broadcastPool;
50
51     private final BlockingQueue<RolePushTask> workQueue;
52
53     private final SessionManager sessionManager;
54
55     /**
56      * @param sessionManager
57      */
58     public OFRoleManager(SessionManager sessionManager) {
59         Preconditions.checkNotNull("Session manager can not be empty.", sessionManager);
60         this.sessionManager = sessionManager;
61         workQueue = new PriorityBlockingQueue<>(500, new Comparator<RolePushTask>() {
62             @Override
63             public int compare(RolePushTask o1, RolePushTask o2) {
64                 return Integer.compare(o1.getPriority(), o2.getPriority());
65             }
66         });
67         ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
68                 1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "ofRoleBroadcast");
69         broadcastPool = MoreExecutors.listeningDecorator(
70                 delegate);
71     }
72
73     /**
74      * change role on each connected device
75      *
76      * @param role
77      */
78     public void manageRoleChange(final OfpRole role) {
79         for (final SessionContext session : sessionManager.getAllSessions()) {
80             try {
81                 workQueue.put(new RolePushTask(role, session));
82             } catch (InterruptedException e) {
83                 LOG.warn("Processing of role request failed while enqueueing role task: {}", e.getMessage());
84             }
85         }
86
87         while (!workQueue.isEmpty()) {
88             RolePushTask task = workQueue.poll();
89             ListenableFuture<Boolean> rolePushResult = broadcastPool.submit(task);
90             CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
91                     RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
92             try {
93                 Boolean succeeded = rolePushResultChecked.checkedGet(TIMEOUT, TIMEOUT_UNIT);
94                 if (!Objects.firstNonNull(succeeded, Boolean.FALSE)) {
95                     if (task.getRetryCounter() < RETRY_LIMIT) {
96                         workQueue.offer(task);
97                     }
98                 }
99             } catch (RolePushException | TimeoutException e) {
100                 LOG.warn("failed to process role request: {}", e);
101             }
102         }
103     }
104
105     @Override
106     public void close() throws Exception {
107         broadcastPool.shutdown();
108     }
109 }