public static final String MODULE_NAME = "devicemanager";
- // protected ITopologyService topology;
- // protected IStorageSourceService storageSource;
- // protected IRestApiService restApi;
- // protected IThreadPoolService threadPool;
- // protected IFlowReconcileService flowReconcileMgr;
- // protected IFlowReconcileEngineService flowReconcileEngine;
- // protected IDebugCounterService debugCounters;
- // private ISyncService syncService;
- // private IStoreClient<String,DeviceSyncRepresentation> storeClient;
- // private DeviceSyncManager deviceSyncManager;
-
private ITopologyManager topology;
private ISwitchManager switchManager = null;
private IDataPacketService dataPacketService = null;
static final String DEVICE_SYNC_STORE_NAME = DeviceManagerImpl.class
.getCanonicalName() + ".stateStore";
- /**
- * Time interval between writes of entries for the same device to the sync
- * store.
- */
- // static final int DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS =
- // 5*60*1000; // 5 min
- // private int syncStoreWriteIntervalMs =
- // DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS;
-
- /**
- * Time after SLAVE->MASTER until we run the consolidate store code.
- */
- // static final int DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS =
- // 15*1000; // 15 sec
- // private int initialSyncStoreConsolidateMs =
- // DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS;
-
- /**
- * Time interval between consolidate store runs.
- */
- // static final int DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS =
- // 75*60*1000; // 75 min
- // private final int syncStoreConsolidateIntervalMs =
- // DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS;
/**
* Time in milliseconds before entities will expire
return -compare(newAP, oldAP);
long activeOffset = 0;
- // XXX - missing functionality -- need topology
- // if (!topology.isConsistent(oldSw, oldPort, newSw, newPort)) {
+
if (!newBD && oldBD) {
return -1;
}
activeOffset = AttachmentPoint.OPENFLOW_TO_EXTERNAL_TIMEOUT;
}
- // } else {
- // // The attachment point is consistent.
- // activeOffset = AttachmentPoint.CONSISTENT_TIMEOUT;
- // }
if ((newAP.getActiveSince() > oldAP.getLastSeen() + activeOffset)
|| (newAP.getLastSeen() > oldAP.getLastSeen()
addIndex(true, EnumSet.of(DeviceField.IPV4));
- // floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
- // floodlightProvider.addHAListener(this.haListenerDelegate);
- // if (topology != null)
- // topology.addListener(this);
- // flowReconcileMgr.addFlowReconcileListener(this);
- // entityClassifier.addListener(this);
-
stopped = false;
// XXX - Should use a common threadpool but this doesn't currently exist
ses = Executors.newScheduledThreadPool(1);
entityCleanupTask = new SingletonTask(ses, ecr);
entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL, TimeUnit.SECONDS);
- /*
- * XXX Missing functionality if (restApi != null) {
- * restApi.addRestletRoutable(new DeviceRoutable()); } else {
- * logger.debug("Could not instantiate REST API"); }
- */
-
registerDeviceManagerDebugCounters();
-
- /*
- * XXX Missing functionality try {
- * this.syncService.registerStore(DEVICE_SYNC_STORE_NAME, Scope.LOCAL);
- * this.storeClient = this.syncService
- * .getStoreClient(DEVICE_SYNC_STORE_NAME, String.class,
- * DeviceSyncRepresentation.class); } catch (SyncException e) { throw
- * new FloodlightModuleException("Error while setting up sync service",
- * e); }
- *
- * Runnable consolidateStoreRunner = new Runnable() {
- *
- * @Override public void run() { deviceSyncManager.consolidateStore();
- * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
- * TimeUnit.MILLISECONDS); debugCounters.flushCounters(); } };
- * storeConsolidateTask = new SingletonTask(ses,
- * consolidateStoreRunner); if (isMaster)
- * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
- * TimeUnit.MILLISECONDS);
- */
}
/**
}
}
- // ***************
- // IFlowReconcileListener
- // ***************
- /*
- * XXX - Missing functionality
- *
- * @Override public Command reconcileFlows(ArrayList<OFMatchReconcile>
- * ofmRcList) { ListIterator<OFMatchReconcile> iter =
- * ofmRcList.listIterator(); while (iter.hasNext()) { OFMatchReconcile ofm =
- * iter.next();
- *
- * // Remove the STOPPed flow. if (Command.STOP == reconcileFlow(ofm)) {
- * iter.remove(); } }
- *
- * if (ofmRcList.size() > 0) { return Command.CONTINUE; } else { return
- * Command.STOP; } }
- *
- * protected Command reconcileFlow(OFMatchReconcile ofm) {
- * debugCounters.updateCounter(CNT_RECONCILE_REQUEST); // Extract source
- * entity information Entity srcEntity =
- * getEntityFromFlowMod(ofm.ofmWithSwDpid, true); if (srcEntity == null) {
- * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
- * Command.STOP; }
- *
- * // Find the device by source entity Device srcDevice =
- * findDeviceByEntity(srcEntity); if (srcDevice == null) {
- * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
- * Command.STOP; } // Store the source device in the context
- * fcStore.put(ofm.cntx, CONTEXT_SRC_DEVICE, srcDevice);
- *
- * // Find the device matching the destination from the entity // classes of
- * the source. Entity dstEntity = getEntityFromFlowMod(ofm.ofmWithSwDpid,
- * false); Device dstDevice = null; if (dstEntity != null) { dstDevice =
- * findDestByEntity(srcDevice.getEntityClass(), dstEntity); if (dstDevice !=
- * null) fcStore.put(ofm.cntx, CONTEXT_DST_DEVICE, dstDevice); else
- * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } else {
- * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } if
- * (logger.isTraceEnabled()) {
- * logger.trace("Reconciling flow: match={}, srcEntity={}, srcDev={}, " +
- * "dstEntity={}, dstDev={}", new Object[] {ofm.ofmWithSwDpid.getOfMatch(),
- * srcEntity, srcDevice, dstEntity, dstDevice } ); } return
- * Command.CONTINUE; }
- */
// *****************
// IListenDataPacket
if (inPkt == null) {
return PacketResult.IGNORED;
}
- // try {
- // throw new Exception("Sample");
- // } catch (Exception e) {
- // logger.error("Sample stack trace", e);
- // }
Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
Ethernet eth;
NodeConnector inPort = inPkt.getIncomingNodeConnector();
Entity srcEntity = getSourceEntityFromPacket(eth, inPort);
if (srcEntity == null) {
- // debugCounters.updateCounter(CNT_BROADCAST_SOURCE);
return PacketResult.CONSUME;
}
// Learn/lookup device information
Device srcDevice = learnDeviceByEntity(srcEntity);
if (srcDevice == null) {
- // debugCounters.updateCounter(CNT_NO_SOURCE);
return PacketResult.CONSUME;
}
logger.trace("Saw packet from device {}", srcDevice);
- // // Store the source device in the context
- // fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);
- //
- // // Find the device matching the destination from the entity
- // // classes of the source.
- // Entity dstEntity = getDestEntityFromPacket(eth);
- // Device dstDevice = null;
- // if (dstEntity != null) {
- // dstDevice =
- // findDestByEntity(srcDevice.getEntityClass(), dstEntity);
- // if (dstDevice != null)
- // fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
- // //else
- // //debugCounters.updateCounter(CNT_NO_DEST);
- // } else {
- // //debugCounters.updateCounter(CNT_NO_DEST);
- // }
- //
- // if (logger.isTraceEnabled()) {
- // logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
- // " *** srcDev={} *** dstDev={} *** ",
- // new Object[] { pi, sw.getStringId(), pi.getInPort(), eth,
- // srcDevice, dstDevice });
- // }
- //
- // snoopDHCPClientName(eth, srcDevice);
-
return PacketResult.KEEP_PROCESSING;
}
// Internal methods
// ****************
- /**
- * Snoop and record client-provided host name from DHCP requests
- *
- * @param eth
- * @param srcDevice
- */
- // private void snoopDHCPClientName(Ethernet eth, Device srcDevice) {
- // if (! (eth.getPayload() instanceof IPv4) )
- // return;
- // IPv4 ipv4 = (IPv4) eth.getPayload();
- // if (! (ipv4.getPayload() instanceof UDP) )
- // return;
- // UDP udp = (UDP) ipv4.getPayload();
- // if (!(udp.getPayload() instanceof DHCP))
- // return;
- // DHCP dhcp = (DHCP) udp.getPayload();
- // byte opcode = dhcp.getOpCode();
- // if (opcode == DHCP.OPCODE_REQUEST) {
- // DHCPOption dhcpOption = dhcp.getOption(
- // DHCPOptionCode.OptionCode_Hostname);
- // if (dhcpOption != null) {
- // debugCounters.updateCounter(CNT_DHCP_CLIENT_NAME_SNOOPED);
- // srcDevice.dhcpClientName = new String(dhcpOption.getData());
- // }
- // }
- // }
/**
* Check whether the given attachment point is valid given the current
learnDeviceByEntity(e);
}
- /**
- * Get a (partial) entity for the destination from the packet.
- *
- * @param eth
- * @return
- */
- // protected Entity getDestEntityFromPacket(Ethernet eth) {
- // byte[] dlAddrArr = eth.getDestinationMACAddress();
- // long dlAddr = Ethernet.toLong(dlAddrArr);
- // short vlan = eth.getVlanID();
- // int nwDst = 0;
- //
- // // Ignore broadcast/multicast destination
- // if ((dlAddrArr[0] & 0x1) != 0)
- // return null;
- //
- // if (eth.getPayload() instanceof IPv4) {
- // IPv4 ipv4 = (IPv4) eth.getPayload();
- // nwDst = ipv4.getDestinationAddress();
- // }
- //
- // return new Entity(dlAddr,
- // ((vlan >= 0) ? vlan : null),
- // ((nwDst != 0) ? nwDst : null),
- // null,
- // null,
- // null);
- // }
-
- /**
- * Parse an entity from an OFMatchWithSwDpid.
- *
- * @param ofmWithSwDpid
- * @return the entity from the packet
- */
- // private Entity getEntityFromFlowMod(OFMatchWithSwDpid ofmWithSwDpid,
- // boolean isSource) {
- // byte[] dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerSource();
- // int nwSrc = ofmWithSwDpid.getOfMatch().getNetworkSource();
- // if (!isSource) {
- // dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerDestination();
- // nwSrc = ofmWithSwDpid.getOfMatch().getNetworkDestination();
- // }
- //
- // long dlAddr = Ethernet.toLong(dlAddrArr);
- //
- // // Ignore broadcast/multicast source
- // if ((dlAddrArr[0] & 0x1) != 0)
- // return null;
- //
- // Long swDpid = null;
- // Short inPort = null;
- //
- // if (isSource) {
- // swDpid = ofmWithSwDpid.getSwitchDataPathId();
- // inPort = ofmWithSwDpid.getOfMatch().getInputPort();
- // }
- //
- // /**for the new flow cache design, the flow mods retrived are not always
- // from the source, learn AP should be disabled --meiyang*/
- // boolean learnap = false;
- // /**
- // * if (swDpid == null ||
- // inPort == null ||
- // !isValidAttachmentPoint(swDpid, inPort)) {
- // // If this is an internal port or we otherwise don't want
- // // to learn on these ports. In the future, we should
- // // handle this case by labeling flows with something that
- // // will give us the entity class. For now, we'll do our
- // // best assuming attachment point information isn't used
- // // as a key field.
- // learnap = false;
- // }
- // */
- //
- // short vlan = ofmWithSwDpid.getOfMatch().getDataLayerVirtualLan();
- // return new Entity(dlAddr,
- // ((vlan >= 0) ? vlan : null),
- // ((nwSrc != 0) ? nwSrc : null),
- // (learnap ? swDpid : null),
- // (learnap ? (int)inPort : null),
- // new Date());
- // }
-
/**
* Look up a {@link Device} based on the provided {@link Entity}. We first
* check the primary index. If we do not find an entry there we classify the
return true;
}
- /**
- * For testing: sets the interval between writes of the same device to the
- * device store.
- *
- * @param intervalMs
- */
- // void setSyncStoreWriteInterval(int intervalMs) {
- // this.syncStoreWriteIntervalMs = intervalMs;
- // }
-
- /**
- * For testing: sets the time between transition to MASTER and consolidate
- * store
- *
- * @param intervalMs
- */
- // void setInitialSyncStoreConsolidateMs(int intervalMs) {
- // this.initialSyncStoreConsolidateMs = intervalMs;
- // }
private long toLong(byte[] address) {
long mac = 0;
}
}
-
- /**
- * For testing: consolidate the store NOW
- */
- // void scheduleConsolidateStoreNow() {
- // this.storeConsolidateTask.reschedule(0, TimeUnit.MILLISECONDS);
- // }
-
- // private class DeviceSyncManager {
- // // maps (opaque) deviceKey to the time in System.nanoTime() when we
- // // last wrote the device to the sync store
- // private ConcurrentMap<Long, Long> lastWriteTimes =
- // new ConcurrentHashMap<Long, Long>();
- //
- // /**
- // * Write the given device to storage if we are MASTER.
- // * Use this method if the device has significantly changed (e.g.,
- // * new AP, new IP, entities removed).
- // * @param d the device to store
- // */
- // public void storeDevice(Device d) {
- // if (!isMaster)
- // return;
- // if (d == null)
- // return;
- // long now = System.nanoTime();
- // writeUpdatedDeviceToStorage(d);
- // lastWriteTimes.put(d.getDeviceKey(), now);
- // }
- //
- // /**
- // * Write the given device to storage if we are MASTER and if the
- // * last write for the device was more than this.syncStoreIntervalNs
- // * time ago.
- // * Use this method to updated last active times in the store.
- // * @param d the device to store
- // */
- // public void storeDeviceThrottled(Device d) {
- // long intervalNs = syncStoreWriteIntervalMs*1000L*1000L;
- // if (!isMaster)
- // return;
- // if (d == null)
- // return;
- // long now = System.nanoTime();
- // Long last = lastWriteTimes.get(d.getDeviceKey());
- // if (last == null ||
- // now - last > intervalNs) {
- // writeUpdatedDeviceToStorage(d);
- // lastWriteTimes.put(d.getDeviceKey(), now);
- // } else {
- // debugCounters.updateCounter(CNT_DEVICE_STORE_THROTTLED);
- // }
- // }
- //
- // /**
- // * Remove the given device from the store. If only some entities have
- // * been removed the updated device should be written using
- // * {@link #storeDevice(Device)}
- // * @param d
- // */
- // public void removeDevice(Device d) {
- // if (!isMaster)
- // return;
- // // FIXME: could we have a problem with concurrent put to the
- // // hashMap? I.e., we write a stale entry to the map after the
- // // delete and now are left with an entry we'll never clean up
- // lastWriteTimes.remove(d.getDeviceKey());
- // try {
- // // TODO: should probably do versioned delete. OTOH, even
- // // if we accidentally delete, we'll write it again after
- // // the next entity ....
- // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
- // storeClient.delete(DeviceSyncRepresentation.computeKey(d));
- // } catch(ObsoleteVersionException e) {
- // // FIXME
- // } catch (SyncException e) {
- // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
- // logger.error("Could not remove device " + d + " from store", e);
- // }
- // }
- //
- // /**
- // * Remove the given Versioned device from the store. If the device
- // * was locally modified ignore the delete request.
- // * @param syncedDeviceKey
- // */
- // private void removeDevice(Versioned<DeviceSyncRepresentation> dev) {
- // try {
- // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
- // storeClient.delete(dev.getValue().getKey(),
- // dev.getVersion());
- // } catch(ObsoleteVersionException e) {
- // // Key was locally modified by another thread.
- // // Do not delete and ignore.
- // } catch(SyncException e) {
- // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
- // logger.error("Failed to remove device entry for " +
- // dev.toString() + " from store.", e);
- // }
- // }
- //
- // /**
- // * Synchronously transition from SLAVE to MASTER. By iterating through
- // * the store and learning all devices from the store
- // */
- // private void goToMaster() {
- // if (logger.isDebugEnabled()) {
- // logger.debug("Transitioning to MASTER role");
- // }
- // debugCounters.updateCounter(CNT_TRANSITION_TO_MASTER);
- // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
- // iter = null;
- // try {
- // iter = storeClient.entries();
- // } catch (SyncException e) {
- // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
- // logger.error("Failed to read devices from sync store", e);
- // return;
- // }
- // try {
- // while(iter.hasNext()) {
- // Versioned<DeviceSyncRepresentation> versionedDevice =
- // iter.next().getValue();
- // DeviceSyncRepresentation storedDevice =
- // versionedDevice.getValue();
- // if (storedDevice == null)
- // continue;
- // debugCounters.updateCounter(CNT_DEVICES_FROM_STORE);
- // for(SyncEntity se: storedDevice.getEntities()) {
- // learnDeviceByEntity(se.asEntity());
- // }
- // }
- // } finally {
- // if (iter != null)
- // iter.close();
- // }
- // storeConsolidateTask.reschedule(initialSyncStoreConsolidateMs,
- // TimeUnit.MILLISECONDS);
- // }
- //
- // /**
- // * Actually perform the write of the device to the store
- // * FIXME: concurrent modification behavior
- // * @param device The device to write
- // */
- // private void writeUpdatedDeviceToStorage(Device device) {
- // try {
- // debugCounters.updateCounter(CNT_DEVICE_STORED);
- // // FIXME: use a versioned put
- // DeviceSyncRepresentation storeDevice =
- // new DeviceSyncRepresentation(device);
- // storeClient.put(storeDevice.getKey(), storeDevice);
- // } catch (ObsoleteVersionException e) {
- // // FIXME: what's the right behavior here. Can the store client
- // // even throw this error?
- // } catch (SyncException e) {
- // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
- // logger.error("Could not write device " + device +
- // " to sync store:", e);
- // }
- // }
- //
- // /**
- // * Iterate through all entries in the sync store. For each device
- // * in the store check if any stored entity matches a live device. If
- // * no entities match a live device we remove the entry from the store.
- // *
- // * Note: we do not check if all devices known to device manager are
- // * in the store. We rely on regular packetIns for that.
- // * Note: it's possible that multiple entries in the store map to the
- // * same device. We don't check or handle this case.
- // *
- // * We need to perform this check after a SLAVE->MASTER transition to
- // * get rid of all entries the old master might have written to the
- // * store after we took over. We also run it regularly in MASTER
- // * state to ensure we don't have stale entries in the store
- // */
- // private void consolidateStore() {
- // if (!isMaster)
- // return;
- // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_RUNS);
- // if (logger.isDebugEnabled()) {
- // logger.debug("Running consolidateStore.");
- // }
- // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
- // iter = null;
- // try {
- // iter = storeClient.entries();
- // } catch (SyncException e) {
- // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
- // logger.error("Failed to read devices from sync store", e);
- // return;
- // }
- // try {
- // while(iter.hasNext()) {
- // boolean found = false;
- // Versioned<DeviceSyncRepresentation> versionedDevice =
- // iter.next().getValue();
- // DeviceSyncRepresentation storedDevice =
- // versionedDevice.getValue();
- // if (storedDevice == null)
- // continue;
- // for(SyncEntity se: storedDevice.getEntities()) {
- // try {
- // // Do we have a device for this entity??
- // IDevice d = findDevice(se.macAddress, se.vlan,
- // se.ipv4Address,
- // se.switchDPID,
- // se.switchPort);
- // if (d != null) {
- // found = true;
- // break;
- // }
- // } catch (IllegalArgumentException e) {
- // // not all key fields provided. Skip entity
- // }
- // }
- // if (!found) {
- // // We currently DO NOT have a live device that
- // // matches the current device from the store.
- // // Delete device from store.
- // if (logger.isDebugEnabled()) {
- // logger.debug("Removing device {} from store. No "
- // + "corresponding live device",
- // storedDevice.getKey());
- // }
- // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED);
- // removeDevice(versionedDevice);
- // }
- // }
- // } finally {
- // if (iter != null)
- // iter.close();
- // }
- // }
- // }
- //
- //
- // /**
- // * For testing. Sets the syncService. Only call after init but before
- // * startUp. Used by MockDeviceManager
- // * @param syncService
- // */
- // protected void setSyncServiceIfNotSet(ISyncService syncService) {
- // if (this.syncService == null)
- // this.syncService = syncService;
- // }
}