filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
manager.go (6008B)
1 package main
2
3 import (
4 "bytes"
5 "context"
6 "git.sr.ht/~marcc/filed/store"
7 "strings"
8
9 "fmt"
10 "log/slog"
11 "math"
12 "os/exec"
13 "sync"
14 "time"
15 )
16
// RestrictionsArg holds sandbox restrictions already expanded into launcher
// command-line arguments (flag/value pairs built by toArg), ready to be
// appended to a job's argv.
type RestrictionsArg struct {
	// File paths, expanded as "-rwf <path>" and "-rof <path>" pairs.
	rwFilesArg, roFilesArg []string
	// Directory paths, expanded as "-rw <dir>" and "-ro <dir>" pairs.
	rwDirArg, roDirArg []string
}
23
24 type JobManager struct {
25 store *store.Store
26 activeJobs sync.Map
27 // Command to use for executing jobs in pending
28 execCmd string
29 restrictions *RestrictionsArg
30
31 // Used to notify that a job should be run with JobManager.notify
32 notifyChan chan struct{}
33 }
34
// ActiveJob is the in-memory state of a job that is currently executing.
type ActiveJob struct {
	// mu guards output, which is appended to while the job runs and may be
	// read concurrently by GetLiveLog.
	mu sync.Mutex
	// cancel cancels the context the job's process runs under.
	cancel context.CancelFunc
	// output accumulates everything the job has written so far.
	output bytes.Buffer
}
40
41 func NewJobManager(s *store.Store, filedLaunchExecutablePath string, restrictions *Restrictions) *JobManager {
42
43 argRestrictions := RestrictionsArg{}
44 if restrictions != nil {
45 argRestrictions.roFilesArg = toArg(restrictions.roFiles, "-rof")
46 argRestrictions.roDirArg = toArg(restrictions.roDir, "-ro")
47 argRestrictions.rwFilesArg = toArg(restrictions.rwFiles, "-rwf")
48 argRestrictions.rwDirArg = toArg(restrictions.rwDir, "-rw")
49 }
50 slog.Info("restrictions", "rest", argRestrictions)
51
52 return &JobManager{
53 store: s,
54 execCmd: filedLaunchExecutablePath,
55 restrictions: &argRestrictions,
56
57 // Buffer size 1 = Ignore more than one signal so we don't process jobs too many times
58 notifyChan: make(chan struct{}, 1),
59 }
60 }
61
62 type signal struct{}
63
64 // Notifies to process jobs
65 func (jm *JobManager) notify() {
66 select {
67 case jm.notifyChan <- signal{}:
68 default:
69 // Already signaled to process jobs, ignore.
70 }
71 }
72
// toArg expands each entry of lst into a flag/value pair, so
// toArg([]string{"a", "b"}, "-ro") yields ["-ro", "a", "-ro", "b"].
// An empty or nil lst yields a nil slice.
func toArg(lst []string, argParam string) []string {
	var pairs []string
	for i := 0; i < len(lst); i++ {
		pairs = append(pairs, argParam)
		pairs = append(pairs, lst[i])
	}
	return pairs
}
82
83 func (jm *JobManager) StartWorker(ctx context.Context) {
84 jobs, err := jm.store.ListJobsByState(store.StateRunning)
85 if err != nil {
86 panic(err)
87 }
88
89 for _, job := range jobs {
90 slog.Warn("Found dangling job, moving back to pending", "job", job.ID)
91 err := jm.store.RestartJob(job.ID, job.Output)
92 if err != nil {
93 panic(err)
94 }
95 }
96
97 go func() {
98 jm.notify()
99 for {
100 nextRetryIn := jm.processPendingJobs()
101 var timerChan <-chan time.Time
102 if nextRetryIn > 0 {
103 timerChan = time.After(nextRetryIn)
104 }
105
106 select {
107 case <-ctx.Done():
108 return
109 case <-jm.notifyChan:
110 // Wake up because a job was added or finished
111 case <-timerChan:
112 // Wake up because a backoff period ended
113 }
114
115 }
116 }()
117 }
118
119 func (jm *JobManager) processPendingJobs() time.Duration {
120 var minWait time.Duration = 0
121 conf := jm.store.GetConfig()
122 activeJobs, err := jm.store.ListJobsByState(store.StateRunning)
123 if err != nil {
124 slog.Error("Worker: Failed to list jobs", "error", err)
125 return minWait
126 }
127 activeJobCount := len(activeJobs)
128
129 jobs, err := jm.store.ListJobsByState(store.StatePending)
130 if err != nil {
131 slog.Error("Worker: Failed to list jobs", "error", err)
132 return minWait
133 }
134
135 for _, job := range jobs {
136 if activeJobCount >= conf.MaxJobCount {
137 break
138 }
139
140 if job.Attempts >= conf.MaxAttempts {
141 slog.Warn("Worker: job exceeded max attempts. Moving to failed", "Job", job.ID)
142 jm.store.FailJob(job.ID)
143 continue
144 }
145 ready, waitTime := jm.isReadyWithWait(&conf, &job)
146 if ready {
147 slog.Info("Worker: Starting job", "id", job.ID, "cmd", job.Command, "attempt", job.Attempts+1)
148 if err := jm.store.AttemptJob(job.ID); err != nil {
149 slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
150 continue
151 }
152
153 go jm.runJob(job.ID, job.Command)
154 activeJobCount++
155 } else {
156 if minWait == 0 || waitTime < minWait {
157 minWait = waitTime
158 }
159 }
160 }
161 return minWait
162 }
163
164 func (jm *JobManager) isReadyWithWait(config *store.Config, job *store.Job) (bool, time.Duration) {
165 currentTime := time.Now().Unix()
166 lastModified := job.UpdatedAt.Unix()
167 waitTimeSec := int64(float64(config.BackoffBase) * math.Pow(float64(config.BackoffMult), float64(job.Attempts)))
168 readyAt := lastModified + waitTimeSec
169
170 remaining := readyAt - currentTime
171 if remaining <= 0 {
172 return true, 0
173 }
174 return false, time.Duration(remaining) * time.Second
175 }
176
177 func (jm *JobManager) runJob(id, commandStr string) {
178 defer jm.notify()
179 timeout_seconds := jm.store.GetConfig().TimeoutSec
180
181 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout_seconds)*time.Second)
182 defer cancel()
183
184 active := &ActiveJob{cancel: cancel}
185 jm.activeJobs.Store(id, active)
186 defer jm.activeJobs.Delete(id)
187
188 args := strings.Fields(commandStr)
189
190 cmd := exec.CommandContext(ctx, jm.execCmd)
191 if jm.restrictions != nil {
192 cmd.Args = append(cmd.Args, jm.restrictions.roFilesArg...)
193 cmd.Args = append(cmd.Args, jm.restrictions.rwFilesArg...)
194 cmd.Args = append(cmd.Args, jm.restrictions.rwDirArg...)
195 cmd.Args = append(cmd.Args, jm.restrictions.roDirArg...)
196 }
197 cmd.Args = append(cmd.Args, "--")
198 cmd.Args = append(cmd.Args, args...)
199
200 writer := &SafeBuffer{target: active}
201 cmd.Stdout = writer
202 cmd.Stderr = writer
203
204 err := cmd.Run()
205
206 jobOutput := active.output.Bytes()
207
208 exitCode := cmd.ProcessState.ExitCode()
209
210 if err != nil || exitCode != 0 {
211 slog.Warn("Worker: Job failed", "id", id, "error", err, "exitCode", exitCode)
212 errMsg := fmt.Sprintf("\n\n[System Error]: %v\n", err)
213 jobOutput = append(jobOutput, []byte(errMsg)...)
214 // This can return an error if the job is cancelled by removing it.
215 // Probably we should handle it more gracefully
216 jm.store.RestartJob(id, jobOutput)
217 } else {
218 slog.Info("Worker: Job completed", "id", id, "exitCode", exitCode)
219 jm.store.CompleteJob(id, jobOutput)
220 }
221 }
222
223 type SafeBuffer struct {
224 target *ActiveJob
225 }
226
227 func (sb *SafeBuffer) Write(p []byte) (n int, err error) {
228 sb.target.mu.Lock()
229 defer sb.target.mu.Unlock()
230 return sb.target.output.Write(p)
231 }
232
233 func (jm *JobManager) GetLiveLog(id string) []byte {
234 val, ok := jm.activeJobs.Load(id)
235 if !ok {
236 return nil
237 }
238 job := val.(*ActiveJob)
239 job.mu.Lock()
240 defer job.mu.Unlock()
241 b := make([]byte, job.output.Len())
242 copy(b, job.output.Bytes())
243 return b
244 }