filed

Job queue using FUSE

git clone git://mccd.space/filed

pendingdir.go (4468B)

      1 package main
      2 
      3 import (
      4 	"bytes"
      5 	"context"
      6 	"git.sr.ht/~marcc/filed/store"
      7 	"log/slog"
      8 	"os"
      9 	"syscall"
     10 
     11 	"bazil.org/fuse"
     12 	"bazil.org/fuse/fs"
     13 )
     14 
     15 type PendingDir struct {
     16 	manager *JobManager
     17 	inode   uint64
     18 }
     19 
     20 func (pd PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
     21 	fileMeta, err := pd.manager.store.GetFileMeta(pd.inode)
     22 	if err != nil {
     23 		slog.Error("Could not retrieve file metadata", "error", err)
     24 		return syscall.EIO
     25 	} else if fileMeta != nil {
     26 		a.Mode = os.FileMode(fileMeta.Mode)
     27 		a.Gid = fileMeta.GID
     28 		a.Uid = fileMeta.UID
     29 		a.Inode = pd.inode
     30 
     31 	} else {
     32 		a.Mode = os.ModeDir | 0o750
     33 		a.Gid = uint32(os.Getgid())
     34 		a.Uid = uint32(os.Getuid())
     35 		a.Inode = pd.inode
     36 	}
     37 	return nil
     38 }
     39 
     40 func (pd *PendingDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
     41 	slog.Warn("FUSE: Changing access permissions")
     42 	defaultMode, err := pd.manager.store.GetFileMeta(pd.inode)
     43 	if defaultMode == nil {
     44 		defaultMode = &store.FileMeta{
     45 			Inode: pd.inode,
     46 			Mode:  uint32(os.ModeDir | 0o750),
     47 			UID:   uint32(os.Getgid()),
     48 			GID:   uint32(os.Getuid()),
     49 		}
     50 
     51 	}
     52 	err = Setattr(pd.manager.store, defaultMode, ctx, req, resp)
     53 	if err != nil {
     54 		return err
     55 	}
     56 
     57 	return nil
     58 }
     59 
     60 func (pd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
     61 	jobs, err := pd.manager.store.ListJobsByState(store.StatePending)
     62 	if err != nil {
     63 		slog.Error("FUSE: Could not find jobs", "error", err)
     64 		return entries, nil
     65 	}
     66 	for _, job := range jobs {
     67 		entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
     68 	}
     69 	return entries, nil
     70 }
     71 func (pd PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
     72 	slog.Info("Creating job file", "name", req.Name)
     73 
     74 	f := &JobCreationFile{
     75 		id:      req.Name,
     76 		manager: pd.manager,
     77 		uid:     req.Uid,
     78 		gid:     req.Gid,
     79 		mode:    req.Mode,
     80 	}
     81 	return f, f, nil
     82 }
     83 func (pd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
     84 	slog.Debug("FUSE: Lookup", "name", name)
     85 	job, err := pd.manager.store.GetJob(name)
     86 	if err != nil {
     87 		return nil, syscall.ENOENT
     88 	}
     89 	if job.State == store.StatePending {
     90 		slog.Debug("FUSE: Found job", "id", job.ID)
     91 		return &File{job, pd.manager}, nil
     92 	} else {
     93 		return nil, syscall.ENOENT
     94 	}
     95 }
     96 
     97 type File struct {
     98 	job     *store.Job
     99 	manager *JobManager
    100 }
    101 
    102 func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
    103 	// Append 20 to avoid collission with static files
    104 	a.Inode = uint64(f.job.INode + 20)
    105 	slog.Debug("FUSE", "inode", a.Inode)
    106 	a.Mode = 0o750
    107 	a.Gid = uint32(os.Getgid())
    108 	a.Uid = uint32(os.Getuid())
    109 	a.Mtime = f.job.UpdatedAt
    110 	a.Ctime = f.job.CreatedAt
    111 	res, err := f.readContent()
    112 	if err != nil {
    113 		a.Size = 0
    114 		return nil
    115 	}
    116 	a.Size = uint64(len(res))
    117 	return nil
    118 }
    119 
    120 func (f *File) ReadAll(ctx context.Context) ([]byte, error) {
    121 	slog.Debug("FUSE: Read file content")
    122 	return f.readContent()
    123 }
    124 
    125 // Combines Command + Output (Live or DB)
    126 func (f *File) readContent() ([]byte, error) {
    127 	var output []byte
    128 
    129 	liveLog := f.manager.GetLiveLog(f.job.ID)
    130 
    131 	if liveLog != nil {
    132 		slog.Debug("FUSE: Using live output")
    133 		output = liveLog
    134 	} else {
    135 		slog.Debug("FUSE: Using job output")
    136 		output = f.job.Output
    137 	}
    138 
    139 	var buf bytes.Buffer
    140 	buf.WriteString(">>> ")
    141 	buf.WriteString(f.job.Command)
    142 	buf.WriteString("\n")
    143 	buf.Write(output)
    144 
    145 	return buf.Bytes(), nil
    146 }
    147 
    148 // Used before adding to the database. Lives in memory until the file is closed.
    149 type JobCreationFile struct {
    150 	id      string
    151 	manager *JobManager
    152 	buf     bytes.Buffer
    153 	uid     uint32
    154 	gid     uint32
    155 	mode    os.FileMode
    156 }
    157 
    158 func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
    159 	a.Mode = f.mode
    160 	a.Size = uint64(f.buf.Len())
    161 	a.Uid = f.uid
    162 	a.Gid = f.gid
    163 	return nil
    164 }
    165 
    166 func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
    167 	n, err := f.buf.Write(req.Data)
    168 	resp.Size = n
    169 	return err
    170 }
    171 
    172 func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
    173 	command := f.buf.String()
    174 
    175 	slog.Info("New job", "id", f.id, "command", command)
    176 
    177 	_, err := f.manager.store.CreateJob(f.id, command)
    178 	f.manager.notify()
    179 	if err != nil {
    180 		// XXX Should check specifically for file exist error
    181 		slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
    182 		return syscall.EEXIST
    183 	}
    184 	return nil
    185 }