Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add qstat -g d #30

Merged
merged 7 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions pkg/qacct/v9.0/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,35 @@ var _ = Describe("File", func() {

It("returns a channel that emits JobDetail objects for 10 jobs", func() {

jobDetailsChan, err := qacct.WatchFile(context.Background(),
qacct.GetDefaultQacctFile(), 0)
Expect(err).NotTo(HaveOccurred())
Expect(jobDetailsChan).NotTo(BeNil())

qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{})
Expect(err).NotTo(HaveOccurred())

jobIDs := make([]int, 10)
for i := 0; i < 10; i++ {
jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{
Command: "echo",
CommandArgs: []string{fmt.Sprintf("job %d", i+1)},
Binary: qsub.ToPtr(true),
})
jobID, _, err := qs.Submit(context.Background(),
qsub.JobOptions{
Command: "/bin/bash",
CommandArgs: []string{"-c", fmt.Sprintf("echo job %d; sleep 0", i+1)},
Binary: qsub.ToPtr(true),
})
Expect(err).NotTo(HaveOccurred())
log.Printf("jobID: %d", jobID)
jobIDs[i] = int(jobID)
}

jobDetailsChan, err := qacct.WatchFile(context.Background(),
qacct.GetDefaultQacctFile(), 0)
Expect(err).NotTo(HaveOccurred())
Expect(jobDetailsChan).NotTo(BeNil())

receivedJobs := make(map[int]bool)
Eventually(func() bool {
select {
case jd := <-jobDetailsChan:
log.Printf("job: %+v", jd.JobNumber)
// check if jobID is in the jobIDs list
if slices.Contains(jobIDs, int(jd.JobNumber)) {
Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job"))
Expect(jd.SubmitCommandLine).To(ContainSubstring("bash"))
Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0))
receivedJobs[int(jd.JobNumber)] = true
}
Expand Down
108 changes: 108 additions & 0 deletions pkg/qstat/v9.0/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,111 @@ func ParseClusterQueueSummary(out string) ([]ClusterQueueSummary, error) {

return summaries, nil
}

/*
qstat -g d
job-ID prior name user state submit/start at queue slots ja-task-ID
-----------------------------------------------------------------------------------------------------------------

33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 7
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27
36 0.60500 sleep root qw 2025-02-10 16:52:21 2
37 0.60500 sleep root qw 2025-02-10 16:52:35 2
38 0.60500 sleep root qw 2025-02-10 16:52:49 2
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 3
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 8
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 9
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 10
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 29
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 31
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99
34 0.50500 sleep root qw 2025-02-10 16:51:51 1
*/
func ParseJobArrayTask(out string) ([]JobArrayTask, error) {
lines := strings.Split(out, "\n")

jobArrayTasks := make([]JobArrayTask, 0, len(lines)-3)

for _, line := range lines[2:] {
fields := strings.Fields(line)
if len(fields) < 8 {
continue
}
jobID, err := strconv.Atoi(fields[0])
if err != nil {
return nil, fmt.Errorf("failed to parse jobID: %v", err)
}
priority, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, fmt.Errorf("failed to parse priority: %v", err)
}
name := fields[2]
user := fields[3]
state := fields[4]
timeString := fields[5] + " " + fields[6]
jobTime, err := time.Parse("2006-01-02 15:04:05", timeString)
if err != nil {
return nil, fmt.Errorf("failed to parse submit time: %v", err)
}
var submitTime time.Time
var startTime time.Time
if strings.Contains(state, "qw") {
startTime = jobTime
} else {
submitTime = jobTime
}

// if fields[7] is not a number, it is the queue name
var slots int
var taskID int
var queue string

// when waiting there is no queue name
if slotsInt, err := strconv.Atoi(fields[7]); err != nil {
queue = fields[7]
if len(fields) > 8 {
slots, _ = strconv.Atoi(fields[8])
}
if len(fields) > 9 {
taskID, _ = strconv.Atoi(fields[9])
}
} else {
slots = slotsInt
// waiting jobs
if len(fields) > 8 {
slots, _ = strconv.Atoi(fields[8])
}
if len(fields) > 9 {
taskID, err = strconv.Atoi(fields[9])
if err != nil {
// a single job and parallel job has no taskID
taskID = 0
}
}
}

jobInfo := JobInfo{
JobID: jobID,
Priority: priority,
Name: name,
User: user,
State: state,
SubmitTime: submitTime,
StartTime: startTime,
Queue: queue,
Slots: slots,
JaTaskIDs: []int64{int64(taskID)},
}
jobArrayTasks = append(jobArrayTasks, JobArrayTask{
JobInfo: jobInfo,
})

}
return jobArrayTasks, nil
}
43 changes: 43 additions & 0 deletions pkg/qstat/v9.0/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,4 +452,47 @@ test.q 0.08 0 0 2 2 0 0

})

Describe("JobArrayTask", func() {

It("should parse the output of qstat -g d", func() {
input := `job-ID prior name user state submit/start at queue slots ja-task-ID
-----------------------------------------------------------------------------------------------------------------
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 23
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25
33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27
36 0.60500 sleep root qw 2025-02-10 16:52:21 2
37 0.60500 sleep root qw 2025-02-10 16:52:35 2
38 0.60500 sleep root qw 2025-02-10 16:52:49 2
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1
39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 95
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 97
33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99
34 0.50500 sleep root qw 2025-02-10 16:51:51 1
`
jobArrayTasks, err := qstat.ParseJobArrayTask(input)
Expect(err).NotTo(HaveOccurred())
Expect(len(jobArrayTasks)).To(Equal(15))

Expect(jobArrayTasks).To(ContainElement(qstat.JobArrayTask{
JobInfo: qstat.JobInfo{
JobID: 33,
Priority: 0.505,
Name: "sleep",
User: "root",
State: "r",
SubmitTime: time.Date(2025, 2, 10, 16, 47, 18, 0, time.UTC),
StartTime: time.Time{},
Queue: "all.q@master",
Slots: 1,
JaTaskIDs: []int64{1},
},
}))
})

})

})
2 changes: 2 additions & 0 deletions pkg/qstat/v9.0/qstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ type QStat interface {
ShowFullOutputWithResources(resourceAttributes string) ([]JobInfo, error)
// qstat -g c
DisplayClusterQueueSummary() ([]ClusterQueueSummary, error)
// qstat -g d shows all job array tasks individually
DisplayAllJobArrayTasks() ([]JobArrayTask, error)
// qstat -g p shows all parallel job tasks individually
DisplayAllParallelJobTasks() ([]ParallelJobTask, error)
// qstat -help
ShowHelp() (string, error)
Expand Down
12 changes: 11 additions & 1 deletion pkg/qstat/v9.0/qstat_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,18 @@ func (q *QStatImpl) DisplayClusterQueueSummary() ([]ClusterQueueSummary, error)
return ParseClusterQueueSummary(out)
}

// DisplayAllJobArrayTasks is equivalent to "qstat -g d"
func (q *QStatImpl) DisplayAllJobArrayTasks() ([]JobArrayTask, error) {
return nil, fmt.Errorf("not implemented")
out, err := q.NativeSpecification([]string{"-g", "d"})
if err != nil {
return nil, fmt.Errorf("failed to get output of qstat: %w", err)
}
jobArrayTasks, err := ParseJobArrayTask(out)
if err != nil {
return nil, fmt.Errorf("failed to parse job array tasks: %w", err)
}

return jobArrayTasks, nil
}

func (q *QStatImpl) DisplayAllParallelJobTasks() ([]ParallelJobTask, error) {
Expand Down
Loading