filed

Job queue using FUSE

git clone git://mccd.space/filed

jobs.go (5542B)

      1 package store
      2 
      3 import (
      4 	"database/sql"
      5 	"log/slog"
      6 	"os"
      7 	"sync"
      8 	"time"
      9 
     10 	_ "modernc.org/sqlite"
     11 )
     12 
     13 const (
     14 	StatePending   = "pending"
     15 	StateRunning   = "active"
     16 	StateCompleted = "complete"
     17 	StateFailed    = "failed"
     18 )
     19 
     20 // A single unit of work
     21 type Job struct {
     22 	ID      string
     23 	State   string
     24 	INode   int
     25 	Command string
     26 	// Command output on completion
     27 	Output    []byte
     28 	Attempts  int
     29 	CreatedAt time.Time
     30 	UpdatedAt time.Time
     31 }
     32 
     33 type Store struct {
     34 	db     *sql.DB
     35 	mu     sync.RWMutex
     36 	config Config
     37 }
     38 
     39 func NewStore(filepath string) (*Store, error) {
     40 	slog.Info("STORE: Connecting", "filepath", filepath)
     41 	db, err := sql.Open("sqlite", filepath)
     42 	if err != nil {
     43 		return nil, err
     44 	}
     45 
     46 	// Otherwise there are strange race conditions
     47 	db.SetMaxOpenConns(1)
     48 
     49 	if _, err := db.Exec("PRAGMA journal_mode=WAL;PRAGMA busy_timeout=20000;"); err != nil {
     50 		return nil, err
     51 	}
     52 
     53 	s := &Store{db: db}
     54 	if err := s.initSchema(); err != nil {
     55 		db.Close()
     56 		return nil, err
     57 	}
     58 	s.PopulateConfig()
     59 
     60 	return s, nil
     61 }
     62 
     63 func (s *Store) initSchema() error {
     64 	query := `
     65 	CREATE TABLE IF NOT EXISTS jobs (
     66 		id TEXT PRIMARY KEY,
     67 		inode INTEGER,
     68 		state TEXT NOT NULL,
     69 		command TEXT NOT NULL,
     70 		output BLOB,
     71 		attempts INTEGER DEFAULT 0,
     72 		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
     73 		updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
     74 	);
     75 	CREATE INDEX IF NOT EXISTS idx_state ON jobs(state);
     76 	CREATE TABLE IF NOT EXISTS config (
     77 		id INTEGER PRIMARY KEY CHECK (id = 0),
     78 		max_attempts INTEGER DEFAULT 3,
     79 		timeout_sec INTEGER DEFAULT 180,
     80 		max_job_count INTEGER DEFAULT 20,
     81 		backoff_mult INTEGER DEFAULT 2,
     82 		backoff_base INTEGER DEFAULT 2
     83 	);
     84 	INSERT INTO config (id) VALUES (0) ON CONFLICT DO NOTHING;
     85 	CREATE TRIGGER IF NOT EXISTS auto_increment_inode_trigger
     86 		AFTER INSERT ON jobs
     87 		WHEN new.inode IS NULL
     88 		BEGIN
     89 			UPDATE jobs
     90 			SET inode = (SELECT IFNULL(MAX(inode), 0) + 1 FROM jobs)
     91 			WHERE id = new.id;
     92 	END;
     93 	CREATE TABLE IF NOT EXISTS file_meta (
     94 		inode INTEGER PRIMARY KEY,
     95 		mode INTEGER,
     96 		uid INTEGER,
     97 		gid INTEGER
     98 	);
     99 	
    100 	`
    101 	_, err := s.db.Exec(query)
    102 	return err
    103 }
    104 
    105 func (s *Store) Close() error {
    106 	return s.db.Close()
    107 }
    108 
    109 // Adds a new job with the 'pending' state.
    110 func (s *Store) CreateJob(id string, command string) (*Job, error) {
    111 	query := `
    112 		INSERT INTO jobs (id, state, command, created_at, updated_at)
    113 		VALUES (?, ?, ?, ?, ?)
    114 	`
    115 	now := time.Now()
    116 	j := &Job{
    117 		ID:        id,
    118 		State:     StatePending,
    119 		Command:   command,
    120 		CreatedAt: now,
    121 		UpdatedAt: now,
    122 		Attempts:  0,
    123 	}
    124 
    125 	_, err := s.db.Exec(query, id, StatePending, command, now, now)
    126 	if err != nil {
    127 		return nil, err
    128 	}
    129 	return j, nil
    130 }
    131 
    132 func (s *Store) GetJob(id string) (*Job, error) {
    133 	query := `SELECT id, state, inode, command, output, attempts, created_at FROM jobs WHERE id = ?`
    134 	j := &Job{}
    135 	err := s.db.QueryRow(query, id).Scan(
    136 		&j.ID, &j.State, &j.INode, &j.Command, &j.Output, &j.Attempts, &j.CreatedAt,
    137 	)
    138 	if err == sql.ErrNoRows {
    139 		return nil, os.ErrNotExist
    140 	}
    141 	return j, err
    142 }
    143 
    144 func (s *Store) DeleteJob(id string) error {
    145 	res, err := s.db.Exec("DELETE FROM jobs WHERE id = ?", id)
    146 	if err != nil {
    147 		return err
    148 	}
    149 	rows, _ := res.RowsAffected()
    150 	if rows == 0 {
    151 		return os.ErrNotExist
    152 	}
    153 	return nil
    154 }
    155 
    156 func (s *Store) ListJobsByState(state string) ([]Job, error) {
    157 	// Since it's a queue, it should be first in first out.
    158 	rows, err := s.db.Query("SELECT id, command, attempts, created_at,updated_at FROM jobs WHERE state = ? ORDER BY created_at", state)
    159 	if err != nil {
    160 		return nil, err
    161 	}
    162 	defer rows.Close()
    163 
    164 	var jobs []Job
    165 	for rows.Next() {
    166 		var j Job
    167 		if err := rows.Scan(&j.ID, &j.Command, &j.Attempts, &j.CreatedAt, &j.UpdatedAt); err != nil {
    168 			return nil, err
    169 		}
    170 		j.State = state
    171 		jobs = append(jobs, j)
    172 	}
    173 	return jobs, nil
    174 }
    175 
    176 // Picks up a pending job and moves it to 'active'.
    177 func (s *Store) AttemptJob(id string) error {
    178 	tx, err := s.db.Begin()
    179 	if err != nil {
    180 		return err
    181 	}
    182 	defer tx.Rollback()
    183 
    184 	var currentState string
    185 	err = tx.QueryRow("SELECT state FROM jobs WHERE id = ?", id).Scan(&currentState)
    186 	if err != nil {
    187 		return err
    188 	}
    189 
    190 	query := `
    191 		UPDATE jobs 
    192 		SET state = ?, attempts = attempts + 1, updated_at = ? 
    193 		WHERE id = ?
    194 	`
    195 	_, err = tx.Exec(query, StateRunning, time.Now(), id)
    196 	if err != nil {
    197 		return err
    198 	}
    199 
    200 	return tx.Commit()
    201 }
    202 
    203 // Moves a job back to 'pending'.
    204 func (s *Store) RestartJob(id string, output []byte) error {
    205 	query := `
    206 		UPDATE jobs 
    207 		SET state = ?, output = ?, updated_at = ?
    208 		WHERE id = ?
    209 	`
    210 	res, err := s.db.Exec(query, StatePending, output, time.Now(), id)
    211 	if err != nil {
    212 		return err
    213 	}
    214 	rows, _ := res.RowsAffected()
    215 	if rows == 0 {
    216 		return os.ErrNotExist
    217 	}
    218 	return nil
    219 }
    220 
    221 func (s *Store) ResetJob(id string) error {
    222 	query := `
    223 		UPDATE jobs 
    224 		SET state = ?, attempts = 0, updated_at = ?
    225 		WHERE id = ?
    226 	`
    227 	res, err := s.db.Exec(query, StatePending, time.Now(), id)
    228 	if err != nil {
    229 		return err
    230 	}
    231 	rows, _ := res.RowsAffected()
    232 	if rows == 0 {
    233 		return os.ErrNotExist
    234 	}
    235 	return nil
    236 }
    237 
    238 func (s *Store) CompleteJob(id string, output []byte) error {
    239 	_, err := s.db.Exec("UPDATE jobs SET state = ?, output = ?, updated_at = ? WHERE id = ?", StateCompleted, output, time.Now(), id)
    240 	return err
    241 }
    242 
    243 func (s *Store) WriteOutput(id string, output []byte) error {
    244 	_, err := s.db.Exec("UPDATE jobs SET output = ?, updated_at = ? WHERE id = ?", output, time.Now(), id)
    245 	return err
    246 }
    247 
    248 func (s *Store) FailJob(id string) error {
    249 	_, err := s.db.Exec("UPDATE jobs SET state = ?, updated_at = ? WHERE id = ?", StateFailed, time.Now(), id)
    250 	return err
    251 }