Skip to content

Commit

Permalink
Optimize bookingqueue (#949)
Browse files Browse the repository at this point in the history
* Update dispatchQuery to use min_cores

Sorting jobs only by priority causes a situation where low priority jobs can get starved by a constant flow of high priority jobs.
The new formula adds a modifier to the sorting rank to take into account the number of cores the job is requesting and also the number of days the job is waiting on the queue.
Priorities numbers over 200 will mostly override the formula and work as a priority only based scheduling.
sort = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days)

Besides that, also take layer_int_cores_min into account when filtering folder_resourse limitations to avoid allocating more cores than the folder limits.

(cherry picked from commit 566411aeeddc60983a30eabe121fd03263d05525)

* Revert "Update dispatchQuery to use min_cores"

This reverts commit 2eb4936

* Avoid repeated bookingQueue tasks

It was observed that the booking performance is degrating over time. Around 4 tasks are submitted for each booking executed. This solution avoid creating more tasks if there's already a task waiting to be executed.

(cherry picked from commit 57ef9ab989c4d82585b31c95d93b969f6e95e4cd)
  • Loading branch information
DiegoTavares committed Apr 6, 2021
1 parent 401a33b commit f833ed6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import org.apache.log4j.Logger;

public class BookingQueue extends ThreadPoolExecutor {
Expand All @@ -41,15 +44,25 @@ public class BookingQueue extends ThreadPoolExecutor {

private QueueRejectCounter rejectCounter = new QueueRejectCounter();

private Cache<String, DispatchBookHost> bookingCache = CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
// Invalidate entries that got executed by the threadpool and lost their reference
.weakValues()
.build();

public BookingQueue(int sleepTimeMs) {
super(THREADS_MINIMUM, THREADS_MAXIMUM, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE));
this.baseSleepTimeMillis = sleepTimeMs;
this.setRejectedExecutionHandler(rejectCounter);
}

public void execute(Runnable r) {
if (!isShutdown.get()) {
public void execute(DispatchBookHost r) {
if (isShutdown.get()) {
return;
}
if (bookingCache.getIfPresent(r.getKey()) == null){
bookingCache.put(r.getKey(), r);
super.execute(r);
}
}
Expand Down Expand Up @@ -100,7 +113,13 @@ protected void beforeExecute(Thread t, Runnable r) {

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);

// Invalidate cache to avoid having to wait for GC to mark processed entries collectible
DispatchBookHost h = (DispatchBookHost)r;
bookingCache.invalidate(h.getKey());

if (sleepTime() < 100) {
logger.info("BookingQueue cleanup executed.");
getQueue().clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,44 @@ public class DispatchBookHost implements Runnable {
private JobInterface job = null;
private DispatchHost host;
private Dispatcher dispatcher;
private String key;

public DispatchHost getDispatchHost() {
this.key = host.getId();
return host;
}

public DispatchBookHost(DispatchHost host, Dispatcher d) {
this.host = host;
this.key = host.getId();
this.dispatcher = d;
}

public DispatchBookHost(DispatchHost host, JobInterface job, Dispatcher d) {
this.host = host;
this.job = job;
this.key = host.getId() + "_job_" + job.getJobId();
this.dispatcher = d;
}

public DispatchBookHost(DispatchHost host, GroupInterface group, Dispatcher d) {
this.host = host;
this.group = group;
this.key = host.getId() + "_group_" + group.getGroupId();
this.dispatcher = d;
}

public DispatchBookHost(DispatchHost host, ShowInterface show, Dispatcher d) {
this.host = host;
this.show = show;
this.key = host.getId() + "_name_" + show.getName();
this.dispatcher = d;
}

public String getKey() {
return this.key;
}

public void run() {
new DispatchCommandTemplate() {
public void wrapDispatchCommand() {
Expand Down

0 comments on commit f833ed6

Please sign in to comment.