Skip to content

Commit

Permalink
Add Job.shutdownIfCompleted API method. (#1033)
Browse files Browse the repository at this point in the history
  • Loading branch information
splhack committed Jan 17, 2022
1 parent 4200dbb commit 9e03b95
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

/*
* Copyright Contributors to the OpenCue Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



package com.imageworks.spcue.dispatcher.commands;

import com.imageworks.spcue.JobInterface;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.service.JobManagerSupport;

/**
* A command for shutting down a job if it is completed.
* This is a workaround for when Cuebot failed to shutdown a job due to database access error.
*
* @category command
*/
public class DispatchShutdownJobIfCompleted implements Runnable {
private JobInterface job;

private JobManagerSupport jobManagerSupport;
public DispatchShutdownJobIfCompleted(JobInterface job, JobManagerSupport jobManagerSupport) {
this.job = job;
this.jobManagerSupport = jobManagerSupport;
}

public void run() {
new DispatchCommandTemplate() {
public void wrapDispatchCommand() {
if (jobManagerSupport.isJobComplete(job)) {
jobManagerSupport.shutdownJob(job, new Source("natural"), false);
}
}
}.execute();
}
}

19 changes: 19 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/servant/ManageJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.imageworks.spcue.dispatcher.commands.DispatchReorderFrames;
import com.imageworks.spcue.dispatcher.commands.DispatchRetryFrames;
import com.imageworks.spcue.dispatcher.commands.DispatchSatisfyDepends;
import com.imageworks.spcue.dispatcher.commands.DispatchShutdownJobIfCompleted;
import com.imageworks.spcue.dispatcher.commands.DispatchStaggerFrames;
import com.imageworks.spcue.grpc.comment.Comment;
import com.imageworks.spcue.grpc.job.FrameSeq;
Expand Down Expand Up @@ -134,6 +135,8 @@
import com.imageworks.spcue.grpc.job.JobSetMinGpusResponse;
import com.imageworks.spcue.grpc.job.JobSetPriorityRequest;
import com.imageworks.spcue.grpc.job.JobSetPriorityResponse;
import com.imageworks.spcue.grpc.job.JobShutdownIfCompletedRequest;
import com.imageworks.spcue.grpc.job.JobShutdownIfCompletedResponse;
import com.imageworks.spcue.grpc.job.JobStaggerFramesRequest;
import com.imageworks.spcue.grpc.job.JobStaggerFramesResponse;
import com.imageworks.spcue.grpc.job.LayerSeq;
Expand Down Expand Up @@ -780,6 +783,22 @@ public void reorderFrames(JobReorderFramesRequest request,
}
}

@Override
public void shutdownIfCompleted(JobShutdownIfCompletedRequest request,
StreamObserver<JobShutdownIfCompletedResponse> responseObserver) {
try {
setupJobData(request.getJob());
manageQueue.execute(new DispatchShutdownJobIfCompleted(job, jobManagerSupport));
responseObserver.onNext(JobShutdownIfCompletedResponse.newBuilder().build());
responseObserver.onCompleted();
}
catch (EmptyResultDataAccessException e) {
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to find job data")
.asRuntimeException());
}
}

@Override
public void staggerFrames(JobStaggerFramesRequest request,
StreamObserver<JobStaggerFramesResponse> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ public void satisfyWhatDependsOn(FrameSearchInterface request) {
}
}

public boolean isJobComplete(JobInterface job) {
return jobManager.isJobComplete(job);
}

/*
* Destructive functions require a extra Source argument which contains
* information about the user making the call. This information is
Expand Down
11 changes: 11 additions & 0 deletions proto/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ service JobInterface {
// Sets the job priority
rpc SetPriority(JobSetPriorityRequest) returns (JobSetPriorityResponse);

// Shutdown the job if it is completed. This is a workaround for when
// Cuebot failed to shutdown a job due to database access error.
rpc ShutdownIfCompleted(JobShutdownIfCompletedRequest) returns (JobShutdownIfCompletedResponse);

// Staggers the specified frame range
rpc StaggerFrames(JobStaggerFramesRequest) returns (JobStaggerFramesResponse);
}
Expand Down Expand Up @@ -1427,6 +1431,13 @@ message JobSetPriorityRequest {

message JobSetPriorityResponse {} // Empty

// ShutdownIfCompleted
message JobShutdownIfCompletedRequest {
Job job = 1;
}

message JobShutdownIfCompletedResponse {} // Empty

// StaggerFrames
message JobStaggerFramesRequest {
Job job = 1;
Expand Down
5 changes: 5 additions & 0 deletions pycue/opencue/wrappers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ def maxRss(self):
:return: most memory used by any frame in kB"""
return self.data.job_stats.max_rss

def shutdownIfCompleted(self):
"""Shutdown the job if it is completed."""
self.stub.ShutdownIfCompleted(job_pb2.JobShutdownIfCompletedRequest(job=self.data),
timeout=Cuebot.Timeout)


class NestedJob(Job):
"""This class contains information and actions related to a nested job."""
Expand Down

0 comments on commit 9e03b95

Please sign in to comment.