filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
commit 1cf873b22d027eb29c636fc84ce2b94c90b310a0 parent bce898b76d81bd28fc76eb0716a1822d2f8558bb Author: Marc Coquand <marc@coquand.email> Date: Sun, 14 Dec 2025 13:55:16 +0100 Working retries Diffstat:
| M | main.go | | | 8 | ++++---- |
| M | manager.go | | | 5 | ++++- |
| M | store/store.go | | | 8 | ++++---- |
3 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/main.go b/main.go
@@ -90,11 +90,11 @@ func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
case "pending":
return PendingDir{manager: rd.manager}, nil
case "complete":
- return JobDir{state: name, manager: rd.manager, inode: 2}, nil
- case "failed":
return JobDir{state: name, manager: rd.manager, inode: 3}, nil
- case "active":
+ case "failed":
return JobDir{state: name, manager: rd.manager, inode: 4}, nil
+ case "active":
+ return JobDir{state: name, manager: rd.manager, inode: 5}, nil
default:
return nil, syscall.ENOENT
}
@@ -119,7 +119,7 @@ func (PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
a.Mode = os.ModeDir | 0o775
a.Gid = uint32(os.Getgid())
a.Uid = uint32(os.Getuid())
- a.Inode = 1
+ a.Inode = 2
return nil
}
diff --git a/manager.go b/manager.go
@@ -47,8 +47,11 @@ func (jm *JobManager) processPendingJobs() {
}
for _, job := range jobs {
- if job.Attempts > 4 {
+ slog.Info("Worker", "j", job)
+ if job.Attempts >= 2 {
+ slog.Warn("WORK job exceeded failure rate.", "Job", job.ID)
jm.store.FailJob(job.ID)
+ continue
}
if err := jm.store.AttemptJob(job.ID); err != nil {
slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
diff --git a/store/store.go b/store/store.go
@@ -122,7 +122,7 @@ func (s *Store) DeleteJob(id string) error {
}
func (s *Store) ListJobsByState(state string) ([]Job, error) {
- rows, err := s.db.Query("SELECT id, command, created_at,updated_at FROM jobs WHERE state = ?", state)
+ rows, err := s.db.Query("SELECT id, command, attempts, created_at,updated_at FROM jobs WHERE state = ?", state)
if err != nil {
return nil, err
}
@@ -131,7 +131,7 @@ func (s *Store) ListJobsByState(state string) ([]Job, error) {
var jobs []Job
for rows.Next() {
var j Job
- if err := rows.Scan(&j.ID, &j.Command, &j.CreatedAt, &j.UpdatedAt); err != nil {
+ if err := rows.Scan(&j.ID, &j.Command, &j.Attempts, &j.CreatedAt, &j.UpdatedAt); err != nil {
return nil, err
}
j.State = state
@@ -174,7 +174,7 @@ func (s *Store) RestartJob(id string, output []byte) error {
SET state = ?, output = ?, updated_at = ?
WHERE id = ?
`
- res, err := s.db.Exec(query, output, StatePending, time.Now(), id)
+ res, err := s.db.Exec(query, StatePending, output, time.Now(), id)
if err != nil {
return err
}
@@ -188,7 +188,7 @@ func (s *Store) RestartJob(id string, output []byte) error {
func (s *Store) ResetJob(id string) error {
query := `
UPDATE jobs
- state = ?, attempts = 0, updated_at = ?
+ SET state = ?, attempts = 0, updated_at = ?
WHERE id = ?
`
res, err := s.db.Exec(query, StatePending, time.Now(), id)