+ // Used to learn where a node is connected, so that FRM programming can
+ // be distributed to the right controller in the cluster
+ private IConnectionManager connectionManager;
+
+ /*
+ * Names of the clustered caches used to support FRM entry distribution.
+ * These are by necessity non-transactional, because the distribution
+ * state must remain synchronizable across the cluster even while a
+ * transaction is in progress
+ */
+ static final String WORK_ORDER_CACHE = "frm.workOrder";
+ static final String WORK_STATUS_CACHE = "frm.workStatus";
+ static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView";
+ static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView";
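+
+ /*
+ * A minimal sketch of how these caches are meant to be allocated as
+ * non-transactional clustered caches, assuming the
+ * IClusterContainerServices createCache API:
+ *
+ * clusterContainerService.createCache(WORK_ORDER_CACHE,
+ * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ * clusterContainerService.createCache(WORK_STATUS_CACHE,
+ * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ */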
+
+ /*
+ * Data structure responsible for distributing the FlowEntryInstall requests
+ * across the cluster. The key is the entry that is being installed, updated
+ * or deleted. The value is the same as the key in case of installation or
+ * deletion; it is the new entry in case of modification, because the
+ * clustered caches don't allow null values.
+ *
+ * The logic behind this data structure is that the controller initiating
+ * the request places the order here, and the controller serving it picks
+ * it up and then removes it from this data structure because it is being
+ * served.
+ *
+ * TODO: We need a way to clean up this data structure when entries are not
+ * picked up by anyone, which can always happen, especially in Node
+ * disconnect cases.
+ */
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
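+
+ /*
+ * A minimal sketch of the intended order lifecycle described above, with
+ * illustrative variable names (the cache-update listener plumbing that
+ * delivers the order to the serving controller is assumed, not shown):
+ *
+ * // Requesting controller publishes the order
+ * workOrder.put(order, entryOrNewEntry);
+ *
+ * // Serving controller picks it up, programs the node, then removes
+ * // the order because it is being served
+ * FlowEntryInstall target = workOrder.remove(order);
+ */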
+
+ /*
+ * Data structure responsible for conveying the results of the work orders
+ * submitted to the cluster.
+ *
+ * The logic behind this data structure is that the controller that has
+ * executed the order places the result in workStatus, signaling whether
+ * the operation succeeded or failed.
+ *
+ * TODO: The workStatus entries need to have an associated lifetime, in
+ * case the requesting controller leaves the cluster.
+ */
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
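+
+ /*
+ * A minimal sketch of the status hand-off, with illustrative names and
+ * assuming the SAL Status/StatusCode utility classes:
+ *
+ * // On the executing controller, once the order has been served
+ * workStatus.put(order, new Status(StatusCode.SUCCESS));
+ *
+ * // On the requesting controller, upon noticing the status update
+ * Status result = workStatus.remove(order);
+ */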
+
+ /*
+ * Local map holding the Futures that callers can use to monitor an order
+ * for completion
+ */
+ private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
+ new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+
+ /*
+ * Max pool size for the executor
+ */
+ private static final int maxPoolSize = 10;
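+
+ /*
+ * The pool size above is meant to bound the executor running the
+ * distribution tasks; a minimal sketch of its allocation, assuming a
+ * fixed thread pool:
+ *
+ * ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
+ */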
+
+ /**
+ * @param e
+ * Entry being installed/updated/removed
+ * @param u
+ * New entry that will be in place after the update operation.
+ * Valid only for UpdateType.CHANGED, null in all the other cases
+ * @param t
+ * Type of update
+ * @return a Future object for monitoring the progress of the result, or
+ * null in case the processing should take place locally
+ */
+ private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u,
+ UpdateType t) {
+ // A null entry is an unexpected condition; in any case it is safe to
+ // keep the handling local
+ if (e == null) {
+ return null;
+ }
+
+ Node n = e.getNode();
+ if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", n, fe);
+ workMonitor.put(fe, ret);
+ if (t == UpdateType.CHANGED) {
+ // Then distribute the work, publishing the new entry as the value
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work, publishing the entry itself as the value
+ workOrder.put(fe, e);
+ }
+ logsync.trace("WorkOrder requested");
+ // Return the handle so the caller can monitor the execution of the
+ // operation
+ return ret;
+ }
+
+ logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
+ return null;
+ }
+
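+ /*
+ * A minimal sketch of how a caller is expected to consume the returned
+ * Future, with illustrative names and update type, assuming that
+ * FlowEntryDistributionOrderFutureTask implements Future<Status>:
+ *
+ * FlowEntryDistributionOrderFutureTask futureStatus =
+ * distributeWorkOrder(entry, null, UpdateType.ADDED);
+ * if (futureStatus != null) {
+ * // Remote case: block until the executing controller posts the
+ * // result into workStatus
+ * Status status = futureStatus.get();
+ * } else {
+ * // Local case: program the entry directly on this controller
+ * }
+ */
+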