filed

Job queue using FUSE

git clone git://mccd.space/filed

jobdir.go (3412B)

      1 package main
      2 
      3 import (
      4 	"context"
      5 	"git.sr.ht/~marcc/filed/store"
      6 	"log/slog"
      7 	"os"
      8 	"os/exec"
      9 	"runtime"
     10 	"syscall"
     11 
     12 	"bazil.org/fuse"
     13 	"bazil.org/fuse/fs"
     14 )
     15 
     16 type JobDir struct {
     17 	state   string
     18 	manager *JobManager
     19 	inode   uint64
     20 }
     21 
     22 func (jd JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
     23 	fileMeta, err := jd.manager.store.GetFileMeta(jd.inode)
     24 	if err != nil {
     25 		slog.Error("Could not retrieve file metadata", "error", err)
     26 		return syscall.EIO
     27 	} else if fileMeta != nil {
     28 		a.Mode = os.FileMode(fileMeta.Mode)
     29 		a.Gid = fileMeta.GID
     30 		a.Uid = fileMeta.UID
     31 		a.Inode = jd.inode
     32 
     33 	} else {
     34 		a.Mode = os.ModeDir | 0o750
     35 		a.Gid = uint32(os.Getgid())
     36 		a.Uid = uint32(os.Getuid())
     37 		a.Inode = jd.inode
     38 	}
     39 	return nil
     40 }
     41 
     42 func (jd JobDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
     43 	slog.Warn("FUSE: Changing access permissions")
     44 	defaultMode, err := jd.manager.store.GetFileMeta(jd.inode)
     45 	if defaultMode == nil {
     46 		defaultMode = &store.FileMeta{
     47 			Inode: jd.inode,
     48 			Mode:  uint32(os.ModeDir | 0o750),
     49 			UID:   uint32(os.Getgid()),
     50 			GID:   uint32(os.Getuid()),
     51 		}
     52 
     53 	}
     54 	err = Setattr(jd.manager.store, defaultMode, ctx, req, resp)
     55 	if err != nil {
     56 		return err
     57 	}
     58 
     59 	return nil
     60 }
     61 
     62 func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
     63 	jobs, err := jd.manager.store.ListJobsByState(jd.state)
     64 	if err != nil {
     65 		slog.Error("Could not find jobs", "error", err)
     66 		return entries, nil
     67 	}
     68 	for _, job := range jobs {
     69 		entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
     70 	}
     71 	return entries, nil
     72 }
     73 
     74 func (jd JobDir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
     75 	slog.Info("Removing job", "id", req.Name)
     76 	if jd.state == store.StateRunning {
     77 		val, ok := jd.manager.activeJobs.Load(req.Name)
     78 		if !ok {
     79 			// Assume that job is already complete
     80 			return jd.manager.store.DeleteJob(req.Name)
     81 		}
     82 		activeJob := val.(*ActiveJob)
     83 		activeJob.cancel()
     84 	}
     85 	return jd.manager.store.DeleteJob(req.Name)
     86 }
     87 
     88 func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
     89 	if d.state == store.StateRunning {
     90 		slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
     91 		return syscall.EPERM
     92 	}
     93 
     94 	if _, ok := newDir.(PendingDir); !ok {
     95 		slog.Warn("Jobs can only be moved to 'pending' (to restart) or deleted", "dest", newDir)
     96 		return syscall.EPERM
     97 	}
     98 
     99 	if req.OldName != req.NewName {
    100 		// XXX Maybe support?
    101 		slog.Warn("Renaming job IDs is not supported", "old", req.OldName, "new", req.NewName)
    102 		return syscall.EPERM
    103 	}
    104 
    105 	slog.Info("Restarting job", "id", req.OldName)
    106 
    107 	err := d.manager.store.ResetJob(req.OldName)
    108 	d.manager.notify()
    109 
    110 	if err != nil {
    111 		slog.Error("Failed to restart job in DB", "error", err)
    112 		return syscall.ENOENT
    113 	}
    114 
    115 	return nil
    116 }
    117 
    118 func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
    119 	slog.Debug("FUSE: Jobdir Lookup", "name", name)
    120 	job, err := jd.manager.store.GetJob(name)
    121 	if err != nil {
    122 		slog.Warn("FUSE: Not found", "name", name)
    123 		return nil, syscall.ENOENT
    124 	}
    125 	if job.State == jd.state {
    126 		slog.Debug("FUSE: Found job", "id", job.ID)
    127 		return &File{job, jd.manager}, nil
    128 	} else {
    129 		return nil, syscall.ENOENT
    130 	}
    131 }
    132 
    133 func Unmount(dir string) error {
    134 	if runtime.GOOS == "linux" {
    135 		cmd := exec.Command("fusermount", "-u", "-z", dir)
    136 		return cmd.Run()
    137 	}
    138 	return fuse.Unmount(dir)
    139 }