Skip to content

Commit

Permalink
Split out Watcher and backends (#632)
Browse files Browse the repository at this point in the history
Rather than have a bunch of Watcher types guarded by build tags, have
one Watcher that's always available which proxies to a backend
interface.

This will allow adding a polling watcher, fanotify, fsevents, and things
like that. There are no backends to select from yet, and I'm not 100%
sure yet what an API for that would look like, but this sets up the
scaffolding for all of it.

Backends are per-watcher; originally I prototyped something that allows
selecting it per-Add() call, but the bookkeeping on that became rather
complex, and this use case is probably far too rare to spend a lot of
effort on. People can still use different backends by using different
Watchers; they'll just have to do the bookkeeping themselves.
  • Loading branch information
arp242 committed May 2, 2024
1 parent a618f07 commit bec8903
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 1,284 deletions.
220 changes: 22 additions & 198 deletions backend_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
// FEN backend for illumos (supported) and Solaris (untested, but should work).
//
// See port_create(3c) etc. for docs. https://www.illumos.org/man/3C/port_create
//
// Note: the documentation on the Watcher type and methods is generated from
// mkdoc.zsh

package fsnotify

Expand All @@ -21,112 +18,8 @@ import (
"golang.org/x/sys/unix"
)

// Watcher watches a set of paths, delivering events on a channel.
//
// A watcher should not be copied (e.g. pass it by pointer, rather than by
// value).
//
// # Linux notes
//
// When a file is removed a Remove event won't be emitted until all file
// descriptors are closed, and deletes will always emit a Chmod. For example:
//
// fp := os.Open("file")
// os.Remove("file") // Triggers Chmod
// fp.Close() // Triggers Remove
//
// This is the event that inotify sends, so not much can be changed about this.
//
// The fs.inotify.max_user_watches sysctl variable specifies the upper limit
// for the number of watches per user, and fs.inotify.max_user_instances
// specifies the maximum number of inotify instances per user. Every Watcher you
// create is an "instance", and every path you add is a "watch".
//
// These are also exposed in /proc as /proc/sys/fs/inotify/max_user_watches and
// /proc/sys/fs/inotify/max_user_instances
//
// To increase them you can use sysctl or write the value to the /proc file:
//
// # Default values on Linux 5.18
// sysctl fs.inotify.max_user_watches=124983
// sysctl fs.inotify.max_user_instances=128
//
// To make the changes persist on reboot edit /etc/sysctl.conf or
// /usr/lib/sysctl.d/50-default.conf (details differ per Linux distro; check
// your distro's documentation):
//
// fs.inotify.max_user_watches=124983
// fs.inotify.max_user_instances=128
//
// Reaching the limit will result in a "no space left on device" or "too many open
// files" error.
//
// # kqueue notes (macOS, BSD)
//
// kqueue requires opening a file descriptor for every file that's being watched;
// so if you're watching a directory with five files then that's six file
// descriptors. You will run in to your system's "max open files" limit faster on
// these platforms.
//
// The sysctl variables kern.maxfiles and kern.maxfilesperproc can be used to
// control the maximum number of open files, as well as /etc/login.conf on BSD
// systems.
//
// # Windows notes
//
// Paths can be added as "C:\path\to\dir", but forward slashes
// ("C:/path/to/dir") will also work.
//
// When a watched directory is removed it will always send an event for the
// directory itself, but may not send events for all files in that directory.
// Sometimes it will send events for all times, sometimes it will send no
// events, and often only for some files.
//
// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest
// value that is guaranteed to work with SMB filesystems. If you have many
// events in quick succession this may not be enough, and you will have to use
// [WithBufferSize] to increase the value.
type Watcher struct {
// Events sends the filesystem change events.
//
// fsnotify can send the following events; a "path" here can refer to a
// file, directory, symbolic link, or special file like a FIFO.
//
// fsnotify.Create A new path was created; this may be followed by one
// or more Write events if data also gets written to a
// file.
//
// fsnotify.Remove A path was removed.
//
// fsnotify.Rename A path was renamed. A rename is always sent with the
// old path as Event.Name, and a Create event will be
// sent with the new name. Renames are only sent for
// paths that are currently watched; e.g. moving an
// unmonitored file into a monitored directory will
// show up as just a Create. Similarly, renaming a file
// to outside a monitored directory will show up as
// only a Rename.
//
// fsnotify.Write A file or named pipe was written to. A Truncate will
// also trigger a Write. A single "write action"
// initiated by the user may show up as one or multiple
// writes, depending on when the system syncs things to
// disk. For example when compiling a large Go program
// you may get hundreds of Write events, and you may
// want to wait until you've stopped receiving them
// (see the dedup example in cmd/fsnotify).
//
// Some systems may send Write event for directories
// when the directory content changes.
//
// fsnotify.Chmod Attributes were changed. On Linux this is also sent
// when a file is removed (or more accurately, when a
// link to an inode is removed). On kqueue it's sent
// when a file is truncated. On Windows it's never
// sent.
type fen struct {
Events chan Event

// Errors sends any errors.
Errors chan error

mu sync.Mutex
Expand All @@ -136,23 +29,14 @@ type Watcher struct {
watches map[string]Op // Explicitly watched non-directories
}

// NewWatcher creates a new Watcher.
func NewWatcher() (*Watcher, error) {
return NewBufferedWatcher(0)
func newBackend(ev chan Event, errs chan error) (backend, error) {
return newBufferedBackend(0, ev, errs)
}

// NewBufferedWatcher creates a new Watcher with a buffered Watcher.Events
// channel.
//
// The main use case for this is situations with a very large number of events
// where the kernel buffer size can't be increased (e.g. due to lack of
// permissions). An unbuffered Watcher will perform better for almost all use
// cases, and whenever possible you will be better off increasing the kernel
// buffers instead of adding a large userspace buffer.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
w := &Watcher{
Events: make(chan Event, sz),
Errors: make(chan error),
func newBufferedBackend(sz uint, ev chan Event, errs chan error) (backend, error) {
w := &fen{
Events: ev,
Errors: errs,
dirs: make(map[string]Op),
watches: make(map[string]Op),
done: make(chan struct{}),
Expand All @@ -170,7 +54,7 @@ func NewBufferedWatcher(sz uint) (*Watcher, error) {

// sendEvent attempts to send an event to the user, returning true if the event
// was put in the channel successfully and false if the watcher has been closed.
func (w *Watcher) sendEvent(name string, op Op) (sent bool) {
func (w *fen) sendEvent(name string, op Op) (sent bool) {
select {
case <-w.done:
return false
Expand All @@ -181,7 +65,7 @@ func (w *Watcher) sendEvent(name string, op Op) (sent bool) {

// sendError attempts to send an error to the user, returning true if the error
// was put in the channel successfully and false if the watcher has been closed.
func (w *Watcher) sendError(err error) (sent bool) {
func (w *fen) sendError(err error) (sent bool) {
if err == nil {
return true
}
Expand All @@ -193,7 +77,7 @@ func (w *Watcher) sendError(err error) (sent bool) {
}
}

func (w *Watcher) isClosed() bool {
func (w *fen) isClosed() bool {
select {
case <-w.done:
return true
Expand All @@ -202,8 +86,7 @@ func (w *Watcher) isClosed() bool {
}
}

// Close removes all watches and closes the Events channel.
func (w *Watcher) Close() error {
func (w *fen) Close() error {
// Take the lock used by associateFile to prevent lingering events from
// being processed after the close
w.mu.Lock()
Expand All @@ -215,52 +98,9 @@ func (w *Watcher) Close() error {
return w.port.Close()
}

// Add starts monitoring the path for changes.
//
// A path can only be watched once; watching it more than once is a no-op and will
// not return an error. Paths that do not yet exist on the filesystem cannot be
// watched.
//
// A watch will be automatically removed if the watched path is deleted or
// renamed. The exception is the Windows backend, which doesn't remove the
// watcher on renames.
//
// Notifications on network filesystems (NFS, SMB, FUSE, etc.) or special
// filesystems (/proc, /sys, etc.) generally don't work.
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [Watcher.AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
// after the watcher is started. Subdirectories are not watched (i.e. it's
// non-recursive).
//
// # Watching files
//
// Watching individual files (rather than directories) is generally not
// recommended as many programs (especially editors) update files atomically: it
// will write to a temporary file which is then moved to to destination,
// overwriting the original (or some variant thereof). The watcher on the
// original file is now lost, as that no longer exists.
//
// The upshot of this is that a power failure or crash won't leave a
// half-written file.
//
// Watch the parent directory and use Event.Name to filter out files you're not
// interested in. There is an example of this in cmd/fsnotify/file.go.
func (w *Watcher) Add(name string) error { return w.AddWith(name) }
func (w *fen) Add(name string) error { return w.AddWith(name) }

// AddWith is like [Watcher.Add], but allows adding options. When using Add()
// the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
func (w *fen) AddWith(name string, opts ...addOpt) error {
if w.isClosed() {
return ErrClosed
}
Expand Down Expand Up @@ -305,15 +145,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
return nil
}

// Remove stops monitoring the path for changes.
//
// Directories are always removed non-recursively. For example, if you added
// /tmp/dir and /tmp/dir/subdir then you will need to remove both.
//
// Removing a path that has not yet been added returns [ErrNonExistentWatch].
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) Remove(name string) error {
func (w *fen) Remove(name string) error {
if w.isClosed() {
return nil
}
Expand Down Expand Up @@ -356,7 +188,7 @@ func (w *Watcher) Remove(name string) error {
}

// readEvents contains the main loop that runs in a goroutine watching for events.
func (w *Watcher) readEvents() {
func (w *fen) readEvents() {
// If this function returns, the watcher has been closed and we can close
// these channels
defer func() {
Expand Down Expand Up @@ -404,7 +236,7 @@ func (w *Watcher) readEvents() {
}
}

func (w *Watcher) handleDirectory(path string, stat os.FileInfo, follow bool, handler func(string, os.FileInfo, bool) error) error {
func (w *fen) handleDirectory(path string, stat os.FileInfo, follow bool, handler func(string, os.FileInfo, bool) error) error {
files, err := os.ReadDir(path)
if err != nil {
return err
Expand All @@ -430,7 +262,7 @@ func (w *Watcher) handleDirectory(path string, stat os.FileInfo, follow bool, ha
// bitmap matches more than one event type (e.g. the file was both modified and
// had the attributes changed between when the association was created and the
// when event was returned)
func (w *Watcher) handleEvent(event *unix.PortEvent) error {
func (w *fen) handleEvent(event *unix.PortEvent) error {
var (
events = event.Events
path = event.Path
Expand Down Expand Up @@ -549,7 +381,7 @@ func (w *Watcher) handleEvent(event *unix.PortEvent) error {
return nil
}

func (w *Watcher) updateDirectory(path string) error {
func (w *fen) updateDirectory(path string) error {
// The directory was modified, so we must find unwatched entities and watch
// them. If something was removed from the directory, nothing will happen,
// as everything else should still be watched.
Expand Down Expand Up @@ -579,7 +411,7 @@ func (w *Watcher) updateDirectory(path string) error {
return nil
}

func (w *Watcher) associateFile(path string, stat os.FileInfo, follow bool) error {
func (w *fen) associateFile(path string, stat os.FileInfo, follow bool) error {
if w.isClosed() {
return ErrClosed
}
Expand Down Expand Up @@ -617,18 +449,14 @@ func (w *Watcher) associateFile(path string, stat os.FileInfo, follow bool) erro
return w.port.AssociatePath(path, stat, events, stat.Mode())
}

func (w *Watcher) dissociateFile(path string, stat os.FileInfo, unused bool) error {
func (w *fen) dissociateFile(path string, stat os.FileInfo, unused bool) error {
if !w.port.PathIsWatched(path) {
return nil
}
return w.port.DissociatePath(path)
}

// WatchList returns all paths explicitly added with [Watcher.Add] (and are not
// yet removed).
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) WatchList() []string {
func (w *fen) WatchList() []string {
if w.isClosed() {
return nil
}
Expand All @@ -647,11 +475,7 @@ func (w *Watcher) WatchList() []string {
return entries
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
func (w *fen) xSupports(op Op) bool {
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
Expand Down
12 changes: 6 additions & 6 deletions backend_fen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ func TestRemoveState(t *testing.T) {

check := func(wantDirs, wantFiles int) {
t.Helper()
if len(w.watches) != wantFiles {
if len(w.b.(*fen).watches) != wantFiles {
var d []string
for k, v := range w.watches {
for k, v := range w.b.(*fen).watches {
d = append(d, fmt.Sprintf("%#v = %#v", k, v))
}
t.Errorf("unexpected number of entries in w.watches (have %d, want %d):\n%v",
len(w.watches), wantFiles, strings.Join(d, "\n"))
len(w.b.(*fen).watches), wantFiles, strings.Join(d, "\n"))
}
if len(w.dirs) != wantDirs {
if len(w.b.(*fen).dirs) != wantDirs {
var d []string
for k, v := range w.dirs {
for k, v := range w.b.(*fen).dirs {
d = append(d, fmt.Sprintf("%#v = %#v", k, v))
}
t.Errorf("unexpected number of entries in w.dirs (have %d, want %d):\n%v",
len(w.dirs), wantDirs, strings.Join(d, "\n"))
len(w.b.(*fen).dirs), wantDirs, strings.Join(d, "\n"))
}
}

Expand Down

0 comments on commit bec8903

Please sign in to comment.