From b922bb108f256bf40b70bb2c4973330afaa4794f Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Sun, 10 Sep 2023 16:46:05 +0200 Subject: [PATCH] feat: first implementation --- README.md | 83 ++++++++++++++++- Taskfile.yml | 2 + dedup.go | 90 ++++++++++++++++++ error.go | 37 ++++++++ go.mod | 11 ++- go.sum | 12 +++ hash.go | 150 ++++++++++++++++++++++++++++++ hash_api.go | 130 ++++++++++++++++++++++++++ hash_api_test.go | 170 ++++++++++++++++++++++++++++++++++ hash_test.go | 156 +++++++++++++++++++++++++++++++ hash_watcher.go | 79 ++++++++++++++++ hash_watcher_test.go | 55 +++++++++++ lighthouse.go | 212 +++++++++++++++++++++++++++++++++++++++++++ lighthouse_test.go | 86 ++++++++++++++++++ watch.go | 85 ----------------- watch_test.go | 47 ---------- watching.go | 130 ++++++++++++++++++++++++++ watching_test.go | 145 +++++++++++++++++++++++++++++ 18 files changed, 1545 insertions(+), 135 deletions(-) create mode 100644 dedup.go create mode 100644 error.go create mode 100644 hash.go create mode 100644 hash_api.go create mode 100644 hash_api_test.go create mode 100644 hash_test.go create mode 100644 hash_watcher.go create mode 100644 hash_watcher_test.go create mode 100644 lighthouse.go create mode 100644 lighthouse_test.go delete mode 100644 watch.go delete mode 100644 watch_test.go create mode 100644 watching.go create mode 100644 watching_test.go diff --git a/README.md b/README.md index 46fc6b1..72b9508 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,22 @@ -## Wathc +## Watch ## What does this library? -This library allows you to monitor and control files and define actions for changes. +Lighthouse Watch is a Go library for efficiently watching file and directory changes in your filesystem. +Built on top of the powerful fsnotify library, Lighthouse offers a simple, yet highly flexible, API to monitor +changes in real-time. + +**Features** +* Real-time file and directory change events +* Customizable event handlers for different types of operations (Create, Write, Rename, Remove) +* Debounce mechanism to group rapid events +* Thread-safe operations +* Error handling ## Installation +Install the package using Go's package manager: + ```shell go get gitlab.schukai.com/oss/libraries/go/utilities/watch ``` @@ -14,6 +25,74 @@ go get gitlab.schukai.com/oss/libraries/go/utilities/watch ## Usage +```go +package main + +import ( + "fmt" + "gitlab.schukai.com/oss/libraries/go/utilities/watch" +) + +func main() { + // Initialize Lighthouse + l := watch.NewLighthouse() + + // Define Watch instance + w1 := &watch.Watch{ + Path: "/path/to/watch", + OnCreate: func(name string) { + fmt.Println("File created:", name) + }, + OnChange: func(name string) { + fmt.Println("File changed:", name) + }, + OnRename: func(name string) { + fmt.Println("File renamed:", name) + }, + OnDelete: func(name string) { + fmt.Println("File deleted:", name) + }, + } + + // Add watch + err := l.Add(w1) + if err != nil { + fmt.Println("Error:", err) + return + } + + // Start watching + l.StartWatching() +} +``` + +Add Multiple Watches + +```go +w2 := &watch.Watch{ /*...*/ } +err = l.Add(w2) +``` + +Remove a Watch + +```go +err = l.Remove("/path/to/watch") +``` + +Error Handling + +```go +l.OnError = func(err error) { + fmt.Println("Error:", err) +} +``` + +Debounce + +```go +// Debounce time in milliseconds +l.Debounce = 1000 +``` ## Contributing diff --git a/Taskfile.yml b/Taskfile.yml index 7febe83..055abcf 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -12,6 +12,8 @@ tasks: cmds: - echo "Execute unit tests in Go." - go test -cover -v ./... + - go test -bench . + - go test -race . test-fuzz: desc: Conduct fuzzing tests.# diff --git a/dedup.go b/dedup.go new file mode 100644 index 0000000..b53b8ff --- /dev/null +++ b/dedup.go @@ -0,0 +1,90 @@ +package watch + +// +//// dedup is a function that watches a set of paths and prints a message when +//func (w *Watch) Decouple(waitFor time.Duration) { +// if waitFor == 0 { +// waitFor = 100 * time.Millisecond +// } +// go decoupleLoop(w.watcher, waitFor) +// <-make(chan struct{}) // Block forever +//} +// +//// dedupLoop is the main loop for dedup. +//func decoupleLoop(w *fsnotify.Watcher, waitFor time.Duration) { +// var ( +// mu sync.Mutex +// timers = make(map[string]*time.Timer) +// +// printEvent = func(e fsnotify.Event) { +// fmt.Printf("%s: %s\n", e.Op, e.Name) +// +// // Don't need to remove the timer if you don't have a lot of files. +// mu.Lock() +// delete(timers, e.Name) +// mu.Unlock() +// } +// ) +// +// for { +// select { +// // Read from Errors. +// case err, ok := <-w.Errors: +// if !ok { // Channel was closed (i.e. Watcher.Close() was called). +// return +// } +// fmt.Println("error:", err) +// // Read from Events. +// case e, ok := <-w.Events: +// if !ok { // Channel was closed (i.e. Watcher.Close() was called). +// return +// } +// +// if e.Op&fsnotify.Chmod == fsnotify.Chmod { +// //util.Remove(fullPath) +// } +// +// if e.Op&fsnotify.Write == fsnotify.Write { +// //util.Remove(fullPath) +// } +// +// if e.Op&fsnotify.Create == fsnotify.Create { +// //logging.LogInfo("watching: created file: %s", d) +// //scanDirectory(fullPath, "") +// //navigation.AddFile(fullPath) +// } +// +// if e.Op&fsnotify.Remove == fsnotify.Remove { +// //logging.LogInfo("watching: removed file: %s", d) +// //removeFromWatchlist(fullPath) +// //util.Remove(fullPath) +// //navigation.RemoveFile(fullPath) +// } +// +// if e.Op&fsnotify.Rename == fsnotify.Rename { +// //logging.LogInfo("watching: renamed file: %s", event.Name) +// //removeFromWatchlist(fullPath) +// //util.Remove(fullPath) +// //navigation.RemoveFile(fullPath) +// } +// +// // Get timer. +// mu.Lock() +// t, ok := timers[e.Name] +// mu.Unlock() +// +// // No timer yet, so create one. +// if !ok { +// t = time.AfterFunc(math.MaxInt64, func() { printEvent(e) }) +// t.Stop() +// +// mu.Lock() +// timers[e.Name] = t +// mu.Unlock() +// } +// +// // Reset the timer for this path, so it will start from 100ms again. +// t.Reset(waitFor) +// } +// } +//} diff --git a/error.go b/error.go new file mode 100644 index 0000000..47ef9f1 --- /dev/null +++ b/error.go @@ -0,0 +1,37 @@ +package watch + +import ( + "fmt" +) + +// WatchPathError represents an error when trying to remove an unwatched path. +type UnwatchedPathError struct { + UnwatchedPath string +} + +func (e UnwatchedPathError) Error() string { + return fmt.Sprintf("Path %s is not being watched", e.UnwatchedPath) +} + +// WatchPathError represents an error when trying to add a path that is already being watched. +type AlreadyWatchedPathError struct { + AlreadyWatched string +} + +func (e AlreadyWatchedPathError) Error() string { + return fmt.Sprintf("Path %s is already being watched", e.AlreadyWatched) +} + +// WatcherNotActiveError represents an error when trying to add a path that is already being watched. +type WatcherNotActiveError struct{} + +func (e WatcherNotActiveError) Error() string { + return "Watcher is not active" +} + +// LighthouseNotActiveError represents an error when trying to add a path that is already being watched. +type LighthouseNotActiveError struct{} + +func (e LighthouseNotActiveError) Error() string { + return "lighthouse is not active" +} diff --git a/go.mod b/go.mod index dae053a..ddce0ab 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,15 @@ module gitlab.schukai.com/oss/libraries/go/utilities/watch go 1.20 require ( + github.com/fsnotify/fsnotify v1.6.0 + github.com/stretchr/testify v1.8.4 + gopkg.in/fsnotify.v1 v1.4.7 + +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.12.0 // indirect - gopkg.in/fsnotify.v1 v1.6.0 + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8150683..543e7d6 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,16 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hash.go b/hash.go new file mode 100644 index 0000000..53baaed --- /dev/null +++ b/hash.go @@ -0,0 +1,150 @@ +package watch + +import ( + "crypto/md5" + "encoding/hex" + "io" + "os" + "time" +) + +const hashExpiration = 24 * 3600 // 1 day in seconds +const FastHashSize = 8192 // 8 KB + +type fileStatInfo struct { + path string + info os.FileInfo + opened *os.File + lastError error +} + +func newFileStatInfo(path string) *fileStatInfo { + s := &fileStatInfo{path: path} + + info, err := os.Stat(path) + if err != nil { + s.lastError = err + return s + } + + s.info = info + + return s +} + +func (f *fileStatInfo) close() { + if f.opened != nil { + _ = f.opened.Close() + } +} + +func (f *fileStatInfo) open() { + if f.opened != nil { + return + } + + f.opened, f.lastError = os.Open(f.path) +} + +func (f *fileStatInfo) exists() bool { + return f.info != nil +} + +var ( + // watcherImpl is the singleton instance of the Watcher. + watcherImpl *Watcher +) + +func init() { + watcherImpl = NewWatcher() +} + +// buildHash generates both a complete and a "fast" hash. +// the fast hash is for hashing big files. +func buildHash(info *fileStatInfo) FileHash { + h := FileHash{Path: info.path} + + hash, fastHash := hashFile(info) + if info.lastError != nil { + return h + } + + h.Hash = hash + h.FastHash = fastHash + h.expiresAt = time.Now().Add(hashExpiration * time.Second) + h.lastModTime = info.info.ModTime() + return h +} + +// hashFile creates a full hash and a "fast" hash. +func hashFile(info *fileStatInfo) (*string, *string) { + // Full hash + fullHash := fullHashFile(info) + if info.lastError != nil { + return nil, nil + } + + // Fast hash + fastHash := fastHashFile(info) + if info.lastError != nil { + return nil, nil + } + + return fullHash, fastHash +} + +// fullHashFile creates a complete hash. +func fullHashFile(info *fileStatInfo) *string { + + info.open() + if info.lastError != nil { + return nil + } + + file := info.opened + + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + info.lastError = err + return nil + } + + if _, err := file.Seek(0, 0); err != nil { + info.lastError = err + return nil + } + + hashString := hex.EncodeToString(hash.Sum(nil)[:16]) + return &hashString +} + +// fastHashFile creates a "fast" hash. +func fastHashFile(info *fileStatInfo) *string { + info.open() + if info.lastError != nil { + return nil + } + + file := info.opened + + fileSize := info.info.Size() + if fileSize < FastHashSize { + return fullHashFile(info) + } + + offset := fileSize/2 - FastHashSize/2 + + fastHash := md5.New() + buffer := make([]byte, FastHashSize) + + readPositions := []int64{0, offset, fileSize - FastHashSize} + for _, pos := range readPositions { + if _, err := file.ReadAt(buffer, pos); err != nil { + return nil + } + fastHash.Write(buffer) + } + + hashString := hex.EncodeToString(fastHash.Sum(nil)[:16]) + return &hashString +} diff --git a/hash_api.go b/hash_api.go new file mode 100644 index 0000000..8adbe65 --- /dev/null +++ b/hash_api.go @@ -0,0 +1,130 @@ +package watch + +import ( + "os" + "time" +) + +// FileHash contains the md5 hash of a file and the last modification time. +type FileHash struct { + Path string + Hash *string + FastHash *string + expiresAt time.Time + lastModTime time.Time +} + +// RemoveFileHash removes a file from the fileHashCache. +// if file not in cache return error that file not in cache +// This function uses the global fileHashCache. +func RemoveFileHash(path string) error { + return watcherImpl.Remove(path) +} + +// Remove removes a file from the fileHashCache. +// If the watcher is not active, it returns an error (WatcherNotActiveError). +func (w *Watcher) Remove(path string) error { + + if !w.IsActive() { + return WatcherNotActiveError{} + } + + // check if file in fileHashCache, if not return error that file not in cache + if _, ok := w.cache.Load(path); !ok { + return os.ErrNotExist + } + + w.cache.Delete(path) + return nil +} + +// AddFileHash adds a file to the fileHashCache. +// This function uses the global fileHashCache. +func AddFileHash(path string) error { + return watcherImpl.Add(path) +} + +// Add adds a file to the watcher cache. +func (w *Watcher) Add(path string) error { + + if !w.IsActive() { + return WatcherNotActiveError{} + } + + info := newFileStatInfo(path) + defer info.close() + + if !info.exists() { + return os.ErrNotExist + } + + h := buildHash(info) + if info.lastError != nil { + return info.lastError + } + + w.cache.Store(path, h) + return nil +} + +// HasFileChanged checks if a file has changed since the last time it was checked. +// It returns true if the file has changed, false if it hasn't and an error if +// something went wrong. +// This function uses the global fileHashCache. +func HasFileChanged(path string) (bool, error) { + return watcherImpl.Has(path) +} + +// Has checks if a file has changed since the last time it was checked. +// It returns true if the file has changed, false if it hasn't and an error if +// something went wrong. +// If the watcher is not active, it returns false and an error (WatcherNotActiveError). +func (w *Watcher) Has(path string) (bool, error) { + + if !w.IsActive() { + return false, WatcherNotActiveError{} + } + + value, ok := w.cache.Load(path) + if !ok { + return true, nil + } + + cachedHash, ok := value.(FileHash) + if !ok { + return true, nil + } + + info := newFileStatInfo(path) + defer info.close() + + if !info.exists() { + return true, nil + } + + modTime := info.info.ModTime() + if modTime.After(cachedHash.lastModTime) { + return true, nil + } + + currentFastHash := fastHashFile(info) + if info.lastError != nil { + return false, info.lastError + } + + if currentFastHash != nil && cachedHash.FastHash != nil && *currentFastHash != *cachedHash.FastHash { + return true, nil + } + + // Calculate full hash only if fast hash differs + currentFullHash := fullHashFile(info) + if info.lastError != nil { + return false, info.lastError + } + + if *currentFullHash != *cachedHash.Hash { + return true, nil + } + + return false, nil +} diff --git a/hash_api_test.go b/hash_api_test.go new file mode 100644 index 0000000..da1e065 --- /dev/null +++ b/hash_api_test.go @@ -0,0 +1,170 @@ +package watch + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestAddFileHash(t *testing.T) { + // Create a temporary test file + file, err := os.CreateTemp("", "testfile") + if err != nil { + t.Fatal("Failed to create test file:", err) + } + defer func() { + _ = os.Remove(file.Name()) + }() + + _ = file.Close() + + err = AddFileHash(file.Name()) + assert.Nil(t, err) + + _, ok := watcherImpl.cache.Load(file.Name()) + assert.True(t, ok) +} + +func TestRemoveFileHash(t *testing.T) { + watcherImpl.cache.Store("testfile", FileHash{Path: "testfile"}) + err := RemoveFileHash("testfile") + assert.Nil(t, err) + + _, ok := watcherImpl.cache.Load("testfile") + assert.False(t, ok) +} + +func TestRemoveFileHash_NotExist(t *testing.T) { + err := RemoveFileHash("nonexistent") + assert.Equal(t, os.ErrNotExist, err) +} + +func TestHasFileChanged(t *testing.T) { + // Create a temporary test file + file, err := os.CreateTemp("", "testfile") + if err != nil { + t.Fatal("Failed to create test file:", err) + } + defer func() { + _ = os.Remove(file.Name()) + }() + _ = file.Close() + + // Add it to the cache with a specific lastModTime + watcherImpl.cache.Store(file.Name(), FileHash{Path: file.Name(), lastModTime: time.Now().Add(-1 * time.Hour)}) + + // Test Has + changed, err := HasFileChanged(file.Name()) + assert.Nil(t, err) + assert.True(t, changed) +} +func TestHasFileChanged_NotExist(t *testing.T) { + changed, err := HasFileChanged("nonexistent") + assert.Nil(t, err) + assert.True(t, changed) +} + +func TestHasFileChanged_InvalidCacheType(t *testing.T) { + watcherImpl.cache.Store("testfile", "invalid type") + changed, err := HasFileChanged("testfile") + assert.Nil(t, err) + assert.True(t, changed) +} + +func TestAddFileHashWithInvalidPath(t *testing.T) { + err := AddFileHash("/invalid/path") + if err == nil { + t.Errorf("Expected an error for invalid path") + } +} + +func TestAddAndRemoveFileHash2(t *testing.T) { + file, err := os.CreateTemp("", "testfile") + if err != nil { + t.Fatal("Failed to create test file:", err) + } + defer func() { + _ = os.Remove(file.Name()) + }() + _ = file.Close() + + err = AddFileHash(file.Name()) + if err != nil { + t.Errorf("Error adding valid path: %s", err) + } + + err = RemoveFileHash(file.Name()) + if err != nil { + t.Errorf("Error removing valid path: %s", err) + } +} + +func TestAddFileHashRaceCondition(t *testing.T) { + for i := 0; i < 100; i++ { + go func() { + _ = AddFileHash("/some/path") + }() + } +} + +func TestRemoveFileHashRaceCondition(t *testing.T) { + for i := 0; i < 100; i++ { + go func() { + _ = RemoveFileHash("/some/path") + }() + } +} + +func TestHasFileChangedRaceCondition(t *testing.T) { + for i := 0; i < 100; i++ { + go func() { + _, _ = HasFileChanged("/some/path") + }() + } +} + +func BenchmarkAddFileHash(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + file, err := os.CreateTemp("", "testfile") + if err != nil { + b.Fatal("Failed to create test file:", err) + } + + _ = AddFileHash(file.Name()) + + if err := RemoveFileHash(file.Name()); err != nil { + b.Fatal("Failed to remove file hash:", err) + } + + if err := os.Remove(file.Name()); err != nil { + b.Fatal("Failed to remove test file:", err) + } + } +} + +func BenchmarkHasFileChanged(b *testing.B) { + b.ReportAllocs() + + file, err := os.CreateTemp("", "testfile") + if err != nil { + b.Fatal("Failed to create test file:", err) + } + _ = AddFileHash(file.Name()) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + + // change the file every 100 iterations + if i%100 == 0 { + _, _ = file.WriteString("test") + } + + _, _ = HasFileChanged(file.Name()) + } + +} diff --git a/hash_test.go b/hash_test.go new file mode 100644 index 0000000..b851a79 --- /dev/null +++ b/hash_test.go @@ -0,0 +1,156 @@ +package watch + +import ( + "os" + "testing" + "time" +) + +// getOrCreateFileHash returns the md5 hash of a file. If the hash is not in the cache, +// it will be calculated. If the file does not exist, an error is returned. +// This is a helper function for the tests. +func getOrCreateFileHash(info *fileStatInfo) (*string, *string) { + if !info.exists() { + return nil, nil + } + + p := info.path + if value, ok := watcherImpl.cache.Load(p); ok { + hash := value.(FileHash) + if time.Now().Before(hash.expiresAt) { + return hash.Hash, hash.FastHash + } + } + + h := buildHash(info) + if info.lastError != nil { + return nil, nil + } + + watcherImpl.cache.Store(p, h) + return h.Hash, h.FastHash +} + +func TestFileHashing(t *testing.T) { + // Create a temporary file + tempFile, err := os.CreateTemp("", "tempFile") + if err != nil { + t.Fatalf("Cannot create temporary file: %s", err) + } + defer func() { + _ = os.Remove(tempFile.Name()) + }() + // Write data to the temporary file + if _, err := tempFile.Write([]byte("Hello, world!")); err != nil { + t.Fatalf("Cannot write to temporary file: %s", err) + } + _ = tempFile.Close() + + // First hashing + info := newFileStatInfo(tempFile.Name()) + hash1, fastHash1 := getOrCreateFileHash(info) + if info.lastError != nil { + t.Fatalf("Error hashing file: %s", err) + } + + // Open again and change data + tempFile, err = os.OpenFile(tempFile.Name(), os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + t.Fatalf("Cannot open temporary file: %s", err) + } + if _, err := tempFile.Write([]byte(" More data.")); err != nil { + t.Fatalf("Cannot write to temporary file: %s", err) + } + _ = tempFile.Close() + + // Wait to make sure that the timestamps are different + time.Sleep(2 * time.Second) + + // Second hashing + info = newFileStatInfo(tempFile.Name()) + hash2, fastHash2 := getOrCreateFileHash(info) + if info.lastError != nil { + t.Fatalf("Error hashing file: %s", err) + } + + // Check the hashes + if hash1 != hash2 { + t.Errorf("Expected full hashes not to differ: %s == %s", *hash1, *hash2) + } + + if fastHash1 != fastHash2 { + t.Errorf("Expected fast hashes not to differ: %s == %s", *fastHash1, *fastHash2) + } + + // Check if the file has been changed + hasChanged, err := HasFileChanged(tempFile.Name()) + if err != nil { + t.Fatalf("Error checking file change: %s", err) + } + + if !hasChanged { + t.Errorf("Expected Has to return true, got false") + } +} + +func TestAddAndRemoveFileHash(t *testing.T) { + // Test cases + testCases := []struct { + path string + shouldExist bool + sleepTime time.Duration + expectedErr error + }{ + {"testfile1.txt", true, 0, nil}, + {"testfile2.txt", true, 0, nil}, + {"testfile3.txt", false, 0, os.ErrNotExist}, + {"testfile1.txt", true, 2 * time.Second, nil}, // Test expiration + } + + // Create test files + for _, tc := range testCases { + if tc.shouldExist { + file, err := os.Create(tc.path) + if err != nil { + t.Fatal(err) + } + _ = file.Close() + } + } + + for _, tc := range testCases { + if tc.shouldExist { + file, err := os.Create(tc.path) + if err != nil { + t.Fatal(err) // Stop the test if the file cannot be created + } + _ = file.Close() + } + + //Add hash + err := AddFileHash(tc.path) + if err != tc.expectedErr { + t.Errorf("Add(%q) error: got %v, want %v", tc.path, err, tc.expectedErr) + } + + if tc.sleepTime > 0 { + time.Sleep(tc.sleepTime) + } + + // Remove hash + err = RemoveFileHash(tc.path) + if err != tc.expectedErr { + t.Errorf("Remove(%q) error: got %v, want %v", tc.path, err, tc.expectedErr) + } + + _, ok := watcherImpl.cache.Load(tc.path) + if ok { + t.Errorf("Remove(%q) failed: value still exists in cache", tc.path) + } + + // Delete test files + if tc.shouldExist { + _ = os.Remove(tc.path) + } + } +} diff --git a/hash_watcher.go b/hash_watcher.go new file mode 100644 index 0000000..851be28 --- /dev/null +++ b/hash_watcher.go @@ -0,0 +1,79 @@ +package watch + +import ( + "context" + "sync" + "time" +) + +type Watcher struct { + cache sync.Map + mutex sync.Mutex + cancel context.CancelFunc + active bool +} + +// NewWatcher creates a new Watcher. +// This Function starts a goroutine that checks the cache every hour and removes expired entries. +// It is in your responsibility to call Stop() on the Watcher, after you are done with it. +func NewWatcher() *Watcher { + ctx, cancel := context.WithCancel(context.Background()) + + w := &Watcher{ + cache: sync.Map{}, + cancel: cancel, + mutex: sync.Mutex{}, + active: true, + } + + go w.cacheCleaner(ctx) + return w +} + +// Stop stops the cleaner goroutine. +func (w *Watcher) Stop() { + w.cancel() +} + +// IsActive returns true if the watcher is active, false if not. +// An not active watcher can not be used to add, remove or check files. +func (w *Watcher) IsActive() bool { + w.mutex.Lock() + defer w.mutex.Unlock() + return w.active +} + +func (w *Watcher) setActive() { + w.mutex.Lock() + defer w.mutex.Unlock() + w.active = true +} + +func (w *Watcher) setInactive() { + w.mutex.Lock() + defer w.mutex.Unlock() + w.active = false +} + +// cacheCleaner is a goroutine that checks the cache every hour and removes expired entries. +func (w *Watcher) cacheCleaner(ctx context.Context) { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + w.cache.Range(func(key, value interface{}) bool { + hash := value.(FileHash) + if hash.expiresAt.Before(now) { + w.cache.Delete(key) + } + return true + }) + case <-ctx.Done(): + w.setInactive() + return + } + } +} diff --git a/hash_watcher_test.go b/hash_watcher_test.go new file mode 100644 index 0000000..45cc3d2 --- /dev/null +++ b/hash_watcher_test.go @@ -0,0 +1,55 @@ +package watch + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestMultipleWatcher(t *testing.T) { + watcher1 := NewWatcher() + watcher2 := NewWatcher() + + defer watcher1.Stop() + defer watcher2.Stop() + + assert.NotEqual(t, watcher1, watcher2) +} + +func TestMultipleWatcherDoesNotShareCache(t *testing.T) { + watcher1 := NewWatcher() + watcher2 := NewWatcher() + + defer watcher1.Stop() + defer watcher2.Stop() + + testfiles := []string{} + for i := 0; i < 100; i++ { + testfile, err := os.CreateTemp("", "testfile") + if err != nil { + t.Fatal("Failed to create test file:", err) + } + testfiles = append(testfiles, testfile.Name()) + + defer func() { + _ = os.Remove(testfile.Name()) + }() + } + + for _, testfile := range testfiles { + _ = os.WriteFile(testfile, []byte("test"), 0644) + _ = watcher1.Add(testfile) + + _, ok := watcher2.cache.Load(testfile) + assert.False(t, ok) + } + + for _, testfile := range testfiles { + _ = watcher2.Remove(testfile) + _, ok := watcher2.cache.Load(testfile) + assert.False(t, ok) + _, ok = watcher1.cache.Load(testfile) + assert.True(t, ok) + } + +} diff --git a/lighthouse.go b/lighthouse.go new file mode 100644 index 0000000..3befc5d --- /dev/null +++ b/lighthouse.go @@ -0,0 +1,212 @@ +package watch + +import ( + "github.com/fsnotify/fsnotify" + "path/filepath" + "sync" + "time" +) + +type Watch struct { + Path string + OnCreate EventCallback + OnChange EventCallback + OnDelete EventCallback + OnRename EventCallback +} + +type EventErrorCallback func(err error) +type EventCallback func(path string) + +type lighthouse struct { + watchers map[string]*Watch + fsnotify *fsnotify.Watcher + mutex sync.Mutex + active bool + onError EventErrorCallback + debounce time.Duration +} + +// SetOnError Methode, um onError zu setzen +func (l *lighthouse) SetOnError(callback EventErrorCallback) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.onError = callback +} + +// SetDebounce Methode, um debounce zu setzen +func (l *lighthouse) SetDebounce(duration time.Duration) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.debounce = duration +} + +type Lighthouse interface { + Add(watch *Watch) error + Remove(path string) error + IsActiveWatched(path string) bool + IsWatched(path string) bool + IsRunning() bool + StartWatching() + + SetOnError(callback EventErrorCallback) + SetDebounce(duration time.Duration) +} + +// NewLighthouse creates a new lighthouse instance. +func NewLighthouse() Lighthouse { + l := &lighthouse{} + l.checkAndInit() + return l + +} + +func (l *lighthouse) checkAndInit() { + + if l.active { + return + } + + l.debounce = 500 * time.Millisecond + + if l.watchers == nil { + l.watchers = make(map[string]*Watch) + } + + if l.fsnotify == nil { + var err error + l.fsnotify, err = fsnotify.NewWatcher() + if err != nil { + return + } + } + + l.active = true + +} + +// IsRunning returns true if the watcher is active, false otherwise. +func (l *lighthouse) IsRunning() bool { + return l.active +} + +// Add adds a path to the watcher. If the path is already being watched or the +// watcher is not active, an error is returned. +func (l *lighthouse) Add(watch *Watch) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.active { + return LighthouseNotActiveError{} + } + + if _, ok := l.watchers[watch.Path]; ok { + return AlreadyWatchedPathError{AlreadyWatched: watch.Path} + } + + l.watchers[watch.Path] = watch + err := l.fsnotify.Add(watch.Path) + if err != nil { + return err + } + + return nil +} + +// Remove removes a path from the watcher. If the path is not being watched or +// the watcher is not active, an error is returned. +func (l *lighthouse) Remove(path string) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.active { + return LighthouseNotActiveError{} + } + + if _, ok := l.watchers[path]; !ok { + return UnwatchedPathError{UnwatchedPath: path} + } + + delete(l.watchers, path) + + err := l.fsnotify.Remove(path) + if err != nil { + return err + } + + return nil +} + +// IsWatched returns true if the path is being watched, false otherwise. +// This only checks the lighthouse, not the fsnotify watcher. +// If you want to check if the path is being watched by fsnotify, use IsActiveWatched. +func (l *lighthouse) IsWatched(path string) bool { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.active { + return false + } + + if l.watchers == nil { + return false + } + + if l.fsnotify == nil { + return false + } + + if _, ok := l.watchers[path]; !ok { + return false + } + + return true +} + +// Get returns the watch for the given path. If the path is not being watched +// or the watcher is not active, an error is returned. +func (l *lighthouse) Get(path string) (*Watch, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.active { + return nil, LighthouseNotActiveError{} + } + + var watch *Watch + var ok bool + + if watch, ok = l.watchers[path]; !ok { + + if watch, ok = l.watchers[filepath.Dir(path)]; !ok { + return nil, UnwatchedPathError{UnwatchedPath: path} + } + + } + + return watch, nil +} + +// IsActiveWatched returns true if the path is being watched, false otherwise. +// If the watcher is not active, false is returned. +// If the path is in the watcher but not in the fsnotify watcher, false is returned. +func (l *lighthouse) IsActiveWatched(path string) bool { + + if l.IsWatched(path) == false { + return false + } + + l.mutex.Lock() + defer l.mutex.Unlock() + + // check fsnotify + for _, fsPath := range l.fsnotify.WatchList() { + if fsPath == path { + return true + } + + } + + return false + +} diff --git a/lighthouse_test.go b/lighthouse_test.go new file mode 100644 index 0000000..6cd75cc --- /dev/null +++ b/lighthouse_test.go @@ -0,0 +1,86 @@ +package watch + +import ( + "errors" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddWatch(t *testing.T) { + tempDir, err := os.MkdirTemp("", "watchtest") + assert.Nil(t, err) + defer func() { + _ = os.RemoveAll(tempDir) // Cleanup + }() + + l := NewLighthouse() + + w := &Watch{Path: tempDir} + err = l.Add(w) + assert.Nil(t, err) + + err = l.Add(w) + assert.True(t, errors.As(err, &AlreadyWatchedPathError{})) +} + +func TestRemoveWatch(t *testing.T) { + tempDir, err := os.MkdirTemp("", "watchtest") + assert.Nil(t, err) + defer func() { + _ = os.RemoveAll(tempDir) // Cleanup + }() + + l := NewLighthouse() + + w := &Watch{Path: tempDir} + err = l.Add(w) + assert.Nil(t, err) + + err = l.Remove(tempDir) + assert.Nil(t, err) + + err = l.Remove("path/not/watched") + assert.True(t, errors.As(err, &UnwatchedPathError{})) +} + +func TestNewLighthouse(t *testing.T) { + l := NewLighthouse() + assert.NotNil(t, l) + assert.True(t, l.IsRunning()) +} + +func TestIsWatched(t *testing.T) { + tempDir, err := os.MkdirTemp("", "watchtest") + assert.Nil(t, err) + defer func() { + _ = os.RemoveAll(tempDir) // Cleanup + }() + + l := NewLighthouse() + + w := &Watch{Path: tempDir} + err = l.Add(w) + assert.Nil(t, err) + + assert.True(t, l.IsWatched(tempDir)) + assert.False(t, l.IsWatched("path/not/watched")) +} + +func TestIsActiveWatched(t *testing.T) { + tempDir, err := os.MkdirTemp("", "watchtest") + assert.Nil(t, err) + defer func() { + _ = os.RemoveAll(tempDir) // Cleanup + }() + + l := NewLighthouse() + + w := &Watch{Path: tempDir} + err = l.Add(w) + assert.Nil(t, err) + + assert.True(t, l.IsActiveWatched(tempDir)) + assert.False(t, l.IsActiveWatched("path/not/watched")) +} diff --git a/watch.go b/watch.go deleted file mode 100644 index b7c9791..0000000 --- a/watch.go +++ /dev/null @@ -1,85 +0,0 @@ -package watch - -import ( - "gopkg.in/fsnotify.v1" - "sync" -) - -type WatchEvent int - -const ( - FileChanged WatchEvent = iota - FileAdded - FileDeleted - FileRemoved -) - -type EventHandler func(string, WatchEvent) - -type Watch struct { - watcher *fsnotify.Watcher - handlers map[string]EventHandler - queue chan string - mu sync.Mutex -} - -func NewWatch() (*Watch, error) { - w, err := fsnotify.NewWatcher() - if err != nil { - return nil, err - } - return &Watch{ - watcher: w, - handlers: make(map[string]EventHandler), - queue: make(chan string, 100), - }, nil -} - -func (w *Watch) Add(path string, handler EventHandler) error { - w.mu.Lock() - defer w.mu.Unlock() - w.handlers[path] = handler - return w.watcher.Add(path) -} - -func (w *Watch) Remove(path string) error { - w.mu.Lock() - defer w.mu.Unlock() - delete(w.handlers, path) - return w.watcher.Remove(path) -} - -func (w *Watch) startWorker(concurrency int) { - for i := 0; i < concurrency; i++ { - go func() { - for path := range w.queue { - if handler, exists := w.handlers[path]; exists { - handler(path, FileChanged) - } - } - }() - } -} - -func (w *Watch) Watch(concurrency int) { - w.startWorker(concurrency) - - for { - select { - case event := <-w.watcher.Events: - if handler, exists := w.handlers[event.Name]; exists { - we := FileChanged // Map fsnotify events to WatchEvent here - w.queue <- event.Name - handler(event.Name, we) - } - case err := <-w.watcher.Errors: - // Handle errors - println("Error:", err) - } - } -} - -func (w *Watch) Stop() { - close(w.queue) - w.watcher.Close() -} diff --git a/watch_test.go b/watch_test.go deleted file mode 100644 index 9ac9a06..0000000 --- a/watch_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package watch - -import ( - "io/ioutil" - "os" - "testing" - "time" -) - -func TestWatch(t *testing.T) { - w, err := NewWatch() - if err != nil { - t.Fatal(err) - } - - tmpFile, err := ioutil.TempFile("", "watch_test") - if err != nil { - t.Fatal(err) - } - defer os.Remove(tmpFile.Name()) - - called := make(chan bool) - handler := func(path string, event WatchEvent) { - if path == tmpFile.Name() && event == FileChanged { - called <- true - } - } - - if err := w.Add(tmpFile.Name(), handler); err != nil { - t.Fatal(err) - } - - go w.Watch(1) - defer w.Stop() - - _, err = tmpFile.WriteString("test") - if err != nil { - t.Fatal(err) - } - - select { - case <-called: - // Success - case <-time.After(time.Second * 2): - t.Fatal("Event handler not called") - } -} diff --git a/watching.go b/watching.go new file mode 100644 index 0000000..2c9f6cd --- /dev/null +++ b/watching.go @@ -0,0 +1,130 @@ +package watch + +import ( + "github.com/fsnotify/fsnotify" + "os" + "sync" + "time" +) + +func (l *lighthouse) StartWatching() { + go func() { + eventChannel := make(chan fsnotify.Event, 100) + errorChannel := make(chan error, 100) + debounceTimers := make(map[string]*time.Timer) + + var debounceMutex sync.Mutex + + go func() { + for { + select { + case event := <-l.fsnotify.Events: + eventChannel <- event + case err := <-l.fsnotify.Errors: + errorChannel <- err + } + } + }() + + for { + select { + case event := <-eventChannel: + + debounceMutex.Lock() + if timer, ok := debounceTimers[event.Name]; ok { + timer.Stop() + } + + debounceTimers[event.Name] = time.AfterFunc(l.debounce, func() { + //debounceTimers[event.Name] = time.AfterFunc(l.debounce, func() { + debounceMutex.Lock() + delete(debounceTimers, event.Name) + debounceMutex.Unlock() + + var watch *Watch + var err error + + if watch, err = l.Get(event.Name); err != nil || watch == nil { + return + } + + switch event.Op { + case fsnotify.Write: + if watch.OnChange != nil { + if watch.OnChange != nil { + go watch.OnChange(event.Name) + } + } + case fsnotify.Create: + if watch.OnCreate != nil { + if watch.OnCreate != nil { + go watch.OnCreate(event.Name) + } + } + + case fsnotify.Rename: + if watch.OnRename != nil { + if watch.OnRename != nil { + go watch.OnRename(event.Name) + } + } + + go func(filename string) { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + if l.IsWatched(filename) { + if _, err := os.Stat(filename); err == nil { + _ = l.fsnotify.Add(filename) + ticker.Stop() + return + } + } else { + ticker.Stop() + return + } + } + } + }(event.Name) + + case fsnotify.Remove: + if watch.OnDelete != nil { + if watch.OnDelete != nil { + go watch.OnDelete(event.Name) + } + } + + go func(filename string) { + ticker := time.NewTicker(1 * time.Second) + + for { + select { + case <-ticker.C: + + if l.IsWatched(filename) { + if _, err := os.Stat(filename); err == nil { + _ = l.fsnotify.Add(filename) + ticker.Stop() + return + } + } else { + ticker.Stop() + return + } + + } + } + }(event.Name) + + } + + }) + debounceMutex.Unlock() + + case err := <-errorChannel: + go l.onError(err) + } + } + }() +} diff --git a/watching_test.go b/watching_test.go new file mode 100644 index 0000000..3d8a33e --- /dev/null +++ b/watching_test.go @@ -0,0 +1,145 @@ +package watch + +import ( + "github.com/stretchr/testify/assert" + "os" + "path/filepath" + "testing" + "time" +) + +func TestWriteEvent(t *testing.T) { + tmpDir := t.TempDir() + t.Cleanup(func() { + _ = os.RemoveAll(tmpDir) // Cleanup + }) + filePath := filepath.Join(tmpDir, "test.txt") + + l := NewLighthouse() + + ch := make(chan bool) + timeout := make(chan bool) + + t.Cleanup(func() { + close(ch) + close(timeout) + }) + + // Create file and add watcher + if err := os.WriteFile(filePath, []byte("test"), 0644); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + + w1 := &Watch{ + Path: tmpDir, + OnChange: func(name string) { + ch <- true + }, + } + + if err := l.Add(w1); err != nil { + t.Fatalf("Failed to add watch: %v", err) + } + + l.StartWatching() + + timer := time.NewTimer(5 * time.Second) + go func() { + <-timer.C + timeout <- true + }() + + // Change file + if err := os.WriteFile(filePath, []byte("test2"), 0644); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + + select { + case <-ch: + timer.Stop() + case <-timeout: + t.Fail() + } +} + +func TestRecreateEvent(t *testing.T) { + tmpDir := t.TempDir() + t.Cleanup(func() { + _ = os.RemoveAll(tmpDir) + }) + filePath := filepath.Join(tmpDir, "recreate.txt") + + callCounterCreate := 0 + callCounterChange := 0 + + l := NewLighthouse() + ch := make(chan bool) + timeout := make(chan bool) + + t.Cleanup(func() { + close(ch) + close(timeout) + }) + + w1 := &Watch{ + Path: tmpDir, + OnChange: func(name string) { + callCounterChange++ + ch <- true + }, + OnCreate: func(name string) { + callCounterCreate++ + ch <- true + }, + } + + if err := l.Add(w1); err != nil { + t.Fatalf("Failed to add watch: %v", err) + } + + l.StartWatching() + + timer := time.NewTimer(20 * time.Second) + go func() { + <-timer.C + timeout <- true + }() + + // Create file + if err := os.WriteFile(filePath, []byte("test1"), 0644); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + + select { + case <-ch: + // First create event detected + case <-timeout: + t.Log("Timeout 1") + t.Fail() + return + } + + // Delete the file after 2 seconds + time.Sleep(2 * time.Second) + if err := os.Remove(filePath); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + time.Sleep(2 * time.Second) + + // Recreate the file after another 2 seconds + if err := os.WriteFile(filePath, []byte("test2"), 0644); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + + select { + case <-ch: + timer.Stop() + + assert.Equal(t, 0, callCounterCreate) + assert.Equal(t, 2, callCounterChange) + + case <-timeout: + t.Log("Timeout 2") + t.Fail() + } +} -- GitLab