Skip to content

Commit

Permalink
Add balanced mode to dispatch query (#1103)
Browse files Browse the repository at this point in the history
Dispatch balanced mode switch. When turned on, the dispatch query will use a formula that makes sure smaller jobs don't get starved by large jobs. This mode also disregards the layer_limit feature for performance implications.
the formula takes into account the number of required cores and the time the
job has been waiting on queue.  

Formula:
```
  rank = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days)
```

This heuristic favours scenarios with a large amount of jobs and hosts and
when turned off, only job priorities are taken into account, making the queue more predictable.
  • Loading branch information
DiegoTavares committed Apr 11, 2022
1 parent c570483 commit 351c9ae
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 50 deletions.
23 changes: 18 additions & 5 deletions cuebot/src/main/java/com/imageworks/spcue/dao/DispatcherDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,31 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, DispatchHost ho
int limit);

/**
* Return whether FIFO scheduling is enabled or not in the same priority for unittest.
* Return Scheduling Mode selected
*
* @return
*/
boolean getFifoSchedulingEnabled();
SchedulingMode getSchedulingMode();

/**
* Set whether FIFO scheduling is enabled or not in the same priority for unittest.
* Set Scheduling Mode.
*
* @param fifoSchedulingEnabled
* @param schedulingMode
*/
void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled);
void setSchedulingMode(SchedulingMode schedulingMode);

/**
* - PRIORITY_ONLY: Sort by priority only
* - FIFO: Whether or not to enable FIFO scheduling in the same priority.
* - BALANCED: Use a rank formula that takes into account time waiting, and number
* of cores required: rank = priority + (100 * (1 - (job.cores/job.int_min_cores))) + age in days
*/
enum SchedulingMode {
PRIORITY_ONLY,
FIFO,
BALANCED
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

public class DispatchQuery {

public static final String FIND_JOBS_BY_SHOW =
"/* FIND_JOBS_BY_SHOW */ " +
public static final String FIND_JOBS_BY_SHOW_PRIORITY_MODE =
"/* FIND_JOBS_BY_SHOW_PRIORITY_MODE */ " +
"SELECT pk_job, int_priority, rank FROM ( " +
"SELECT " +
"ROW_NUMBER() OVER (ORDER BY job_resource.int_priority DESC) AS rank, " +
Expand Down Expand Up @@ -101,16 +101,91 @@ public class DispatchQuery {
") " +
") AS t1 WHERE rank < ?";

// sort = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days) */
public static final String FIND_JOBS_BY_SHOW_BALANCED_MODE =
"/* FIND_JOBS_BY_SHOW_BALANCED_MODE */ " +
"SELECT pk_job, int_priority, rank FROM ( " +
"SELECT " +
"ROW_NUMBER() OVER (ORDER BY int_priority DESC) AS rank, " +
"pk_job, " +
"int_priority " +
"FROM ( " +
"SELECT DISTINCT " +
"job.pk_job as pk_job, " +
"CAST( " +
"job_resource.int_priority + ( " +
"100 * (CASE WHEN job_resource.int_min_cores <= 0 THEN 0 " +
"ELSE " +
"CASE WHEN job_resource.int_cores > job_resource.int_min_cores THEN 0 " +
"ELSE 1 - job_resource.int_cores/job_resource.int_min_cores " +
"END " +
"END) " +
") + ( " +
"(DATE_PART('days', NOW()) - DATE_PART('days', job.ts_updated)) " +
") as INT) as int_priority " +
"FROM " +
"job , " +
"job_resource , " +
"folder , " +
"folder_resource, " +
"point , " +
"layer , " +
"layer_stat , " +
"host " +
"WHERE " +
"job.pk_job = job_resource.pk_job " +
"AND job.pk_folder = folder.pk_folder " +
"AND folder.pk_folder = folder_resource.pk_folder " +
"AND folder.pk_dept = point.pk_dept " +
"AND folder.pk_show = point.pk_show " +
"AND job.pk_job = layer.pk_job " +
"AND job_resource.pk_job = job.pk_job " +
"AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN layer_stat.pk_layer ELSE NULL END) = layer.pk_layer " +
"AND " +
"(" +
"folder_resource.int_max_cores = -1 " +
"OR " +
"folder_resource.int_cores + layer.int_cores_min < folder_resource.int_max_cores " +
") " +
"AND job.str_state = 'PENDING' " +
"AND job.b_paused = false " +
"AND job.pk_show = ? " +
"AND job.pk_facility = ? " +
"AND " +
"(" +
"job.str_os IS NULL OR job.str_os = '' " +
"OR " +
"job.str_os = ? " +
") " +
"AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " +
"AND layer.int_cores_min <= ? " +
"AND layer.int_mem_min <= ? " +
"AND (CASE WHEN layer.b_threadable = true THEN 1 ELSE 0 END) >= ? " +
"AND layer.int_gpus_min <= ? " +
"AND layer.int_gpu_mem_min BETWEEN ? AND ? " +
"AND job_resource.int_cores + layer.int_cores_min <= job_resource.int_max_cores " +
"AND host.str_tags ~* ('(?x)' || layer.str_tags) " +
"AND host.str_name = ? " +
") AS t1 ) AS t2 WHERE rank < ?";


public static final String FIND_JOBS_BY_GROUP =
FIND_JOBS_BY_SHOW
public static final String FIND_JOBS_BY_GROUP_PRIORITY_MODE =
FIND_JOBS_BY_SHOW_PRIORITY_MODE
.replace(
"FIND_JOBS_BY_SHOW",
"FIND_JOBS_BY_GROUP")
.replace(
"AND job.pk_show = ? ",
"AND job.pk_folder = ? ");

public static final String FIND_JOBS_BY_GROUP_BALANCED_MODE =
FIND_JOBS_BY_SHOW_BALANCED_MODE
.replace(
"FIND_JOBS_BY_SHOW",
"FIND_JOBS_BY_GROUP")
.replace(
"AND job.pk_show = ? ",
"AND job.pk_folder = ? ");

private static final String replaceQueryForFifo(String query) {
return query
Expand All @@ -125,8 +200,8 @@ private static final String replaceQueryForFifo(String query) {
"WHERE rank < ? ORDER BY rank");
}

public static final String FIND_JOBS_FIFO_BY_SHOW = replaceQueryForFifo(FIND_JOBS_BY_SHOW);
public static final String FIND_JOBS_FIFO_BY_GROUP = replaceQueryForFifo(FIND_JOBS_BY_GROUP);
public static final String FIND_JOBS_BY_SHOW_FIFO_MODE = replaceQueryForFifo(FIND_JOBS_BY_SHOW_PRIORITY_MODE);
public static final String FIND_JOBS_BY_GROUP_FIFO_MODE = replaceQueryForFifo(FIND_JOBS_BY_GROUP_PRIORITY_MODE);

/**
* Dispatch a host in local booking mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,7 @@
import com.imageworks.spcue.grpc.host.ThreadMode;
import com.imageworks.spcue.util.CueUtil;

import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_JOB_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_JOB_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_LAYER_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_LAYER_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_LOCAL;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_SHOWS;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_UNDER_PROCED_JOB_BY_FACILITY;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.HIGHER_PRIORITY_JOB_BY_FACILITY_EXISTS;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.*;


/**
Expand Down Expand Up @@ -130,24 +115,24 @@ public List<SortableShow> getShows() {
new ConcurrentHashMap<String, ShowCache>();

/**
* Whether or not to enable FIFO scheduling in the same priority.
* Choose between different scheduling strategies
*/
private boolean fifoSchedulingEnabled;
private SchedulingMode schedulingMode;

@Autowired
public DispatcherDaoJdbc(Environment env) {
fifoSchedulingEnabled = env.getProperty(
"dispatcher.fifo_scheduling_enabled", Boolean.class, false);
this.schedulingMode = SchedulingMode.valueOf(env.getProperty(
"dispatcher.scheduling_mode", String.class, "PRIORITY_ONLY"));
}

@Override
public boolean getFifoSchedulingEnabled() {
return fifoSchedulingEnabled;
public SchedulingMode getSchedulingMode() {
return schedulingMode;
}

@Override
public void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled) {
this.fifoSchedulingEnabled = fifoSchedulingEnabled;
public void setSchedulingMode(SchedulingMode schedulingMode) {
this.schedulingMode = schedulingMode;
}

/**
Expand Down Expand Up @@ -211,7 +196,7 @@ private List<String> findDispatchJobs(DispatchHost host, int numJobs, boolean sh
}

result.addAll(getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
findByShowQuery(),
PKJOB_MAPPER,
s.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand All @@ -233,6 +218,24 @@ private List<String> findDispatchJobs(DispatchHost host, int numJobs, boolean sh

}

private String findByShowQuery() {
switch (schedulingMode) {
case PRIORITY_ONLY: return FIND_JOBS_BY_SHOW_PRIORITY_MODE;
case FIFO: return FIND_JOBS_BY_SHOW_FIFO_MODE;
case BALANCED: return FIND_JOBS_BY_SHOW_BALANCED_MODE;
default: return FIND_JOBS_BY_SHOW_PRIORITY_MODE;
}
}

private String findByGroupQuery() {
switch (schedulingMode) {
case PRIORITY_ONLY: return FIND_JOBS_BY_GROUP_PRIORITY_MODE;
case FIFO: return FIND_JOBS_BY_GROUP_FIFO_MODE;
case BALANCED: return FIND_JOBS_BY_GROUP_BALANCED_MODE;
default: return FIND_JOBS_BY_GROUP_PRIORITY_MODE;
}
}

@Override
public List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
return findDispatchJobs(host, numJobs, true);
Expand All @@ -246,7 +249,7 @@ public List<String> findDispatchJobs(DispatchHost host, int numJobs) {
@Override
public List<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_GROUP : FIND_JOBS_BY_GROUP,
findByGroupQuery(),
PKJOB_MAPPER,
g.getGroupId(),host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand Down Expand Up @@ -406,7 +409,7 @@ public boolean higherPriorityJobExists(JobDetail baseJob, VirtualProc proc) {
public List<String> findDispatchJobs(DispatchHost host,
ShowInterface show, int numJobs) {
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
findByShowQuery(),
PKJOB_MAPPER,
show.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand Down
9 changes: 7 additions & 2 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ dispatcher.frame_query_max=20
dispatcher.job_frame_dispatch_max=8
# Maximum number of frames to dispatch from a host at one time.
dispatcher.host_frame_dispatch_max=12
# Whether or not to enable FIFO scheduling in the same priority.
dispatcher.fifo_scheduling_enabled=false
# Choose between different scheduling strategies:
# - PRIORITY_ONLY: Sort by priority only
# - FIFO: Whether or not to enable FIFO scheduling in the same priority.
# - BALANCED: Use a rank formula that takes into account time waiting, and number
# of cores required: rank = priority + (100 * (1 - (job.cores/job.int_min_cores))) + age in days
# layer limiting is also disabled in this mode for performance reasons
dispatcher.scheduling_mode=PRIORITY_ONLY

# Number of threads to keep in the pool for launching job.
dispatcher.launch_queue.core_pool_size=1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ private void launchJobs(int count) throws Exception {

@Before
public void launchJob() {
dispatcherDao.setFifoSchedulingEnabled(true);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.FIFO);

dispatcher.setTestMode(true);
jobLauncher.testMode = true;
}

@After
public void resetFifoScheduling() {
dispatcherDao.setFifoSchedulingEnabled(false);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);
}

@Before
Expand Down Expand Up @@ -171,11 +171,11 @@ public void createHost() {
@Transactional
@Rollback(true)
public void testFifoSchedulingEnabled() {
assertTrue(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setFifoSchedulingEnabled(false);
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setFifoSchedulingEnabled(true);
assertTrue(dispatcherDao.getFifoSchedulingEnabled());
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.FIFO);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.PRIORITY_ONLY);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.FIFO);
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.FIFO);
}

@Test
Expand Down Expand Up @@ -213,8 +213,7 @@ public void testPortionSorted() throws Exception {
@Transactional
@Rollback(true)
public void testFifoSchedulingDisabled() throws Exception {
dispatcherDao.setFifoSchedulingEnabled(false);
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);

int count = 10;
launchJobs(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,6 @@ public void testHigherPriorityJobExistsMaxProcBound() {
@Transactional
@Rollback(true)
public void testFifoSchedulingEnabled() {
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.PRIORITY_ONLY);
}
}

0 comments on commit 351c9ae

Please sign in to comment.