Skip to content
Snippets Groups Projects
Verified Commit b922bb10 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: first implementation

parent 0fce6c8c
No related branches found
No related tags found
No related merge requests found
## Wathc
## Watch
## What does this library?
This library allows you to monitor and control files and define actions for changes.
Lighthouse Watch is a Go library for efficiently watching file and directory changes in your filesystem.
Built on top of the powerful fsnotify library, Lighthouse offers a simple, yet highly flexible, API to monitor
changes in real-time.
**Features**
* Real-time file and directory change events
* Customizable event handlers for different types of operations (Create, Write, Rename, Remove)
* Debounce mechanism to group rapid events
* Thread-safe operations
* Error handling
## Installation
Install the package using Go's package manager:
```shell
go get gitlab.schukai.com/oss/libraries/go/utilities/watch
```
......@@ -14,6 +25,74 @@ go get gitlab.schukai.com/oss/libraries/go/utilities/watch
## Usage
```go
package main
import (
"fmt"
"gitlab.schukai.com/oss/libraries/go/utilities/watch"
)
func main() {
// Initialize Lighthouse
l := watch.NewLighthouse()
// Define Watch instance
w1 := &watch.Watch{
Path: "/path/to/watch",
OnCreate: func(name string) {
fmt.Println("File created:", name)
},
OnChange: func(name string) {
fmt.Println("File changed:", name)
},
OnRename: func(name string) {
fmt.Println("File renamed:", name)
},
OnDelete: func(name string) {
fmt.Println("File deleted:", name)
},
}
// Add watch
err := l.Add(w1)
if err != nil {
fmt.Println("Error:", err)
return
}
// Start watching
l.StartWatching()
}
```
Add Multiple Watches
```go
w2 := &watch.Watch{ /*...*/ }
err = l.Add(w2)
```
Remove a Watch
```go
err = l.Remove("/path/to/watch")
```
Error Handling
```go
l.OnError = func(err error) {
fmt.Println("Error:", err)
}
```
Debounce
```go
// Debounce time in milliseconds
l.Debounce = 1000
```
## Contributing
......
......@@ -12,6 +12,8 @@ tasks:
cmds:
- echo "Execute unit tests in Go."
- go test -cover -v ./...
- go test -bench .
- go test -race .
test-fuzz:
desc: Conduct fuzzing tests.#
......
dedup.go 0 → 100644
package watch
//
//// dedup is a function that watches a set of paths and prints a message when
//func (w *Watch) Decouple(waitFor time.Duration) {
// if waitFor == 0 {
// waitFor = 100 * time.Millisecond
// }
// go decoupleLoop(w.watcher, waitFor)
// <-make(chan struct{}) // Block forever
//}
//
//// dedupLoop is the main loop for dedup.
//func decoupleLoop(w *fsnotify.Watcher, waitFor time.Duration) {
// var (
// mu sync.Mutex
// timers = make(map[string]*time.Timer)
//
// printEvent = func(e fsnotify.Event) {
// fmt.Printf("%s: %s\n", e.Op, e.Name)
//
// // Don't need to remove the timer if you don't have a lot of files.
// mu.Lock()
// delete(timers, e.Name)
// mu.Unlock()
// }
// )
//
// for {
// select {
// // Read from Errors.
// case err, ok := <-w.Errors:
// if !ok { // Channel was closed (i.e. Watcher.Close() was called).
// return
// }
// fmt.Println("error:", err)
// // Read from Events.
// case e, ok := <-w.Events:
// if !ok { // Channel was closed (i.e. Watcher.Close() was called).
// return
// }
//
// if e.Op&fsnotify.Chmod == fsnotify.Chmod {
// //util.Remove(fullPath)
// }
//
// if e.Op&fsnotify.Write == fsnotify.Write {
// //util.Remove(fullPath)
// }
//
// if e.Op&fsnotify.Create == fsnotify.Create {
// //logging.LogInfo("watching: created file: %s", d)
// //scanDirectory(fullPath, "")
// //navigation.AddFile(fullPath)
// }
//
// if e.Op&fsnotify.Remove == fsnotify.Remove {
// //logging.LogInfo("watching: removed file: %s", d)
// //removeFromWatchlist(fullPath)
// //util.Remove(fullPath)
// //navigation.RemoveFile(fullPath)
// }
//
// if e.Op&fsnotify.Rename == fsnotify.Rename {
// //logging.LogInfo("watching: renamed file: %s", event.Name)
// //removeFromWatchlist(fullPath)
// //util.Remove(fullPath)
// //navigation.RemoveFile(fullPath)
// }
//
// // Get timer.
// mu.Lock()
// t, ok := timers[e.Name]
// mu.Unlock()
//
// // No timer yet, so create one.
// if !ok {
// t = time.AfterFunc(math.MaxInt64, func() { printEvent(e) })
// t.Stop()
//
// mu.Lock()
// timers[e.Name] = t
// mu.Unlock()
// }
//
// // Reset the timer for this path, so it will start from 100ms again.
// t.Reset(waitFor)
// }
// }
//}
error.go 0 → 100644
package watch
import (
"fmt"
)
// WatchPathError represents an error when trying to remove an unwatched path.
type UnwatchedPathError struct {
UnwatchedPath string
}
func (e UnwatchedPathError) Error() string {
return fmt.Sprintf("Path %s is not being watched", e.UnwatchedPath)
}
// WatchPathError represents an error when trying to add a path that is already being watched.
type AlreadyWatchedPathError struct {
AlreadyWatched string
}
func (e AlreadyWatchedPathError) Error() string {
return fmt.Sprintf("Path %s is already being watched", e.AlreadyWatched)
}
// WatcherNotActiveError represents an error when trying to add a path that is already being watched.
type WatcherNotActiveError struct{}
func (e WatcherNotActiveError) Error() string {
return "Watcher is not active"
}
// LighthouseNotActiveError represents an error when trying to add a path that is already being watched.
type LighthouseNotActiveError struct{}
func (e LighthouseNotActiveError) Error() string {
return "lighthouse is not active"
}
......@@ -3,6 +3,15 @@ module gitlab.schukai.com/oss/libraries/go/utilities/watch
go 1.20
require (
github.com/fsnotify/fsnotify v1.6.0
github.com/stretchr/testify v1.8.4
gopkg.in/fsnotify.v1 v1.4.7
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.12.0 // indirect
gopkg.in/fsnotify.v1 v1.6.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
hash.go 0 → 100644
package watch
import (
"crypto/md5"
"encoding/hex"
"io"
"os"
"time"
)
const hashExpiration = 24 * 3600 // 1 day in seconds
const FastHashSize = 8192 // 8 KB
type fileStatInfo struct {
path string
info os.FileInfo
opened *os.File
lastError error
}
func newFileStatInfo(path string) *fileStatInfo {
s := &fileStatInfo{path: path}
info, err := os.Stat(path)
if err != nil {
s.lastError = err
return s
}
s.info = info
return s
}
func (f *fileStatInfo) close() {
if f.opened != nil {
_ = f.opened.Close()
}
}
func (f *fileStatInfo) open() {
if f.opened != nil {
return
}
f.opened, f.lastError = os.Open(f.path)
}
func (f *fileStatInfo) exists() bool {
return f.info != nil
}
var (
// watcherImpl is the singleton instance of the Watcher.
watcherImpl *Watcher
)
func init() {
watcherImpl = NewWatcher()
}
// buildHash generates both a complete and a "fast" hash.
// the fast hash is for hashing big files.
func buildHash(info *fileStatInfo) FileHash {
h := FileHash{Path: info.path}
hash, fastHash := hashFile(info)
if info.lastError != nil {
return h
}
h.Hash = hash
h.FastHash = fastHash
h.expiresAt = time.Now().Add(hashExpiration * time.Second)
h.lastModTime = info.info.ModTime()
return h
}
// hashFile creates a full hash and a "fast" hash.
func hashFile(info *fileStatInfo) (*string, *string) {
// Full hash
fullHash := fullHashFile(info)
if info.lastError != nil {
return nil, nil
}
// Fast hash
fastHash := fastHashFile(info)
if info.lastError != nil {
return nil, nil
}
return fullHash, fastHash
}
// fullHashFile creates a complete hash.
func fullHashFile(info *fileStatInfo) *string {
info.open()
if info.lastError != nil {
return nil
}
file := info.opened
hash := md5.New()
if _, err := io.Copy(hash, file); err != nil {
info.lastError = err
return nil
}
if _, err := file.Seek(0, 0); err != nil {
info.lastError = err
return nil
}
hashString := hex.EncodeToString(hash.Sum(nil)[:16])
return &hashString
}
// fastHashFile creates a "fast" hash.
func fastHashFile(info *fileStatInfo) *string {
info.open()
if info.lastError != nil {
return nil
}
file := info.opened
fileSize := info.info.Size()
if fileSize < FastHashSize {
return fullHashFile(info)
}
offset := fileSize/2 - FastHashSize/2
fastHash := md5.New()
buffer := make([]byte, FastHashSize)
readPositions := []int64{0, offset, fileSize - FastHashSize}
for _, pos := range readPositions {
if _, err := file.ReadAt(buffer, pos); err != nil {
return nil
}
fastHash.Write(buffer)
}
hashString := hex.EncodeToString(fastHash.Sum(nil)[:16])
return &hashString
}
package watch
import (
"os"
"time"
)
// FileHash contains the md5 hash of a file and the last modification time.
type FileHash struct {
Path string
Hash *string
FastHash *string
expiresAt time.Time
lastModTime time.Time
}
// RemoveFileHash removes a file from the fileHashCache.
// if file not in cache return error that file not in cache
// This function uses the global fileHashCache.
func RemoveFileHash(path string) error {
return watcherImpl.Remove(path)
}
// Remove removes a file from the fileHashCache.
// If the watcher is not active, it returns an error (WatcherNotActiveError).
func (w *Watcher) Remove(path string) error {
if !w.IsActive() {
return WatcherNotActiveError{}
}
// check if file in fileHashCache, if not return error that file not in cache
if _, ok := w.cache.Load(path); !ok {
return os.ErrNotExist
}
w.cache.Delete(path)
return nil
}
// AddFileHash adds a file to the fileHashCache.
// This function uses the global fileHashCache.
func AddFileHash(path string) error {
return watcherImpl.Add(path)
}
// Add adds a file to the watcher cache.
func (w *Watcher) Add(path string) error {
if !w.IsActive() {
return WatcherNotActiveError{}
}
info := newFileStatInfo(path)
defer info.close()
if !info.exists() {
return os.ErrNotExist
}
h := buildHash(info)
if info.lastError != nil {
return info.lastError
}
w.cache.Store(path, h)
return nil
}
// HasFileChanged checks if a file has changed since the last time it was checked.
// It returns true if the file has changed, false if it hasn't and an error if
// something went wrong.
// This function uses the global fileHashCache.
func HasFileChanged(path string) (bool, error) {
return watcherImpl.Has(path)
}
// Has checks if a file has changed since the last time it was checked.
// It returns true if the file has changed, false if it hasn't and an error if
// something went wrong.
// If the watcher is not active, it returns false and an error (WatcherNotActiveError).
func (w *Watcher) Has(path string) (bool, error) {
if !w.IsActive() {
return false, WatcherNotActiveError{}
}
value, ok := w.cache.Load(path)
if !ok {
return true, nil
}
cachedHash, ok := value.(FileHash)
if !ok {
return true, nil
}
info := newFileStatInfo(path)
defer info.close()
if !info.exists() {
return true, nil
}
modTime := info.info.ModTime()
if modTime.After(cachedHash.lastModTime) {
return true, nil
}
currentFastHash := fastHashFile(info)
if info.lastError != nil {
return false, info.lastError
}
if currentFastHash != nil && cachedHash.FastHash != nil && *currentFastHash != *cachedHash.FastHash {
return true, nil
}
// Calculate full hash only if fast hash differs
currentFullHash := fullHashFile(info)
if info.lastError != nil {
return false, info.lastError
}
if *currentFullHash != *cachedHash.Hash {
return true, nil
}
return false, nil
}
package watch
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestAddFileHash(t *testing.T) {
// Create a temporary test file
file, err := os.CreateTemp("", "testfile")
if err != nil {
t.Fatal("Failed to create test file:", err)
}
defer func() {
_ = os.Remove(file.Name())
}()
_ = file.Close()
err = AddFileHash(file.Name())
assert.Nil(t, err)
_, ok := watcherImpl.cache.Load(file.Name())
assert.True(t, ok)
}
func TestRemoveFileHash(t *testing.T) {
watcherImpl.cache.Store("testfile", FileHash{Path: "testfile"})
err := RemoveFileHash("testfile")
assert.Nil(t, err)
_, ok := watcherImpl.cache.Load("testfile")
assert.False(t, ok)
}
func TestRemoveFileHash_NotExist(t *testing.T) {
err := RemoveFileHash("nonexistent")
assert.Equal(t, os.ErrNotExist, err)
}
func TestHasFileChanged(t *testing.T) {
// Create a temporary test file
file, err := os.CreateTemp("", "testfile")
if err != nil {
t.Fatal("Failed to create test file:", err)
}
defer func() {
_ = os.Remove(file.Name())
}()
_ = file.Close()
// Add it to the cache with a specific lastModTime
watcherImpl.cache.Store(file.Name(), FileHash{Path: file.Name(), lastModTime: time.Now().Add(-1 * time.Hour)})
// Test Has
changed, err := HasFileChanged(file.Name())
assert.Nil(t, err)
assert.True(t, changed)
}
func TestHasFileChanged_NotExist(t *testing.T) {
changed, err := HasFileChanged("nonexistent")
assert.Nil(t, err)
assert.True(t, changed)
}
func TestHasFileChanged_InvalidCacheType(t *testing.T) {
watcherImpl.cache.Store("testfile", "invalid type")
changed, err := HasFileChanged("testfile")
assert.Nil(t, err)
assert.True(t, changed)
}
func TestAddFileHashWithInvalidPath(t *testing.T) {
err := AddFileHash("/invalid/path")
if err == nil {
t.Errorf("Expected an error for invalid path")
}
}
func TestAddAndRemoveFileHash2(t *testing.T) {
file, err := os.CreateTemp("", "testfile")
if err != nil {
t.Fatal("Failed to create test file:", err)
}
defer func() {
_ = os.Remove(file.Name())
}()
_ = file.Close()
err = AddFileHash(file.Name())
if err != nil {
t.Errorf("Error adding valid path: %s", err)
}
err = RemoveFileHash(file.Name())
if err != nil {
t.Errorf("Error removing valid path: %s", err)
}
}
func TestAddFileHashRaceCondition(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
_ = AddFileHash("/some/path")
}()
}
}
func TestRemoveFileHashRaceCondition(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
_ = RemoveFileHash("/some/path")
}()
}
}
func TestHasFileChangedRaceCondition(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
_, _ = HasFileChanged("/some/path")
}()
}
}
func BenchmarkAddFileHash(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
file, err := os.CreateTemp("", "testfile")
if err != nil {
b.Fatal("Failed to create test file:", err)
}
_ = AddFileHash(file.Name())
if err := RemoveFileHash(file.Name()); err != nil {
b.Fatal("Failed to remove file hash:", err)
}
if err := os.Remove(file.Name()); err != nil {
b.Fatal("Failed to remove test file:", err)
}
}
}
func BenchmarkHasFileChanged(b *testing.B) {
b.ReportAllocs()
file, err := os.CreateTemp("", "testfile")
if err != nil {
b.Fatal("Failed to create test file:", err)
}
_ = AddFileHash(file.Name())
b.ResetTimer()
for i := 0; i < b.N; i++ {
// change the file every 100 iterations
if i%100 == 0 {
_, _ = file.WriteString("test")
}
_, _ = HasFileChanged(file.Name())
}
}
package watch
import (
"os"
"testing"
"time"
)
// getOrCreateFileHash returns the md5 hash of a file. If the hash is not in the cache,
// it will be calculated. If the file does not exist, an error is returned.
// This is a helper function for the tests.
func getOrCreateFileHash(info *fileStatInfo) (*string, *string) {
if !info.exists() {
return nil, nil
}
p := info.path
if value, ok := watcherImpl.cache.Load(p); ok {
hash := value.(FileHash)
if time.Now().Before(hash.expiresAt) {
return hash.Hash, hash.FastHash
}
}
h := buildHash(info)
if info.lastError != nil {
return nil, nil
}
watcherImpl.cache.Store(p, h)
return h.Hash, h.FastHash
}
func TestFileHashing(t *testing.T) {
// Create a temporary file
tempFile, err := os.CreateTemp("", "tempFile")
if err != nil {
t.Fatalf("Cannot create temporary file: %s", err)
}
defer func() {
_ = os.Remove(tempFile.Name())
}()
// Write data to the temporary file
if _, err := tempFile.Write([]byte("Hello, world!")); err != nil {
t.Fatalf("Cannot write to temporary file: %s", err)
}
_ = tempFile.Close()
// First hashing
info := newFileStatInfo(tempFile.Name())
hash1, fastHash1 := getOrCreateFileHash(info)
if info.lastError != nil {
t.Fatalf("Error hashing file: %s", err)
}
// Open again and change data
tempFile, err = os.OpenFile(tempFile.Name(), os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
t.Fatalf("Cannot open temporary file: %s", err)
}
if _, err := tempFile.Write([]byte(" More data.")); err != nil {
t.Fatalf("Cannot write to temporary file: %s", err)
}
_ = tempFile.Close()
// Wait to make sure that the timestamps are different
time.Sleep(2 * time.Second)
// Second hashing
info = newFileStatInfo(tempFile.Name())
hash2, fastHash2 := getOrCreateFileHash(info)
if info.lastError != nil {
t.Fatalf("Error hashing file: %s", err)
}
// Check the hashes
if hash1 != hash2 {
t.Errorf("Expected full hashes not to differ: %s == %s", *hash1, *hash2)
}
if fastHash1 != fastHash2 {
t.Errorf("Expected fast hashes not to differ: %s == %s", *fastHash1, *fastHash2)
}
// Check if the file has been changed
hasChanged, err := HasFileChanged(tempFile.Name())
if err != nil {
t.Fatalf("Error checking file change: %s", err)
}
if !hasChanged {
t.Errorf("Expected Has to return true, got false")
}
}
func TestAddAndRemoveFileHash(t *testing.T) {
// Test cases
testCases := []struct {
path string
shouldExist bool
sleepTime time.Duration
expectedErr error
}{
{"testfile1.txt", true, 0, nil},
{"testfile2.txt", true, 0, nil},
{"testfile3.txt", false, 0, os.ErrNotExist},
{"testfile1.txt", true, 2 * time.Second, nil}, // Test expiration
}
// Create test files
for _, tc := range testCases {
if tc.shouldExist {
file, err := os.Create(tc.path)
if err != nil {
t.Fatal(err)
}
_ = file.Close()
}
}
for _, tc := range testCases {
if tc.shouldExist {
file, err := os.Create(tc.path)
if err != nil {
t.Fatal(err) // Stop the test if the file cannot be created
}
_ = file.Close()
}
//Add hash
err := AddFileHash(tc.path)
if err != tc.expectedErr {
t.Errorf("Add(%q) error: got %v, want %v", tc.path, err, tc.expectedErr)
}
if tc.sleepTime > 0 {
time.Sleep(tc.sleepTime)
}
// Remove hash
err = RemoveFileHash(tc.path)
if err != tc.expectedErr {
t.Errorf("Remove(%q) error: got %v, want %v", tc.path, err, tc.expectedErr)
}
_, ok := watcherImpl.cache.Load(tc.path)
if ok {
t.Errorf("Remove(%q) failed: value still exists in cache", tc.path)
}
// Delete test files
if tc.shouldExist {
_ = os.Remove(tc.path)
}
}
}
package watch
import (
"context"
"sync"
"time"
)
type Watcher struct {
cache sync.Map
mutex sync.Mutex
cancel context.CancelFunc
active bool
}
// NewWatcher creates a new Watcher.
// This Function starts a goroutine that checks the cache every hour and removes expired entries.
// It is in your responsibility to call Stop() on the Watcher, after you are done with it.
func NewWatcher() *Watcher {
ctx, cancel := context.WithCancel(context.Background())
w := &Watcher{
cache: sync.Map{},
cancel: cancel,
mutex: sync.Mutex{},
active: true,
}
go w.cacheCleaner(ctx)
return w
}
// Stop stops the cleaner goroutine.
func (w *Watcher) Stop() {
w.cancel()
}
// IsActive returns true if the watcher is active, false if not.
// An not active watcher can not be used to add, remove or check files.
func (w *Watcher) IsActive() bool {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.active
}
func (w *Watcher) setActive() {
w.mutex.Lock()
defer w.mutex.Unlock()
w.active = true
}
func (w *Watcher) setInactive() {
w.mutex.Lock()
defer w.mutex.Unlock()
w.active = false
}
// cacheCleaner is a goroutine that checks the cache every hour and removes expired entries.
func (w *Watcher) cacheCleaner(ctx context.Context) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
w.cache.Range(func(key, value interface{}) bool {
hash := value.(FileHash)
if hash.expiresAt.Before(now) {
w.cache.Delete(key)
}
return true
})
case <-ctx.Done():
w.setInactive()
return
}
}
}
package watch
import (
"github.com/stretchr/testify/assert"
"os"
"testing"
)
func TestMultipleWatcher(t *testing.T) {
watcher1 := NewWatcher()
watcher2 := NewWatcher()
defer watcher1.Stop()
defer watcher2.Stop()
assert.NotEqual(t, watcher1, watcher2)
}
func TestMultipleWatcherDoesNotShareCache(t *testing.T) {
watcher1 := NewWatcher()
watcher2 := NewWatcher()
defer watcher1.Stop()
defer watcher2.Stop()
testfiles := []string{}
for i := 0; i < 100; i++ {
testfile, err := os.CreateTemp("", "testfile")
if err != nil {
t.Fatal("Failed to create test file:", err)
}
testfiles = append(testfiles, testfile.Name())
defer func() {
_ = os.Remove(testfile.Name())
}()
}
for _, testfile := range testfiles {
_ = os.WriteFile(testfile, []byte("test"), 0644)
_ = watcher1.Add(testfile)
_, ok := watcher2.cache.Load(testfile)
assert.False(t, ok)
}
for _, testfile := range testfiles {
_ = watcher2.Remove(testfile)
_, ok := watcher2.cache.Load(testfile)
assert.False(t, ok)
_, ok = watcher1.cache.Load(testfile)
assert.True(t, ok)
}
}
package watch
import (
"github.com/fsnotify/fsnotify"
"path/filepath"
"sync"
"time"
)
type Watch struct {
Path string
OnCreate EventCallback
OnChange EventCallback
OnDelete EventCallback
OnRename EventCallback
}
type EventErrorCallback func(err error)
type EventCallback func(path string)
type lighthouse struct {
watchers map[string]*Watch
fsnotify *fsnotify.Watcher
mutex sync.Mutex
active bool
onError EventErrorCallback
debounce time.Duration
}
// SetOnError Methode, um onError zu setzen
func (l *lighthouse) SetOnError(callback EventErrorCallback) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.onError = callback
}
// SetDebounce Methode, um debounce zu setzen
func (l *lighthouse) SetDebounce(duration time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.debounce = duration
}
type Lighthouse interface {
Add(watch *Watch) error
Remove(path string) error
IsActiveWatched(path string) bool
IsWatched(path string) bool
IsRunning() bool
StartWatching()
SetOnError(callback EventErrorCallback)
SetDebounce(duration time.Duration)
}
// NewLighthouse creates a new lighthouse instance.
func NewLighthouse() Lighthouse {
l := &lighthouse{}
l.checkAndInit()
return l
}
func (l *lighthouse) checkAndInit() {
if l.active {
return
}
l.debounce = 500 * time.Millisecond
if l.watchers == nil {
l.watchers = make(map[string]*Watch)
}
if l.fsnotify == nil {
var err error
l.fsnotify, err = fsnotify.NewWatcher()
if err != nil {
return
}
}
l.active = true
}
// IsRunning returns true if the watcher is active, false otherwise.
func (l *lighthouse) IsRunning() bool {
return l.active
}
// Add adds a path to the watcher. If the path is already being watched or the
// watcher is not active, an error is returned.
func (l *lighthouse) Add(watch *Watch) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.active {
return LighthouseNotActiveError{}
}
if _, ok := l.watchers[watch.Path]; ok {
return AlreadyWatchedPathError{AlreadyWatched: watch.Path}
}
l.watchers[watch.Path] = watch
err := l.fsnotify.Add(watch.Path)
if err != nil {
return err
}
return nil
}
// Remove removes a path from the watcher. If the path is not being watched or
// the watcher is not active, an error is returned.
func (l *lighthouse) Remove(path string) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.active {
return LighthouseNotActiveError{}
}
if _, ok := l.watchers[path]; !ok {
return UnwatchedPathError{UnwatchedPath: path}
}
delete(l.watchers, path)
err := l.fsnotify.Remove(path)
if err != nil {
return err
}
return nil
}
// IsWatched returns true if the path is being watched, false otherwise.
// This only checks the lighthouse, not the fsnotify watcher.
// If you want to check if the path is being watched by fsnotify, use IsActiveWatched.
func (l *lighthouse) IsWatched(path string) bool {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.active {
return false
}
if l.watchers == nil {
return false
}
if l.fsnotify == nil {
return false
}
if _, ok := l.watchers[path]; !ok {
return false
}
return true
}
// Get returns the watch for the given path. If the path is not being watched
// or the watcher is not active, an error is returned.
func (l *lighthouse) Get(path string) (*Watch, error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.active {
return nil, LighthouseNotActiveError{}
}
var watch *Watch
var ok bool
if watch, ok = l.watchers[path]; !ok {
if watch, ok = l.watchers[filepath.Dir(path)]; !ok {
return nil, UnwatchedPathError{UnwatchedPath: path}
}
}
return watch, nil
}
// IsActiveWatched returns true if the path is being watched, false otherwise.
// If the watcher is not active, false is returned.
// If the path is in the watcher but not in the fsnotify watcher, false is returned.
func (l *lighthouse) IsActiveWatched(path string) bool {
if l.IsWatched(path) == false {
return false
}
l.mutex.Lock()
defer l.mutex.Unlock()
// check fsnotify
for _, fsPath := range l.fsnotify.WatchList() {
if fsPath == path {
return true
}
}
return false
}
package watch
import (
"errors"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddWatch(t *testing.T) {
tempDir, err := os.MkdirTemp("", "watchtest")
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(tempDir) // Cleanup
}()
l := NewLighthouse()
w := &Watch{Path: tempDir}
err = l.Add(w)
assert.Nil(t, err)
err = l.Add(w)
assert.True(t, errors.As(err, &AlreadyWatchedPathError{}))
}
func TestRemoveWatch(t *testing.T) {
tempDir, err := os.MkdirTemp("", "watchtest")
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(tempDir) // Cleanup
}()
l := NewLighthouse()
w := &Watch{Path: tempDir}
err = l.Add(w)
assert.Nil(t, err)
err = l.Remove(tempDir)
assert.Nil(t, err)
err = l.Remove("path/not/watched")
assert.True(t, errors.As(err, &UnwatchedPathError{}))
}
func TestNewLighthouse(t *testing.T) {
l := NewLighthouse()
assert.NotNil(t, l)
assert.True(t, l.IsRunning())
}
func TestIsWatched(t *testing.T) {
tempDir, err := os.MkdirTemp("", "watchtest")
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(tempDir) // Cleanup
}()
l := NewLighthouse()
w := &Watch{Path: tempDir}
err = l.Add(w)
assert.Nil(t, err)
assert.True(t, l.IsWatched(tempDir))
assert.False(t, l.IsWatched("path/not/watched"))
}
func TestIsActiveWatched(t *testing.T) {
tempDir, err := os.MkdirTemp("", "watchtest")
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(tempDir) // Cleanup
}()
l := NewLighthouse()
w := &Watch{Path: tempDir}
err = l.Add(w)
assert.Nil(t, err)
assert.True(t, l.IsActiveWatched(tempDir))
assert.False(t, l.IsActiveWatched("path/not/watched"))
}
package watch
import (
"gopkg.in/fsnotify.v1"
"sync"
)
type WatchEvent int
const (
FileChanged WatchEvent = iota
FileAdded
FileDeleted
FileRemoved
)
type EventHandler func(string, WatchEvent)
type Watch struct {
watcher *fsnotify.Watcher
handlers map[string]EventHandler
queue chan string
mu sync.Mutex
}
func NewWatch() (*Watch, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &Watch{
watcher: w,
handlers: make(map[string]EventHandler),
queue: make(chan string, 100),
}, nil
}
func (w *Watch) Add(path string, handler EventHandler) error {
w.mu.Lock()
defer w.mu.Unlock()
w.handlers[path] = handler
return w.watcher.Add(path)
}
func (w *Watch) Remove(path string) error {
w.mu.Lock()
defer w.mu.Unlock()
delete(w.handlers, path)
return w.watcher.Remove(path)
}
func (w *Watch) startWorker(concurrency int) {
for i := 0; i < concurrency; i++ {
go func() {
for path := range w.queue {
if handler, exists := w.handlers[path]; exists {
handler(path, FileChanged)
}
}
}()
}
}
func (w *Watch) Watch(concurrency int) {
w.startWorker(concurrency)
for {
select {
case event := <-w.watcher.Events:
if handler, exists := w.handlers[event.Name]; exists {
we := FileChanged // Map fsnotify events to WatchEvent here
w.queue <- event.Name
handler(event.Name, we)
}
case err := <-w.watcher.Errors:
// Handle errors
println("Error:", err)
}
}
}
func (w *Watch) Stop() {
close(w.queue)
w.watcher.Close()
}
package watch
import (
"io/ioutil"
"os"
"testing"
"time"
)
func TestWatch(t *testing.T) {
w, err := NewWatch()
if err != nil {
t.Fatal(err)
}
tmpFile, err := ioutil.TempFile("", "watch_test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpFile.Name())
called := make(chan bool)
handler := func(path string, event WatchEvent) {
if path == tmpFile.Name() && event == FileChanged {
called <- true
}
}
if err := w.Add(tmpFile.Name(), handler); err != nil {
t.Fatal(err)
}
go w.Watch(1)
defer w.Stop()
_, err = tmpFile.WriteString("test")
if err != nil {
t.Fatal(err)
}
select {
case <-called:
// Success
case <-time.After(time.Second * 2):
t.Fatal("Event handler not called")
}
}
package watch
import (
"github.com/fsnotify/fsnotify"
"os"
"sync"
"time"
)
func (l *lighthouse) StartWatching() {
go func() {
eventChannel := make(chan fsnotify.Event, 100)
errorChannel := make(chan error, 100)
debounceTimers := make(map[string]*time.Timer)
var debounceMutex sync.Mutex
go func() {
for {
select {
case event := <-l.fsnotify.Events:
eventChannel <- event
case err := <-l.fsnotify.Errors:
errorChannel <- err
}
}
}()
for {
select {
case event := <-eventChannel:
debounceMutex.Lock()
if timer, ok := debounceTimers[event.Name]; ok {
timer.Stop()
}
debounceTimers[event.Name] = time.AfterFunc(l.debounce, func() {
//debounceTimers[event.Name] = time.AfterFunc(l.debounce, func() {
debounceMutex.Lock()
delete(debounceTimers, event.Name)
debounceMutex.Unlock()
var watch *Watch
var err error
if watch, err = l.Get(event.Name); err != nil || watch == nil {
return
}
switch event.Op {
case fsnotify.Write:
if watch.OnChange != nil {
if watch.OnChange != nil {
go watch.OnChange(event.Name)
}
}
case fsnotify.Create:
if watch.OnCreate != nil {
if watch.OnCreate != nil {
go watch.OnCreate(event.Name)
}
}
case fsnotify.Rename:
if watch.OnRename != nil {
if watch.OnRename != nil {
go watch.OnRename(event.Name)
}
}
go func(filename string) {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
if l.IsWatched(filename) {
if _, err := os.Stat(filename); err == nil {
_ = l.fsnotify.Add(filename)
ticker.Stop()
return
}
} else {
ticker.Stop()
return
}
}
}
}(event.Name)
case fsnotify.Remove:
if watch.OnDelete != nil {
if watch.OnDelete != nil {
go watch.OnDelete(event.Name)
}
}
go func(filename string) {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
if l.IsWatched(filename) {
if _, err := os.Stat(filename); err == nil {
_ = l.fsnotify.Add(filename)
ticker.Stop()
return
}
} else {
ticker.Stop()
return
}
}
}
}(event.Name)
}
})
debounceMutex.Unlock()
case err := <-errorChannel:
go l.onError(err)
}
}
}()
}
package watch
import (
"github.com/stretchr/testify/assert"
"os"
"path/filepath"
"testing"
"time"
)
func TestWriteEvent(t *testing.T) {
tmpDir := t.TempDir()
t.Cleanup(func() {
_ = os.RemoveAll(tmpDir) // Cleanup
})
filePath := filepath.Join(tmpDir, "test.txt")
l := NewLighthouse()
ch := make(chan bool)
timeout := make(chan bool)
t.Cleanup(func() {
close(ch)
close(timeout)
})
// Create file and add watcher
if err := os.WriteFile(filePath, []byte("test"), 0644); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
w1 := &Watch{
Path: tmpDir,
OnChange: func(name string) {
ch <- true
},
}
if err := l.Add(w1); err != nil {
t.Fatalf("Failed to add watch: %v", err)
}
l.StartWatching()
timer := time.NewTimer(5 * time.Second)
go func() {
<-timer.C
timeout <- true
}()
// Change file
if err := os.WriteFile(filePath, []byte("test2"), 0644); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
select {
case <-ch:
timer.Stop()
case <-timeout:
t.Fail()
}
}
func TestRecreateEvent(t *testing.T) {
tmpDir := t.TempDir()
t.Cleanup(func() {
_ = os.RemoveAll(tmpDir)
})
filePath := filepath.Join(tmpDir, "recreate.txt")
callCounterCreate := 0
callCounterChange := 0
l := NewLighthouse()
ch := make(chan bool)
timeout := make(chan bool)
t.Cleanup(func() {
close(ch)
close(timeout)
})
w1 := &Watch{
Path: tmpDir,
OnChange: func(name string) {
callCounterChange++
ch <- true
},
OnCreate: func(name string) {
callCounterCreate++
ch <- true
},
}
if err := l.Add(w1); err != nil {
t.Fatalf("Failed to add watch: %v", err)
}
l.StartWatching()
timer := time.NewTimer(20 * time.Second)
go func() {
<-timer.C
timeout <- true
}()
// Create file
if err := os.WriteFile(filePath, []byte("test1"), 0644); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
select {
case <-ch:
// First create event detected
case <-timeout:
t.Log("Timeout 1")
t.Fail()
return
}
// Delete the file after 2 seconds
time.Sleep(2 * time.Second)
if err := os.Remove(filePath); err != nil {
t.Fatalf("Failed to remove file: %v", err)
}
time.Sleep(2 * time.Second)
// Recreate the file after another 2 seconds
if err := os.WriteFile(filePath, []byte("test2"), 0644); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
select {
case <-ch:
timer.Stop()
assert.Equal(t, 0, callCounterCreate)
assert.Equal(t, 2, callCounterChange)
case <-timeout:
t.Log("Timeout 2")
t.Fail()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment