[rqd] Core affinity for cache optimization (#1171)
When possible, try to book frames from the same Layer on the same core to leverage shared cache
DiegoTavares committed Jul 13, 2022
1 parent 3f974d2 commit 6fafd62
Showing 8 changed files with 228 additions and 83 deletions.
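
The idea in brief: rqd now records which logical processors (hyperthreads) belong to which physical core, and which cores belong to which physical CPU. When a frame reserves cores, whole physical cores are booked, preferring the physical CPU with the most idle cores, so a multi-core frame stays on one socket and its threads share that CPU's cache. A minimal standalone sketch of the selection policy (names are illustrative, not the rqd API):

def pick_procs(avail_cores, cores_needed, procs_by_physid_and_coreid):
    # avail_cores: {physid: set of coreids not yet reserved}
    # procs_by_physid_and_coreid: {physid: {coreid: set of processor ids}}
    procs = []
    # Visit the physical CPU with the most idle cores first, so a
    # multi-core frame lands on a single socket whenever it fits.
    for physid, cores in sorted(avail_cores.items(),
                                key=lambda item: len(item[1]),
                                reverse=True):
        while cores_needed > 0 and cores:
            coreid = cores.pop()
            # Book every hyperthread of the chosen core; together they
            # count as one reserved core.
            procs.extend(procs_by_physid_and_coreid[physid][coreid])
            cores_needed -= 1
        if cores_needed == 0:
            break
    return procs
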
2 changes: 1 addition & 1 deletion VERSION.in
@@ -1 +1 @@
0.18
0.19
@@ -29,6 +29,8 @@
import java.util.List;
import java.util.Map;

import com.imageworks.spcue.dispatcher.AbstractDispatcher;
import org.apache.log4j.Logger;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
2 changes: 1 addition & 1 deletion cuegui/cuegui/ProcChildren.py
@@ -75,7 +75,7 @@ def update(self):

try:
procs = opencue.api.getProcs(job=[self._job.name()],
layer=[self._layer.name()],
layer=[x.name() for x in self._job.getLayers()],
host=self._hosts)
for proc in procs:
data['children_processes'] =\
7 changes: 7 additions & 0 deletions proto/report.proto
@@ -36,6 +36,13 @@ message CoreDetail {
int32 idle_cores = 2;
int32 locked_cores = 3;
int32 booked_cores = 4;
// map <key: physical id, value: list of reserved core ids>
map<int64, CoreId> reserved_cores = 5;
}

message CoreId {
repeated int64 coreid = 1;
}

message FrameCompleteReport {
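
The new reserved_cores field maps a physical CPU id to the list of core ids currently booked on it. With the generated Python bindings, an entry of a message-valued map is created on first access, which is what lets rqmachine.py append core ids without an explicit insert. A small sketch of that behavior (the import path is an assumption based on rqd's compiled protos):

from rqd.compiled_proto import report_pb2  # assumed module path

core_detail = report_pb2.CoreDetail(idle_cores=800)
# Accessing a missing key of a message-valued map auto-creates it:
core_detail.reserved_cores[0].coreid.extend([2, 3])
assert list(core_detail.reserved_cores[0].coreid) == [2, 3]
core_detail.reserved_cores.clear()  # drops all reservations
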
2 changes: 1 addition & 1 deletion pycue/opencue/wrappers/host.py
@@ -279,7 +279,7 @@ def coresReserved(self):
:rtype: float
:return: number of cores reserved
"""
return self.data.cores - self.data.idle_ores
return self.data.cores - self.data.idle_cores

def coresIdle(self):
"""Returns the number of cores the host currently has idel.
9 changes: 9 additions & 0 deletions rqd/rqd/rqcore.py
@@ -593,6 +593,7 @@ def __init__(self, optNimbyoff=False):
idle_cores=0,
locked_cores=0,
booked_cores=0,
reserved_cores=[],
)

self.nimby = rqd.rqnimby.NimbyFactory.getNimby(self)
@@ -732,6 +733,14 @@ def deleteFrame(self, frameId):
try:
if frameId in self.__cache:
del self.__cache[frameId]
# pylint: disable=no-member
if not self.__cache and self.cores.reserved_cores:
# pylint: disable=no-member
log.error(
'No running frames but reserved_cores is not empty: %s',
self.cores.reserved_cores)
# pylint: disable=no-member
self.cores.reserved_cores.clear()
finally:
self.__threadLock.release()

191 changes: 117 additions & 74 deletions rqd/rqd/rqmachine.py
@@ -79,9 +79,16 @@ def __init__(self, rqCore, coreInfo):
"""
self.__rqCore = rqCore
self.__coreInfo = coreInfo
self.__tasksets = set()
self.__gpusets = set()

# A dictionary built from /proc/cpuinfo containing
# { <physical id> : { <core_id> : set([<processor>, <processor>, ...]), ... }, ... }
self.__procs_by_physid_and_coreid = {}

# A reverse mapping of the above.
# { <processor> : (<physical id>, <core_id>), ... }
self.__physid_and_coreid_by_proc = {}

if platform.system() == 'Linux':
self.__vmstat = rqd.rqswap.VmStat()

@@ -103,8 +110,8 @@ def __init__(self, rqCore, coreInfo):

self.__pidHistory = {}

self.setupHT()
self.setupGpu()
self.setupTaskset()

def isNimbySafeToRunJobs(self):
"""Returns False if nimby should be triggered due to resource limits"""
@@ -207,17 +214,9 @@ def __updateGpuAndLlu(self, frame):
stat = os.stat(frame.runFrame.log_dir_file).st_mtime
frame.lluTime = int(stat)

def _getFields(self, pidFilePath):
fields = []

try:
with open(pidFilePath, "r") as statFile:
fields = statFile.read().split()
# pylint: disable=broad-except
except Exception:
log.warning("Not able to read pidFilePath: %s", pidFilePath)

return fields
def _getStatFields(self, pidFilePath):
with open(pidFilePath, "r") as statFile:
return [None, None] + statFile.read().rsplit(")", 1)[-1].split()

def rssUpdate(self, frames):
"""Updates the rss and maxrss for all running frames"""
@@ -247,10 +246,8 @@ def rssUpdate(self, frames):
for pid in os.listdir("/proc"):
if pid.isdigit():
try:
with open(rqd.rqconstants.PATH_PROC_PID_STAT
.format(pid), "r") as statFile:
statFields = [None, None] + statFile.read().rsplit(")", 1)[-1].split()

statFields = self._getStatFields(rqd.rqconstants.PATH_PROC_PID_STAT
.format(pid))
pids[pid] = {
"name": statFields[1],
"state": statFields[2],
@@ -272,24 +269,21 @@
p = psutil.Process(int(pid))
pids[pid]["cmd_line"] = p.cmdline()

try:
# 2. Collect Statm file: /proc/[pid]/statm (same as status vsize in kb)
# - size: "total program size"
# - rss: inaccurate, similar to VmRss in /proc/[pid]/status
child_statm_fields = self._getFields(
rqd.rqconstants.PATH_PROC_PID_STATM.format(pid))
pids[pid]['statm_size'] = \
int(re.search("\\d+", child_statm_fields[0]).group()) \
if re.search("\\d+", child_statm_fields[0]) else -1
pids[pid]['statm_rss'] = \
int(re.search("\\d+", child_statm_fields[1]).group()) \
if re.search("\\d+", child_statm_fields[1]) else -1
except rqd.rqexceptions.RqdException as e:
log.warning("Failed to read statm file: %s", e)
# 2. Collect Statm file: /proc/[pid]/statm (same as status vsize in kb)
# - size: "total program size"
# - rss: inaccurate, similar to VmRss in /proc/[pid]/status
child_statm_fields = self._getStatFields(
rqd.rqconstants.PATH_PROC_PID_STATM.format(pid))
pids[pid]['statm_size'] = \
int(re.search(r"\d+", child_statm_fields[0]).group()) \
if re.search(r"\d+", child_statm_fields[0]) else -1
pids[pid]['statm_rss'] = \
int(re.search(r"\d+", child_statm_fields[1]).group()) \
if re.search(r"\d+", child_statm_fields[1]) else -1

# pylint: disable=broad-except
except rqd.rqexceptions.RqdException:
log.exception('Failed to read stat file for pid %s', pid)
except (OSError, IOError):
log.exception('Failed to read stat/statm file for pid %s', pid)

# pylint: disable=too-many-nested-blocks
try:
@@ -298,10 +292,8 @@
bootTime = self.getBootTime()

values = list(frames.values())

for frame in values:
if frame.pid > 0:

session = str(frame.pid)
rss = 0
vsize = 0
@@ -573,33 +565,50 @@ def __initMachineStats(self, pathCpuInfo=None):
mcpStat = os.statvfs(self.getTempPath())
self.__renderHost.total_mcp = mcpStat.f_blocks * mcpStat.f_frsize // KILOBYTE

# Reset mappings
self.__procs_by_physid_and_coreid = {}
self.__physid_and_coreid_by_proc = {}

# Reads static information from /proc/cpuinfo
with open(pathCpuInfo or rqd.rqconstants.PATH_CPUINFO, "r") as cpuinfoFile:
singleCore = {}
currCore = {}
procsFound = []
for line in cpuinfoFile:
lineList = line.strip().replace("\t","").split(": ")
lineList = line.strip().replace("\t", "").split(": ")
# A normal entry added to the singleCore dictionary
if len(lineList) >= 2:
singleCore[lineList[0]] = lineList[1]
currCore[lineList[0]] = lineList[1]
# The end of a processor block
elif lineList == ['']:
# Check for hyper-threading
hyperthreadingMultiplier = (int(singleCore.get('siblings', '1'))
// int(singleCore.get('cpu cores', '1')))
hyperthreadingMultiplier = (int(currCore.get('siblings', '1'))
// int(currCore.get('cpu cores', '1')))

__totalCores += rqd.rqconstants.CORE_VALUE
if "core id" in singleCore \
and "physical id" in singleCore \
and not singleCore["physical id"] in procsFound:
procsFound.append(singleCore["physical id"])
if "core id" in currCore \
and "physical id" in currCore \
and not currCore["physical id"] in procsFound:
procsFound.append(currCore["physical id"])
__numProcs += 1
elif "core id" not in singleCore:
elif "core id" not in currCore:
__numProcs += 1
singleCore = {}

if 'physical id' in currCore and 'core id' in currCore:
# Keep track of what processors are on which core on
# which physical socket.
procid, physid, coreid = (
currCore['processor'],
currCore['physical id'],
currCore['core id'])
self.__procs_by_physid_and_coreid \
.setdefault(physid, {}) \
.setdefault(coreid, set()).add(procid)
self.__physid_and_coreid_by_proc[procid] = physid, coreid
currCore = {}

# An entry without data
elif len(lineList) == 1:
singleCore[lineList[0]] = ""
currCore[lineList[0]] = ""
else:
hyperthreadingMultiplier = 1

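To make the two mappings concrete, this is roughly what __initMachineStats would build for a hypothetical dual-socket machine with two hyper-threaded cores per socket (all keys are strings because the values are read verbatim from /proc/cpuinfo):

procs_by_physid_and_coreid = {
    '0': {'0': {'0', '4'}, '1': {'1', '5'}},  # socket 0
    '1': {'0': {'2', '6'}, '1': {'3', '7'}},  # socket 1
}
physid_and_coreid_by_proc = {
    '0': ('0', '0'), '4': ('0', '0'),  # both hyperthreads of socket 0, core 0
    '1': ('0', '1'), '5': ('0', '1'),
    '2': ('1', '0'), '6': ('1', '0'),
    '3': ('1', '1'), '7': ('1', '1'),
}
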
@@ -782,50 +791,76 @@ def __enabledHT(self):
def __getHyperthreadingMultiplier(self):
return int(self.__renderHost.attributes['hyperthreadingMultiplier'])

def setupHT(self):
def setupTaskset(self):
""" Setup rqd for hyper-threading """

if self.__enabledHT():
self.__tasksets = set(range(self.__coreInfo.total_cores // 100))
self.__coreInfo.reserved_cores.clear()

def setupGpu(self):
""" Setup rqd for Gpus """
self.__gpusets = set(range(self.getGpuCount()))

def reserveHT(self, reservedCores):
def reserveHT(self, frameCores):
""" Reserve cores for use by taskset
taskset -c 0,1,8,9 COMMAND
Not thread safe, use with locking.
@type reservedCores: int
@param reservedCores: The total physical cores reserved by the frame.
@type frameCores: int
@param frameCores: The total physical cores reserved by the frame.
@rtype: string
@return: The cpu-list for taskset -c
"""

if not self.__enabledHT():
return None

if reservedCores % 100:
log.debug('Taskset: Can not reserveHT with fractional cores')
if frameCores % 100:
log.warning('Taskset: Can not reserveHT with fractional cores')
return None
log.warning('Taskset: Requesting reserve of %d', (frameCores // 100))

# Look for the most idle physical cpu.
# Prefer to assign cores from the same physical cpu.
# Spread different frames around on different physical cpus.
avail_cores = {}
avail_cores_count = 0
reserved_cores = self.__coreInfo.reserved_cores

for physid, cores in self.__procs_by_physid_and_coreid.items():
for coreid in cores.keys():
if int(physid) in reserved_cores and \
int(coreid) in reserved_cores[int(physid)].coreid:
continue
avail_cores.setdefault(physid, set()).add(coreid)
avail_cores_count += 1

log.debug('Taskset: Requesting reserve of %d', (reservedCores // 100))
remaining_cores = frameCores / 100

if len(self.__tasksets) < reservedCores // 100:
if avail_cores_count < remaining_cores:
err = ('Not launching, insufficient hyperthreading cores to reserve '
'based on reservedCores')
'based on frameCores (%s < %s)') \
% (avail_cores_count, remaining_cores)
log.critical(err)
raise rqd.rqexceptions.CoreReservationFailureException(err)

hyperthreadingMultiplier = self.__getHyperthreadingMultiplier()
tasksets = []
for _ in range(reservedCores // 100):
core = self.__tasksets.pop()
tasksets.append(str(core))
if hyperthreadingMultiplier > 1:
tasksets.append(str(core + self.__coreInfo.total_cores // 100))

log.debug('Taskset: Reserving cores - %s', ','.join(tasksets))
for physid, cores in sorted(
avail_cores.items(),
# Return the physical socket that has
# the most idle cores first.
key=lambda tup: len(tup[1]),
reverse=True):

while remaining_cores > 0 and len(cores) > 0:
coreid = cores.pop()
# Give all the hyperthreads on this core.
# This counts as one core.
reserved_cores[int(physid)].coreid.extend([int(coreid)])
remaining_cores -= 1

for procid in self.__procs_by_physid_and_coreid[physid][coreid]:
tasksets.append(procid)

if remaining_cores == 0:
break

log.warning('Taskset: Reserving procs - %s', ','.join(tasksets))

return ','.join(tasksets)

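On the hypothetical dual-socket box sketched earlier, a fresh two-core reservation (frameCores=200) books both cores of one socket and returns every hyperthread of each; the exact processor ids depend on set ordering:

cpu_list = machine.reserveHT(200)  # machine: an rqd.rqmachine.Machine
# e.g. cpu_list == '0,4,1,5', cores 0 and 1 of physical id 0, each
# contributing both hyperthreads. The frame is then pinned with
#     taskset -c 0,4,1,5 COMMAND
# and reserved_cores records physical id 0 -> coreid [0, 1].
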
@@ -838,13 +873,21 @@ def releaseHT(self, reservedHT):
@param: The cpu-list used for taskset to release. ex: '0,8,1,9'
"""

if not self.__enabledHT():
return None

log.debug('Taskset: Releasing cores - %s', reservedHT)

# Remove these cores from the reserved set.
# Silently ignore any that weren't really reserved or
# aren't valid core identities.
reserved_cores = self.__coreInfo.reserved_cores
for core in reservedHT.split(','):
if int(core) < self.__coreInfo.total_cores // 100:
self.__tasksets.add(int(core))
physical_id_str, core_id_str = self.__physid_and_coreid_by_proc.get(core)
physical_id = int(physical_id_str)
core_id = int(core_id_str)

if physical_id in reserved_cores and core_id in reserved_cores[physical_id].coreid:
reserved_cores[physical_id].coreid.remove(core_id)
if len(reserved_cores[physical_id].coreid) == 0:
del reserved_cores[physical_id]

def reserveGpus(self, reservedGpus):
""" Reserve gpus
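
The release path is the inverse: each processor id in the cpu-list is mapped back to its (physical id, core id) pair via __physid_and_coreid_by_proc, the core id is removed from reserved_cores, and a physical id whose list becomes empty is dropped entirely. Continuing the sketch above:

machine.releaseHT('0,4,1,5')
# Core ids 0 and 1 are removed from physical id 0; that entry is now
# empty, so reserved_cores no longer mentions physical id 0 at all.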
