filed

Job queue using FUSE

git clone git://mccd.space/filed

manager.go (6008B)

      1 package main
      2 
      3 import (
      4 	"bytes"
      5 	"context"
      6 	"git.sr.ht/~marcc/filed/store"
      7 	"strings"
      8 
      9 	"fmt"
     10 	"log/slog"
     11 	"math"
     12 	"os/exec"
     13 	"sync"
     14 	"time"
     15 )
     16 
     17 type RestrictionsArg struct {
     18 	rwFilesArg []string
     19 	roFilesArg []string
     20 	rwDirArg   []string
     21 	roDirArg   []string
     22 }
     23 
     24 type JobManager struct {
     25 	store      *store.Store
     26 	activeJobs sync.Map
     27 	// Command to use for executing jobs in pending
     28 	execCmd      string
     29 	restrictions *RestrictionsArg
     30 
     31 	// Used to notify that a job should be run with JobManager.notify
     32 	notifyChan chan struct{}
     33 }
     34 
     35 type ActiveJob struct {
     36 	mu     sync.Mutex
     37 	cancel context.CancelFunc
     38 	output bytes.Buffer
     39 }
     40 
     41 func NewJobManager(s *store.Store, filedLaunchExecutablePath string, restrictions *Restrictions) *JobManager {
     42 
     43 	argRestrictions := RestrictionsArg{}
     44 	if restrictions != nil {
     45 		argRestrictions.roFilesArg = toArg(restrictions.roFiles, "-rof")
     46 		argRestrictions.roDirArg = toArg(restrictions.roDir, "-ro")
     47 		argRestrictions.rwFilesArg = toArg(restrictions.rwFiles, "-rwf")
     48 		argRestrictions.rwDirArg = toArg(restrictions.rwDir, "-rw")
     49 	}
     50 	slog.Info("restrictions", "rest", argRestrictions)
     51 
     52 	return &JobManager{
     53 		store:        s,
     54 		execCmd:      filedLaunchExecutablePath,
     55 		restrictions: &argRestrictions,
     56 
     57 		// Buffer size 1 = Ignore more than one signal so we don't process jobs too many times
     58 		notifyChan: make(chan struct{}, 1),
     59 	}
     60 }
     61 
     62 type signal struct{}
     63 
     64 // Notifies to process jobs
     65 func (jm *JobManager) notify() {
     66 	select {
     67 	case jm.notifyChan <- signal{}:
     68 	default:
     69 		// Already signaled to process jobs, ignore.
     70 	}
     71 }
     72 
     73 func toArg(lst []string, argParam string) []string {
     74 	var result []string
     75 
     76 	for _, v := range lst {
     77 		result = append(result, argParam, v)
     78 	}
     79 
     80 	return result
     81 }
     82 
     83 func (jm *JobManager) StartWorker(ctx context.Context) {
     84 	jobs, err := jm.store.ListJobsByState(store.StateRunning)
     85 	if err != nil {
     86 		panic(err)
     87 	}
     88 
     89 	for _, job := range jobs {
     90 		slog.Warn("Found dangling job, moving back to pending", "job", job.ID)
     91 		err := jm.store.RestartJob(job.ID, job.Output)
     92 		if err != nil {
     93 			panic(err)
     94 		}
     95 	}
     96 
     97 	go func() {
     98 		jm.notify()
     99 		for {
    100 			nextRetryIn := jm.processPendingJobs()
    101 			var timerChan <-chan time.Time
    102 			if nextRetryIn > 0 {
    103 				timerChan = time.After(nextRetryIn)
    104 			}
    105 
    106 			select {
    107 			case <-ctx.Done():
    108 				return
    109 			case <-jm.notifyChan:
    110 				// Wake up because a job was added or finished
    111 			case <-timerChan:
    112 				// Wake up because a backoff period ended
    113 			}
    114 
    115 		}
    116 	}()
    117 }
    118 
    119 func (jm *JobManager) processPendingJobs() time.Duration {
    120 	var minWait time.Duration = 0
    121 	conf := jm.store.GetConfig()
    122 	activeJobs, err := jm.store.ListJobsByState(store.StateRunning)
    123 	if err != nil {
    124 		slog.Error("Worker: Failed to list jobs", "error", err)
    125 		return minWait
    126 	}
    127 	activeJobCount := len(activeJobs)
    128 
    129 	jobs, err := jm.store.ListJobsByState(store.StatePending)
    130 	if err != nil {
    131 		slog.Error("Worker: Failed to list jobs", "error", err)
    132 		return minWait
    133 	}
    134 
    135 	for _, job := range jobs {
    136 		if activeJobCount >= conf.MaxJobCount {
    137 			break
    138 		}
    139 
    140 		if job.Attempts >= conf.MaxAttempts {
    141 			slog.Warn("Worker: job exceeded max attempts. Moving to failed", "Job", job.ID)
    142 			jm.store.FailJob(job.ID)
    143 			continue
    144 		}
    145 		ready, waitTime := jm.isReadyWithWait(&conf, &job)
    146 		if ready {
    147 			slog.Info("Worker: Starting job", "id", job.ID, "cmd", job.Command, "attempt", job.Attempts+1)
    148 			if err := jm.store.AttemptJob(job.ID); err != nil {
    149 				slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
    150 				continue
    151 			}
    152 
    153 			go jm.runJob(job.ID, job.Command)
    154 			activeJobCount++
    155 		} else {
    156 			if minWait == 0 || waitTime < minWait {
    157 				minWait = waitTime
    158 			}
    159 		}
    160 	}
    161 	return minWait
    162 }
    163 
    164 func (jm *JobManager) isReadyWithWait(config *store.Config, job *store.Job) (bool, time.Duration) {
    165 	currentTime := time.Now().Unix()
    166 	lastModified := job.UpdatedAt.Unix()
    167 	waitTimeSec := int64(float64(config.BackoffBase) * math.Pow(float64(config.BackoffMult), float64(job.Attempts)))
    168 	readyAt := lastModified + waitTimeSec
    169 
    170 	remaining := readyAt - currentTime
    171 	if remaining <= 0 {
    172 		return true, 0
    173 	}
    174 	return false, time.Duration(remaining) * time.Second
    175 }
    176 
    177 func (jm *JobManager) runJob(id, commandStr string) {
    178 	defer jm.notify()
    179 	timeout_seconds := jm.store.GetConfig().TimeoutSec
    180 
    181 	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout_seconds)*time.Second)
    182 	defer cancel()
    183 
    184 	active := &ActiveJob{cancel: cancel}
    185 	jm.activeJobs.Store(id, active)
    186 	defer jm.activeJobs.Delete(id)
    187 
    188 	args := strings.Fields(commandStr)
    189 
    190 	cmd := exec.CommandContext(ctx, jm.execCmd)
    191 	if jm.restrictions != nil {
    192 		cmd.Args = append(cmd.Args, jm.restrictions.roFilesArg...)
    193 		cmd.Args = append(cmd.Args, jm.restrictions.rwFilesArg...)
    194 		cmd.Args = append(cmd.Args, jm.restrictions.rwDirArg...)
    195 		cmd.Args = append(cmd.Args, jm.restrictions.roDirArg...)
    196 	}
    197 	cmd.Args = append(cmd.Args, "--")
    198 	cmd.Args = append(cmd.Args, args...)
    199 
    200 	writer := &SafeBuffer{target: active}
    201 	cmd.Stdout = writer
    202 	cmd.Stderr = writer
    203 
    204 	err := cmd.Run()
    205 
    206 	jobOutput := active.output.Bytes()
    207 
    208 	exitCode := cmd.ProcessState.ExitCode()
    209 
    210 	if err != nil || exitCode != 0 {
    211 		slog.Warn("Worker: Job failed", "id", id, "error", err, "exitCode", exitCode)
    212 		errMsg := fmt.Sprintf("\n\n[System Error]: %v\n", err)
    213 		jobOutput = append(jobOutput, []byte(errMsg)...)
    214 		// This can return an error if the job is cancelled by removing it.
    215 		// Probably we should handle it more gracefully
    216 		jm.store.RestartJob(id, jobOutput)
    217 	} else {
    218 		slog.Info("Worker: Job completed", "id", id, "exitCode", exitCode)
    219 		jm.store.CompleteJob(id, jobOutput)
    220 	}
    221 }
    222 
    223 type SafeBuffer struct {
    224 	target *ActiveJob
    225 }
    226 
    227 func (sb *SafeBuffer) Write(p []byte) (n int, err error) {
    228 	sb.target.mu.Lock()
    229 	defer sb.target.mu.Unlock()
    230 	return sb.target.output.Write(p)
    231 }
    232 
    233 func (jm *JobManager) GetLiveLog(id string) []byte {
    234 	val, ok := jm.activeJobs.Load(id)
    235 	if !ok {
    236 		return nil
    237 	}
    238 	job := val.(*ActiveJob)
    239 	job.mu.Lock()
    240 	defer job.mu.Unlock()
    241 	b := make([]byte, job.output.Len())
    242 	copy(b, job.output.Bytes())
    243 	return b
    244 }