Skip to content

Commit

Permalink
Add Proc's child PIDs to Host report stats (#1130)
Browse files Browse the repository at this point in the history
* Add Proc's child PIDs to Host report stats

Users require additional information about the running frame.data
pid. For each parent process (frame.pid), add the following fields
of every child process to report.proto: name, rss, vsize, state,
cmdline, and pid. This additional info is stored in the Proc table
while the proc is running; users can view child proc stats via
Cuegui, and rqlog will output the highest recorded rss value for
each child pid.

* Fix Proc Pylint errors

* Add more exit codes to Frame state waiting

When determining the frame state, cuebot
needs to make sure that certain exit statuses
put the frame state back into waiting. This
helps save time for users when a frame
fails because of host hardware issues.

(cherry picked from commit cbeda9387b09a8713bb76d9075fdee72c3c795f1)

* Add more exit codes to Frame state waiting

* Removed unused method updateFrameHostDown

* Removed deprecated oracle FrameDaoJdbc file

* Remove log debugging from FrameCompleteHandler

* Fixes to RQD and unittests

* Remove unrelated changes from FrameDao

* Fix pylint errors for rqmachine.py

* Remove unrelated frame changes from pycue

* Removing unrelated change from DispatchSupportService

* Fix merge conflicts for ProcDaoTests
  • Loading branch information
roulaoregan-spi committed Apr 28, 2022
1 parent 7647841 commit 5536904
Show file tree
Hide file tree
Showing 26 changed files with 460 additions and 34 deletions.
2 changes: 1 addition & 1 deletion VERSION.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.15
0.17
1 change: 1 addition & 0 deletions cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class VirtualProc extends FrameEntity implements ProcInterface {
public String frameId;
public String hostName;
public String os;
public byte[] childProcesses;

public int coresReserved;
public long memoryReserved;
Expand Down
4 changes: 2 additions & 2 deletions cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public interface ProcDao {
* @param maxKb
*/
void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss,
long vsize, long maxVsize,
long usedGpuMemory, long maxUsedGpuMemory);
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children);

/**
* get a virtual proc from its unique id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class FrameDaoJdbc extends JdbcDaoSupport implements FrameDao {
"int_version = int_version + 1, " +
"int_total_past_core_time = int_total_past_core_time + " +
"round(INTERVAL_TO_SECONDS(current_timestamp - ts_started) * int_cores / 100)," +
"int_total_past_gpu_time = int_total_past_gpu_time + " +
"int_total_past_gpu_time = int_total_past_gpu_time + " +
"round(INTERVAL_TO_SECONDS(current_timestamp - ts_started) * int_gpus) " +
"WHERE " +
"frame.pk_frame = ? " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.imageworks.spcue.dao.postgres;


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -29,6 +32,7 @@
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.jdbc.core.PreparedStatementCreator;

import com.imageworks.spcue.FrameInterface;
import com.imageworks.spcue.HostInterface;
Expand Down Expand Up @@ -236,13 +240,14 @@ public boolean clearVirtualProcAssignment(FrameInterface frame) {
"int_virt_max_used = ?, " +
"int_gpu_mem_used = ?, " +
"int_gpu_mem_max_used = ?, " +
"bytea_children = ?, " +
"ts_ping = current_timestamp " +
"WHERE " +
"pk_frame = ?";

@Override
public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss,
long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory) {
long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory, byte[] children) {
/*
* This method is going to repeat for a proc every 1 minute, so
* if the proc is being touched by another thread, then return
Expand All @@ -261,7 +266,26 @@ public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss,
rss, maxRss, vss, maxVss,
usedGpuMemory, maxUsedGpuMemory, f.getFrameId());
}
} catch (DataAccessException dae) {
getJdbcTemplate().update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection conn)
throws SQLException {
PreparedStatement updateProc = conn.prepareStatement(
UPDATE_PROC_MEMORY_USAGE);
updateProc.setLong(1, rss);
updateProc.setLong(2, maxRss);
updateProc.setLong(3, vss);
updateProc.setLong(4, maxVss);
updateProc.setLong(5, usedGpuMemory);
updateProc.setLong(6, maxUsedGpuMemory);
updateProc.setBytes(7, children);
updateProc.setString(8, f.getFrameId());
return updateProc;
}
}
);
}
catch (DataAccessException dae) {
logger.info("The proc for frame " + f +
" could not be updated with new memory stats: " + dae);
}
Expand Down Expand Up @@ -295,6 +319,7 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException {
proc.unbooked = rs.getBoolean("b_unbooked");
proc.isLocalDispatch = rs.getBoolean("b_local");
proc.os = rs.getString("str_os");
proc.childProcesses = rs.getBytes("bytea_children");
return proc;
}
};
Expand All @@ -319,6 +344,7 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException {
"proc.int_gpu_mem_reserved,"+
"proc.int_gpu_mem_max_used,"+
"proc.int_gpu_mem_used,"+
"proc.bytea_children,"+
"proc.int_virt_max_used,"+
"proc.int_virt_used,"+
"host.str_name AS host_name, " +
Expand Down Expand Up @@ -571,7 +597,8 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {
"int_virt_max_used,"+
"int_virt_used,"+
"host_name, " +
"str_os " +
"str_os, " +
"bytea_children " +
"FROM ("
+ GET_VIRTUAL_PROC + " " +
"AND " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
Expand Down Expand Up @@ -547,13 +548,15 @@ public ProcSeq getProcs(HostInterface host) {
r.filterByHost(host);
r.sortByHostName();
r.sortByDispatchedTime();
logger.info("!!!! INSIDE getProcs Whiteboard!!! called getProcs !!! line 551");
return ProcSeq.newBuilder().addAllProcs(getProcs(r).getProcsList()).build();
}

@Override
public ProcSeq getProcs(ProcSearchInterface p) {
p.sortByHostName();
p.sortByDispatchedTime();
logger.info("!!!! Inside getPROCS!!!!! line 559");
List<Proc> procs = getJdbcTemplate().query(p.getFilteredQuery(GET_PROC),
PROC_MAPPER, p.getValuesArray());
return ProcSeq.newBuilder().addAllProcs(procs).build();
Expand Down Expand Up @@ -969,9 +972,11 @@ public Proc mapRow(ResultSet rs, int row) throws SQLException {
SqlUtil.getString(rs,"str_log_dir"), SqlUtil.getString(rs,"job_name"),
SqlUtil.getString(rs,"frame_name")))
.setRedirectTarget(SqlUtil.getString(rs, "str_redirect"))
.setChildProcesses(SqlUtil.getByteString(rs, "bytea_children"))
.addAllServices(Arrays.asList(SqlUtil.getString(rs,"str_services").split(",")))
.build();
}
// logger.info("called ROW MAPPER!!! setChildProcesses!!!");
};

public static final RowMapper<Task> TASK_MAPPER =
Expand Down Expand Up @@ -1609,6 +1614,7 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException {
"proc.ts_booked, " +
"proc.ts_dispatched, " +
"proc.b_unbooked, " +
"proc.bytea_children, " +
"redirect.str_name AS str_redirect " +
"FROM proc " +
"JOIN host ON proc.pk_host = host.pk_host " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRs
* @param usedGpuMemory
* @param maxUsedGpuMemory
*/
void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsize,
long maxVsize, long usedGpuMemory, long maxUsedGpuMemory);
void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss,
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children);

/**
* Return true if adding the given core units would put the show
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,10 @@ public void lostProc(VirtualProc proc, String reason, int exitStatus) {
@Override
@Transactional(propagation = Propagation.REQUIRED)
public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss,
long vsize, long maxVsize,
long usedGpuMemory, long maxUsedGpuMemory) {
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children) {
procDao.updateProcMemoryUsage(frame, rss, maxRss, vsize, maxVsize,
usedGpuMemory, maxUsedGpuMemory);
usedGpuMemory, maxUsedGpuMemory, children);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,9 @@ private void updateMemoryUsageAndLluTime(List<RunningFrameInfo> rFrames) {
dispatchSupport.updateFrameMemoryUsageAndLluTime(frame,
rf.getRss(), rf.getMaxRss(), rf.getLluTime());

dispatchSupport.updateProcMemoryUsage(frame,
rf.getRss(), rf.getMaxRss(), rf.getVsize(), rf.getMaxVsize(),
rf.getUsedGpuMemory(), rf.getMaxUsedGpuMemory());
dispatchSupport.updateProcMemoryUsage(frame, rf.getRss(), rf.getMaxRss(),
rf.getVsize(), rf.getMaxVsize(), rf.getUsedGpuMemory(),
rf.getMaxUsedGpuMemory(), rf.getChildren().toByteArray());
}

updateJobMemoryUsage(rFrames);
Expand Down
11 changes: 11 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/util/SqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.imageworks.spcue.util;

import com.google.protobuf.ByteString;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
Expand Down Expand Up @@ -132,5 +134,14 @@ public static String getString(ResultSet rs, int index) throws SQLException {
return value;
}
}

/**
 * Reads a binary column from the current row as a protobuf {@link ByteString}.
 *
 * @param rs result set to read from
 * @param field label of the column to read
 * @return a copy of the column's bytes, or an empty {@code ByteString} when
 *         the column holds SQL NULL
 * @throws SQLException if reading the column fails
 */
public static ByteString getByteString(ResultSet rs, String field) throws SQLException {
    byte[] data = rs.getBytes(field);
    // JDBC returns null for SQL NULL; checking the array itself also guarantees
    // copyFrom() is never handed a null reference.
    if (data == null) {
        // Shared immutable constant: no allocation and no platform-charset call.
        return ByteString.EMPTY;
    }
    return ByteString.copyFrom(data);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds a binary column to proc for the child-process data that the proc DAO
-- writes alongside the memory-usage stats. No NOT NULL constraint is declared,
-- so existing rows keep NULL until the column is first written.
ALTER TABLE proc
ADD COLUMN bytea_children BYTEA;
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,9 @@ public void testUpdateProcMemoryUsage() {

procDao.insertVirtualProc(proc);
procDao.verifyRunningProc(proc.getId(), frame.getId());
byte[] children = new byte[100];

procDao.updateProcMemoryUsage(frame, 100, 100, 1000, 1000, 0, 0);
procDao.updateProcMemoryUsage(frame, 100, 100, 1000, 1000, 0, 0, children);

}

Expand Down Expand Up @@ -584,16 +585,17 @@ public void testFindReservedMemoryOffender() {
int i = 1;
List<DispatchFrame> frames = dispatcherDao.findNextDispatchFrames(job, host, 6);
assertEquals(6, frames.size());

byte[] children = new byte[100];
for (DispatchFrame frame: frames) {

VirtualProc proc = VirtualProc.build(host, frame);
proc.childProcesses = children;
frame.minMemory = Dispatcher.MEM_RESERVED_DEFAULT;
dispatcher.dispatch(frame, proc);

// Increase the memory usage as frames are added
procDao.updateProcMemoryUsage(frame,
1000*i, 1000*i, 1000*i, 1000*i, 0, 0);
1000*i, 1000*i, 1000*i, 1000*i, 0, 0, children);
i++;
}

Expand Down Expand Up @@ -666,7 +668,8 @@ public void testBalanceUnderUtilizedProcs() {
proc1.frameId = frame1.id;
procDao.insertVirtualProc(proc1);

procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 0, 0);
byte[] children = new byte[100];
procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 0, 0, children);
layerDao.updateLayerMaxRSS(frame1, 250000, true);

FrameDetail frameDetail2 = frameDao.findFrameDetail(job, "0002-pass_1");
Expand All @@ -676,7 +679,7 @@ public void testBalanceUnderUtilizedProcs() {
proc2.frameId = frame2.id;
procDao.insertVirtualProc(proc2);

procDao.updateProcMemoryUsage(frame2, 255000, 255000,255000, 255000, 0, 0);
procDao.updateProcMemoryUsage(frame2, 255000, 255000,255000, 255000, 0, 0, children);
layerDao.updateLayerMaxRSS(frame2, 255000, true);

FrameDetail frameDetail3 = frameDao.findFrameDetail(job, "0003-pass_1");
Expand All @@ -686,7 +689,7 @@ public void testBalanceUnderUtilizedProcs() {
proc3.frameId = frame3.id;
procDao.insertVirtualProc(proc3);

procDao.updateProcMemoryUsage(frame3, 3145728, 3145728,3145728, 3145728, 0, 0);
procDao.updateProcMemoryUsage(frame3, 3145728, 3145728,3145728, 3145728, 0, 0, children);
layerDao.updateLayerMaxRSS(frame3,300000, true);

procDao.balanceUnderUtilizedProcs(proc3, 100000);
Expand Down Expand Up @@ -797,6 +800,7 @@ public void getProcsBySearch() {
proc.frameId = f.id;
proc.layerId = f.layerId;
proc.showId = f.showId;
proc.childProcesses = "".getBytes();
procDao.insertVirtualProc(proc);
}

Expand Down
1 change: 1 addition & 0 deletions cuegui/cuegui/FrameMonitorTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,3 +907,4 @@ def __init__(self, widget, filterSelectedLayersCallback):
self.__menuActions.frames().addAction(self, "eat")
self.__menuActions.frames().addAction(self, "kill")
self.__menuActions.frames().addAction(self, "eatandmarkdone")
self.__menuActions.frames().addAction(self, "viewRunning")
13 changes: 13 additions & 0 deletions cuegui/cuegui/MenuActions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import cuegui.LocalBooking
import cuegui.Logger
import cuegui.PreviewWidget
import cuegui.ProcChildren
import cuegui.ServiceDialog
import cuegui.ShowDialog
import cuegui.TasksDialog
Expand Down Expand Up @@ -871,6 +872,18 @@ def viewLastLog(self, rpcObjects=None):
else:
cuegui.Utils.popupView(path)

viewRunning_info = ["View Running", None, "viewRunning"]

def viewRunning(self):
    """Open the child-process host statistics dialog for the source job."""
    sourceJob = self._getSource()
    message = "Displaying host stats for each child process for job:\n%s" % sourceJob.name()
    # Build the dialog and run it straight away; no reference is kept afterwards.
    cuegui.ProcChildren.ProcChildrenDialog(
        job=sourceJob,
        text=message,
        title="View Running Child Proc Host Stats").exec_()

useLocalCores_info = ["Use local cores...",
"Set a single frame to use the local desktop cores.",
"configure"]
Expand Down

0 comments on commit 5536904

Please sign in to comment.