Skip to content

Commit

Permalink
Merge pull request #215 from josharian/dispatch-queue
Browse files Browse the repository at this point in the history
dispatch queue
  • Loading branch information
rjeczalik committed Jan 12, 2023
2 parents cbbb208 + f02e83b commit bc7f94f
Showing 1 changed file with 11 additions and 46 deletions.
57 changes: 11 additions & 46 deletions watcher_fsevents_cgo.go
Expand Up @@ -9,10 +9,8 @@ package notify

/*
#include <CoreServices/CoreServices.h>
#include <dispatch/dispatch.h>
typedef void (*CFRunLoopPerformCallBack)(void*);
void gosource(void *);
void gostream(uintptr_t, uintptr_t, size_t, uintptr_t, uintptr_t, uintptr_t);
static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintptr_t info, CFArrayRef paths, FSEventStreamEventId since, CFTimeInterval latency, FSEventStreamCreateFlags flags) {
Expand All @@ -27,7 +25,6 @@ import "C"
import (
"errors"
"os"
"runtime"
"sync"
"sync/atomic"
"unsafe"
Expand All @@ -42,45 +39,18 @@ var (
since = uint64(C.FSEventsGetCurrentEventId())
)

var runloop C.CFRunLoopRef // global runloop which all streams are registered with
var wg sync.WaitGroup // used to wait until the runloop starts

// source is used for synchronization purposes - it signals when runloop has
// started and is ready via the wg. It also serves purpose of a dummy source,
// thanks to it the runloop does not return as it also has at least one source
// registered.
var source = C.CFRunLoopSourceCreate(C.kCFAllocatorDefault, 0, &C.CFRunLoopSourceContext{
perform: (C.CFRunLoopPerformCallBack)(C.gosource),
})
// global dispatch queue which all streams are registered with
var q C.dispatch_queue_t = C.dispatch_queue_create(
C.CString("com.github.rjeczalik.notify"),
(C.dispatch_queue_attr_t)(C.DISPATCH_QUEUE_SERIAL),
)

// Errors returned when FSEvents functions fail.
var (
errCreate = os.NewSyscallError("FSEventStreamCreate", errors.New("NULL"))
errStart = os.NewSyscallError("FSEventStreamStart", errors.New("false"))
)

// initializes the global runloop and ensures any created stream awaits its
// readiness.
func init() {
wg.Add(1)
go func() {
// There is exactly one run loop per thread. Lock this goroutine to its
// thread to ensure that it's not rescheduled on a different thread while
// setting up the run loop.
runtime.LockOSThread()
runloop = C.CFRunLoopGetCurrent()
C.CFRunLoopAddSource(runloop, source, C.kCFRunLoopDefaultMode)
C.CFRunLoopRun()
panic("runloop has just unexpectedly stopped")
}()
C.CFRunLoopSourceSignal(source)
}

//export gosource
func gosource(unsafe.Pointer) {
wg.Done()
}

//export gostream
func gostream(_, info uintptr, n C.size_t, paths, flags, ids uintptr) {
const (
Expand Down Expand Up @@ -143,8 +113,7 @@ func (r *streamFuncRegistry) delete(id uintptr) {
delete(r.m, id)
}

// Stream represents single watch-point which listens for events scheduled by
// the global runloop.
// Stream represents a single watch-point which listens for events scheduled on the global dispatch queue.
type stream struct {
path string
ref C.FSEventStreamRef
Expand All @@ -160,39 +129,35 @@ func newStream(path string, fn streamFunc) *stream {
}
}

// Start creates a FSEventStream for the given path and schedules it with
// global runloop. It's a nop if the stream was already started.
// Start creates a FSEventStream for the given path and schedules on the global dispatch queue.
// It's a nop if the stream was already started.
func (s *stream) Start() error {
if s.ref != nilstream {
return nil
}
wg.Wait()
p := C.CFStringCreateWithCStringNoCopy(C.kCFAllocatorDefault, C.CString(s.path), C.kCFStringEncodingUTF8, C.kCFAllocatorDefault)
path := C.CFArrayCreate(C.kCFAllocatorDefault, (*unsafe.Pointer)(unsafe.Pointer(&p)), 1, nil)
ctx := C.FSEventStreamContext{}
ref := C.EventStreamCreate(&ctx, C.uintptr_t(s.info), path, C.FSEventStreamEventId(atomic.LoadUint64(&since)), latency, flags)
if ref == nilstream {
return errCreate
}
C.FSEventStreamScheduleWithRunLoop(ref, runloop, C.kCFRunLoopDefaultMode)
C.FSEventStreamSetDispatchQueue(ref, q)
if C.FSEventStreamStart(ref) == C.Boolean(0) {
C.FSEventStreamInvalidate(ref)
return errStart
}
C.CFRunLoopWakeUp(runloop)
s.ref = ref
return nil
}

// Stop stops underlying FSEventStream and unregisters it from global runloop.
// Stop stops underlying FSEventStream and unregisters it from the global dispatch queue.
func (s *stream) Stop() {
if s.ref == nilstream {
return
}
wg.Wait()
C.FSEventStreamStop(s.ref)
C.FSEventStreamInvalidate(s.ref)
C.CFRunLoopWakeUp(runloop)
s.ref = nilstream
streamFuncs.delete(s.info)
}

0 comments on commit bc7f94f

Please sign in to comment.