filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
jobs.go (5542B)
1 package store
2
3 import (
4 "database/sql"
5 "log/slog"
6 "os"
7 "sync"
8 "time"
9
10 _ "modernc.org/sqlite"
11 )
12
// Job lifecycle states as stored in the jobs.state column.
const (
	// StatePending is the state of a newly created, restarted, or reset job.
	StatePending = "pending"
	// StateRunning is the state of a job that has been attempted; note that
	// the stored value is "active", not "running".
	StateRunning = "active"
	// StateCompleted is the state written by CompleteJob.
	StateCompleted = "complete"
	// StateFailed is the state written by FailJob.
	StateFailed = "failed"
)
19
// Job is a single unit of work, mirroring one row of the jobs table.
type Job struct {
	// ID is the primary key of the job.
	ID string
	// State is one of the State* constants.
	State string
	// INode is the value of the jobs.inode column.
	INode int
	// Command is the text stored in the jobs.command column.
	Command string
	// Output is the command output on completion.
	Output []byte
	// Attempts counts how many times the job has been moved to the active state
	// since it was created or last reset.
	Attempts int
	// CreatedAt is the time the row was inserted.
	CreatedAt time.Time
	// UpdatedAt is the time of the most recent write to the row.
	UpdatedAt time.Time
}
32
33 type Store struct {
34 db *sql.DB
35 mu sync.RWMutex
36 config Config
37 }
38
39 func NewStore(filepath string) (*Store, error) {
40 slog.Info("STORE: Connecting", "filepath", filepath)
41 db, err := sql.Open("sqlite", filepath)
42 if err != nil {
43 return nil, err
44 }
45
46 // Otherwise there are strange race conditions
47 db.SetMaxOpenConns(1)
48
49 if _, err := db.Exec("PRAGMA journal_mode=WAL;PRAGMA busy_timeout=20000;"); err != nil {
50 return nil, err
51 }
52
53 s := &Store{db: db}
54 if err := s.initSchema(); err != nil {
55 db.Close()
56 return nil, err
57 }
58 s.PopulateConfig()
59
60 return s, nil
61 }
62
63 func (s *Store) initSchema() error {
64 query := `
65 CREATE TABLE IF NOT EXISTS jobs (
66 id TEXT PRIMARY KEY,
67 inode INTEGER,
68 state TEXT NOT NULL,
69 command TEXT NOT NULL,
70 output BLOB,
71 attempts INTEGER DEFAULT 0,
72 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
73 updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
74 );
75 CREATE INDEX IF NOT EXISTS idx_state ON jobs(state);
76 CREATE TABLE IF NOT EXISTS config (
77 id INTEGER PRIMARY KEY CHECK (id = 0),
78 max_attempts INTEGER DEFAULT 3,
79 timeout_sec INTEGER DEFAULT 180,
80 max_job_count INTEGER DEFAULT 20,
81 backoff_mult INTEGER DEFAULT 2,
82 backoff_base INTEGER DEFAULT 2
83 );
84 INSERT INTO config (id) VALUES (0) ON CONFLICT DO NOTHING;
85 CREATE TRIGGER IF NOT EXISTS auto_increment_inode_trigger
86 AFTER INSERT ON jobs
87 WHEN new.inode IS NULL
88 BEGIN
89 UPDATE jobs
90 SET inode = (SELECT IFNULL(MAX(inode), 0) + 1 FROM jobs)
91 WHERE id = new.id;
92 END;
93 CREATE TABLE IF NOT EXISTS file_meta (
94 inode INTEGER PRIMARY KEY,
95 mode INTEGER,
96 uid INTEGER,
97 gid INTEGER
98 );
99
100 `
101 _, err := s.db.Exec(query)
102 return err
103 }
104
105 func (s *Store) Close() error {
106 return s.db.Close()
107 }
108
109 // Adds a new job with the 'pending' state.
110 func (s *Store) CreateJob(id string, command string) (*Job, error) {
111 query := `
112 INSERT INTO jobs (id, state, command, created_at, updated_at)
113 VALUES (?, ?, ?, ?, ?)
114 `
115 now := time.Now()
116 j := &Job{
117 ID: id,
118 State: StatePending,
119 Command: command,
120 CreatedAt: now,
121 UpdatedAt: now,
122 Attempts: 0,
123 }
124
125 _, err := s.db.Exec(query, id, StatePending, command, now, now)
126 if err != nil {
127 return nil, err
128 }
129 return j, nil
130 }
131
132 func (s *Store) GetJob(id string) (*Job, error) {
133 query := `SELECT id, state, inode, command, output, attempts, created_at FROM jobs WHERE id = ?`
134 j := &Job{}
135 err := s.db.QueryRow(query, id).Scan(
136 &j.ID, &j.State, &j.INode, &j.Command, &j.Output, &j.Attempts, &j.CreatedAt,
137 )
138 if err == sql.ErrNoRows {
139 return nil, os.ErrNotExist
140 }
141 return j, err
142 }
143
144 func (s *Store) DeleteJob(id string) error {
145 res, err := s.db.Exec("DELETE FROM jobs WHERE id = ?", id)
146 if err != nil {
147 return err
148 }
149 rows, _ := res.RowsAffected()
150 if rows == 0 {
151 return os.ErrNotExist
152 }
153 return nil
154 }
155
156 func (s *Store) ListJobsByState(state string) ([]Job, error) {
157 // Since it's a queue, it should be first in first out.
158 rows, err := s.db.Query("SELECT id, command, attempts, created_at,updated_at FROM jobs WHERE state = ? ORDER BY created_at", state)
159 if err != nil {
160 return nil, err
161 }
162 defer rows.Close()
163
164 var jobs []Job
165 for rows.Next() {
166 var j Job
167 if err := rows.Scan(&j.ID, &j.Command, &j.Attempts, &j.CreatedAt, &j.UpdatedAt); err != nil {
168 return nil, err
169 }
170 j.State = state
171 jobs = append(jobs, j)
172 }
173 return jobs, nil
174 }
175
176 // Picks up a pending job and moves it to 'active'.
177 func (s *Store) AttemptJob(id string) error {
178 tx, err := s.db.Begin()
179 if err != nil {
180 return err
181 }
182 defer tx.Rollback()
183
184 var currentState string
185 err = tx.QueryRow("SELECT state FROM jobs WHERE id = ?", id).Scan(¤tState)
186 if err != nil {
187 return err
188 }
189
190 query := `
191 UPDATE jobs
192 SET state = ?, attempts = attempts + 1, updated_at = ?
193 WHERE id = ?
194 `
195 _, err = tx.Exec(query, StateRunning, time.Now(), id)
196 if err != nil {
197 return err
198 }
199
200 return tx.Commit()
201 }
202
203 // Moves a job back to 'pending'.
204 func (s *Store) RestartJob(id string, output []byte) error {
205 query := `
206 UPDATE jobs
207 SET state = ?, output = ?, updated_at = ?
208 WHERE id = ?
209 `
210 res, err := s.db.Exec(query, StatePending, output, time.Now(), id)
211 if err != nil {
212 return err
213 }
214 rows, _ := res.RowsAffected()
215 if rows == 0 {
216 return os.ErrNotExist
217 }
218 return nil
219 }
220
221 func (s *Store) ResetJob(id string) error {
222 query := `
223 UPDATE jobs
224 SET state = ?, attempts = 0, updated_at = ?
225 WHERE id = ?
226 `
227 res, err := s.db.Exec(query, StatePending, time.Now(), id)
228 if err != nil {
229 return err
230 }
231 rows, _ := res.RowsAffected()
232 if rows == 0 {
233 return os.ErrNotExist
234 }
235 return nil
236 }
237
238 func (s *Store) CompleteJob(id string, output []byte) error {
239 _, err := s.db.Exec("UPDATE jobs SET state = ?, output = ?, updated_at = ? WHERE id = ?", StateCompleted, output, time.Now(), id)
240 return err
241 }
242
243 func (s *Store) WriteOutput(id string, output []byte) error {
244 _, err := s.db.Exec("UPDATE jobs SET output = ?, updated_at = ? WHERE id = ?", output, time.Now(), id)
245 return err
246 }
247
248 func (s *Store) FailJob(id string) error {
249 _, err := s.db.Exec("UPDATE jobs SET state = ?, updated_at = ? WHERE id = ?", StateFailed, time.Now(), id)
250 return err
251 }