Skip to content

Commit

Permalink
[cuebot] Combine frame usage and memory usage updates. (#1006)
Browse files Browse the repository at this point in the history
  • Loading branch information
splhack committed Sep 12, 2021
1 parent 4deedac commit da28ae9
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 93 deletions.
18 changes: 3 additions & 15 deletions cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,31 +307,19 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu
ResourceUsage getResourceUsage(FrameInterface f);

/**
* Update Frame usage values for the given frame. The
* frame must be in the Running state. If the frame
* is locked by another thread, the process is aborted because
* we'll most likely get a new update one minute later.
*
* @param f
* @param lluTime
* @throws FrameReservationException if the frame is locked
* by another thread.
*/
void updateFrameUsage(FrameInterface f, long lluTime);

/**
* Update memory usage values for the given frame. The
* Update memory usage values and LLU time for the given frame. The
* frame must be in the Running state. If the frame
* is locked by another thread, the process is aborted because
* we'll most likely get a new update one minute later.
*
* @param f
* @param maxRss
* @param rss
* @param lluTime
* @throws FrameReservationException if the frame is locked
* by another thread.
*/
void updateFrameMemoryUsage(FrameInterface f, long maxRss, long rss);
void updateFrameMemoryUsageAndLluTime(FrameInterface f, long maxRss, long rss, long lluTime);

/**
* Attempt to put a exclusive row lock on the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,35 +966,22 @@ public ResourceUsage getResourceUsage(FrameInterface f) {
"pk_frame = ?", RESOURCE_USAGE_MAPPER, f.getFrameId());
}

private static final String UPDATE_FRAME_IO_USAGE =
"UPDATE " +
"frame " +
"SET " +
"ts_updated = current_timestamp," +
"ts_llu = ? " +
"WHERE " +
"pk_frame = ? ";

@Override
public void updateFrameUsage(FrameInterface f, long lluTime) {
getJdbcTemplate().update(UPDATE_FRAME_IO_USAGE,
new Timestamp(lluTime * 1000l), f.getFrameId());
}

private static final String UPDATE_FRAME_MEMORY_USAGE =
private static final String UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME =
"UPDATE " +
"frame " +
"SET " +
"ts_updated = current_timestamp," +
"int_mem_max_used = ?," +
"int_mem_used = ? " +
"int_mem_used = ?," +
"ts_llu = ? " +
"WHERE " +
"pk_frame = ? ";

@Override
public void updateFrameMemoryUsage(FrameInterface f, long maxRss, long rss) {
getJdbcTemplate().update(UPDATE_FRAME_MEMORY_USAGE,
maxRss, rss, f.getFrameId());
public void updateFrameMemoryUsageAndLluTime(FrameInterface f, long maxRss, long rss,
long lluTime) {
getJdbcTemplate().update(UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME,
maxRss, rss, new Timestamp(lluTime * 1000l), f.getFrameId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,21 +416,15 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
void clearFrame(DispatchFrame frame);

/**
* Update usage data for the given frame.
*
* @param frame
* @param lluTime
*/
void updateFrameUsage(FrameInterface frame, long lluTime);

/**
* Update memory usage data for the given frame.
* Update Memory usage data and LLU time for the given frame.
*
* @param frame
* @param rss
* @param maxRss
* @param lluTime
*/
void updateFrameMemoryUsage(FrameInterface frame, long rss, long maxRss);
void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRss,
long lluTime);

/**
* Update memory usage data for a given frame's proc record. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,33 +543,18 @@ public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss,

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void updateFrameUsage(FrameInterface frame, long lluTime) {
public void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRss,
long lluTime) {

try {
frameDao.updateFrameUsage(frame, lluTime);
frameDao.updateFrameMemoryUsageAndLluTime(frame, maxRss, rss, lluTime);
}
catch (FrameReservationException ex) {
// Eat this, the frame was not in the correct state or
// was locked by another thread. The only reason it would
// be locked by another thread would be if the state is
// changing.
logger.warn("failed to update io stats for frame: " + frame);
}
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void updateFrameMemoryUsage(FrameInterface frame, long rss, long maxRss) {

try {
frameDao.updateFrameMemoryUsage(frame, maxRss, rss);
}
catch (FrameReservationException ex) {
// Eat this, the frame was not in the correct state or
// was locked by another thread. The only reason it would
// be locked by another thread would be if the state is
// changing.
logger.warn("failed to update memory stats for frame: " + frame);
logger.warn("failed to update memory usage and LLU time for frame: " + frame);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,9 @@ public void handleHostReport(HostReport report, boolean isBoot) {

/*
* Updates memory usage for the proc, frames,
* jobs, and layers.
* jobs, and layers. And LLU time for the frames.
*/
updateMemoryUsage(report.getFramesList());

/*
* Updates usage for the proc, frames,
* jobs, and layers.
*/
updateFrameUsage(report.getFramesList());
updateMemoryUsageAndLluTime(report.getFramesList());

/*
* kill frames that have over run.
Expand Down Expand Up @@ -545,31 +539,18 @@ private void killTimedOutFrames(HostReport report) {
}

/**
* Update IO usage for the given list of frames.
*
* @param rFrames
*/
private void updateFrameUsage(List<RunningFrameInfo> rFrames) {

for (RunningFrameInfo rf: rFrames) {
FrameInterface frame = jobManager.getFrame(rf.getFrameId());
dispatchSupport.updateFrameUsage(frame, rf.getLluTime());
}
}

/**
* Update memory usage for the given list of frames.
* Update memory usage and LLU time for the given list of frames.
*
* @param rFrames
*/
private void updateMemoryUsage(List<RunningFrameInfo> rFrames) {
private void updateMemoryUsageAndLluTime(List<RunningFrameInfo> rFrames) {

for (RunningFrameInfo rf: rFrames) {

FrameInterface frame = jobManager.getFrame(rf.getFrameId());

dispatchSupport.updateFrameMemoryUsage(frame,
rf.getRss(), rf.getMaxRss());
dispatchSupport.updateFrameMemoryUsageAndLluTime(frame,
rf.getRss(), rf.getMaxRss(), rf.getLluTime());

dispatchSupport.updateProcMemoryUsage(frame,
rf.getRss(), rf.getMaxRss(), rf.getVsize(), rf.getMaxVsize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void filterByMemoryRange() {
i -> {
FrameInterface frame = frameDao.findFrame(layer, i);
frameDao.updateFrameState(frame, FrameState.RUNNING);
frameDao.updateFrameMemoryUsage(frame, CueUtil.GB * 5, CueUtil.GB);
frameDao.updateFrameMemoryUsageAndLluTime(frame, CueUtil.GB * 5, CueUtil.GB, 0);
});

FrameSearchInterface frameSearch = frameSearchFactory.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.imageworks.spcue.test.dispatcher;

import java.io.File;
import java.sql.Timestamp;
import java.util.List;
import javax.annotation.Resource;

import org.junit.Before;
Expand All @@ -32,15 +35,20 @@
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.dispatcher.HostReportHandler;
import com.imageworks.spcue.FacilityInterface;
import com.imageworks.spcue.FrameDetail;
import com.imageworks.spcue.grpc.host.HardwareState;
import com.imageworks.spcue.grpc.host.LockState;
import com.imageworks.spcue.grpc.report.CoreDetail;
import com.imageworks.spcue.grpc.report.HostReport;
import com.imageworks.spcue.grpc.report.RenderHost;
import com.imageworks.spcue.grpc.report.RunningFrameInfo;
import com.imageworks.spcue.service.AdminManager;
import com.imageworks.spcue.service.HostManager;
import com.imageworks.spcue.service.JobLauncher;
import com.imageworks.spcue.service.JobManager;
import com.imageworks.spcue.test.TransactionalTest;
import com.imageworks.spcue.util.CueUtil;
import com.imageworks.spcue.VirtualProc;

import static org.junit.Assert.assertEquals;

Expand All @@ -59,6 +67,12 @@ public class HostReportHandlerTests extends TransactionalTest {
@Resource
Dispatcher dispatcher;

@Resource
JobLauncher jobLauncher;

@Resource
JobManager jobManager;

private static final String HOSTNAME = "beta";
private static final String NEW_HOSTNAME = "gamma";

Expand Down Expand Up @@ -91,12 +105,12 @@ private static RenderHost getRenderHost() {
.setName(HOSTNAME)
.setBootTime(1192369572)
.setFreeMcp(76020)
.setFreeMem(53500)
.setFreeMem((int) CueUtil.GB8)
.setFreeSwap(20760)
.setLoad(0)
.setTotalMcp(195430)
.setTotalMem(8173264)
.setTotalSwap(20960)
.setTotalMem(CueUtil.GB8)
.setTotalSwap(CueUtil.GB2)
.setNimbyEnabled(false)
.setNumProcs(2)
.setCoresPerProc(100)
Expand Down Expand Up @@ -213,5 +227,41 @@ public void testHandleHostReportWithNonExistentTags() {
DispatchHost host = hostManager.findDispatchHost(NEW_HOSTNAME);
assertEquals(host.getAllocationId(), alloc.id);
}

@Test
@Transactional
@Rollback(true)
public void testMemoryAndLlu() {
jobLauncher.testMode = true;
jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml"));

DispatchHost host = getHost();
List<VirtualProc> procs = dispatcher.dispatchHost(host);
assertEquals(1, procs.size());
VirtualProc proc = procs.get(0);

CoreDetail cores = getCoreDetail(200, 200, 0, 0);
long now = System.currentTimeMillis();

RunningFrameInfo info = RunningFrameInfo.newBuilder()
.setJobId(proc.getJobId())
.setLayerId(proc.getLayerId())
.setFrameId(proc.getFrameId())
.setResourceId(proc.getProcId())
.setLluTime(now / 1000)
.setMaxRss(420000)
.build();
HostReport report = HostReport.newBuilder()
.setHost(getRenderHost())
.setCoreInfo(cores)
.addFrames(info)
.build();

hostReportHandler.handleHostReport(report, false);

FrameDetail frame = jobManager.getFrameDetail(proc.getFrameId());
assertEquals(frame.dateLLU, new Timestamp(now / 1000 * 1000));
assertEquals(420000, frame.maxRss);
}
}

47 changes: 47 additions & 0 deletions cuebot/src/test/resources/conf/jobspec/jobspec_simple.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0"?>
<!--
Copyright Contributors to the OpenCue Project
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->




<!DOCTYPE spec SYSTEM "../dtd/cjsl-1.12.dtd">
<spec>
<facility>spi</facility>
<show>pipe</show>
<shot>default</shot>
<user>testuser</user>
<uid>9860</uid>

<job name="test">
<paused>False</paused>
<maxretries>2</maxretries>
<autoeat>False</autoeat>
<env/>
<layers>
<layer name="test_layer" type="Render">
<cmd>echo hello</cmd>
<range>0</range>
<chunk>1</chunk>
<env/>
<services>
<service>shell</service>
</services>
</layer>
</layers>
</job>
<depends/>
</spec>

0 comments on commit da28ae9

Please sign in to comment.