Go 监控文件变化

试想如下需求:我们需要监控某个日志文件,当有新的日志被写入时,读出新的部分并进行处理(如解析内容,并分类存储,发出通知)

在写项目时,刚好遇到了这个问题,所以就思考如何实现

思路其实很明确,尽管大部分高级语言已经把文件读取封装了,但是如果写过 C/C++,应该是可以知道读入文件是可以一个字符一个字符读取(在算法竞赛中,这样读取甚至可能可以提升速度),也可以通过seek()移动到指定位置(也即可以直接跳到任意位置开始读数据)。

大概思路如下:

  1. 以只读方式打开文件
  2. 监控文件更改
    1. 发现文件被改动
    2. 从当前读入指针开始读取,直到结束
    3. 读入的数据就是新增的数据

使用 fsnotify 进行读取

fsnotify 是一个监控文件修改的库,可以在文件被创建、写入、删除、重命名、修改时触发通知。

package main

import (
	"bytes"
	"io"
	"os"
	"path"

	"github.com/OhYee/rainbow/errors"
	"github.com/OhYee/rainbow/log"
	"gopkg.in/fsnotify.v1"
)

func watch(file string) {
	watch, err := fsnotify.NewWatcher()
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
		return
	}
	defer watch.Close()

	f, err := os.OpenFile(file, os.O_RDONLY|os.O_CREATE, os.ModePerm)
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
		return
	}
	defer f.Close()

	// 读入已有的部分
	buf := bytes.NewBuffer([]byte{})
	b := make([]byte, 64)
	for {
		n, err := f.Read(b)
		if err != nil && err != io.EOF {
			log.Error.Println(errors.ShowStack(err))
			return
		}
		if n == 0 {
			break
		}
		// buf.Write(b[:n])
	}
	buf.Reset()

	//添加要监控的对象,文件或文件夹
	err = watch.Add(file)
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
		return
	}

	log.Debug.Println("start watch", file)
	for {
		select {
		case ev := <-watch.Events:
			{
				//判断事件发生的类型,如下5种
				// Create 创建
				// Write 写入
				// Remove 删除
				// Rename 重命名
				// Chmod 修改权限
				// if ev.Op&fsnotify.Create == fsnotify.Create {
				// 	log.Debug.Println("创建文件 : ", ev.Name)
				// }
				if ev.Op&fsnotify.Write == fsnotify.Write {
					// log.Debug.Println("写入文件 : ", ev.Name, ev.String())
					for {
						n, err := f.Read(b)
						if err != nil && err != io.EOF {
							log.Error.Println(errors.ShowStack(err))
							return
						}
						if n == 0 {
							break
						}
						buf.Write(b[:n])
					}
					log.Debug.Println(buf.String())
					buf.Reset()
				}
				// if ev.Op&fsnotify.Remove == fsnotify.Remove {
				// 	log.Debug.Println("删除文件 : ", ev.Name)
				// }
				// if ev.Op&fsnotify.Rename == fsnotify.Rename {
				// 	log.Debug.Println("重命名文件 : ", ev.Name)
				// }
				// if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
				// 	log.Debug.Println("修改权限 : ", ev.Name)
				// }
			}
		case err := <-watch.Errors:
			{
				log.Debug.Println("error : ", err)
				return
			}

		}
	}
}

func main() {
	logFile := path.Join("./", "log.log")
	watch(logFile)
	for {

	}
}

思路上,这么处理没有任何问题,但是在实际使用中就出现了问题,单次读入的数据可能不是完整的。导致这个的原因可能是读取的同时有文件正在写入,并且还未写入完整,从而导致只读入了一半。因此,我们应该确保每次都读到换行符再结束。

每次读入一个换行符

为了避免上述问题,需要确保每次读入只有是完整的一行(读到换行符)时,才将数据输出,否则结束读入,重新设置读入指针的位置到行首(原本位置)

具体代码如下

package main

import (
	"bufio"
	"io"
	"io/ioutil"
	"os"
	"path"
	"strings"

	"github.com/OhYee/rainbow/errors"
	"github.com/OhYee/rainbow/log"
	"gopkg.in/fsnotify.v1"
)

func main() {
	logFile := path.Join("./", "log.log")

	w, old, err := New(logFile)
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
		os.Exit(1)
	}

	log.Info.Println(old)
	err = w.Watch(func(s string) {
		log.Info.Println(s)
	})
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
	}
	defer w.Close()
	for {
	}
}

// Watcher 监控文件添加行
type Watcher struct {
	f     *os.File
	r     *bufio.Reader
	close chan bool
}

// New 初始化一个监控程序
func New(filename string) (w *Watcher, oldValue string, err error) {
	f, err := os.OpenFile(filename, os.O_RDONLY|os.O_CREATE, os.ModePerm)
	if err != nil {
		return
	}

	// 读入原本的内容
	b, err := ioutil.ReadAll(f)
	if err != nil {
		return
	}

	oldValue = string(b)

	w = &Watcher{
		f:     f,
		r:     bufio.NewReader(f),
		close: make(chan bool),
	}
	return
}

// Close 结束监控,关闭文件
func (w *Watcher) Close() {
	w.close <- true
	w.f.Close()
}

// Watch 开始进行监控
func (w *Watcher) Watch(callback func(line string)) (err error) {
	defer errors.Wrapper(&err)

	//添加要监控的对象,文件或文件夹
	watch, err := fsnotify.NewWatcher()
	if err != nil {
		return
	}

	err = watch.Add(w.f.Name())
	if err != nil {
		return
	}

	log.Debug.Println("start watch", w.f.Name())

	for {
		select {
		case ev := <-watch.Events:
			if ev.Op&fsnotify.Write == fsnotify.Write {
				// 监控到文件写入
				for {
					line, err := w.ReadLine()
					if err != nil && err != io.EOF {
						return err
					}
					if err == nil {
						// 正确读入一行
						go callback(line)
					} else {
						// 当前是由于 EOF 结束,后面无内容,不需要继续读入下一行
						break
					}
				}
			}
		case err = <-watch.Errors:
			return
		case <-w.close:
			return nil
		}
	}
}

// ReadLine 读入一行的数据
func (w *Watcher) ReadLine() (line string, err error) {
	pos, err := w.f.Seek(0, 1)
	if err != nil {
		return
	}

	line, err = w.r.ReadString('\n')
	if err == io.EOF {
		// 未读到换行符,读到 EOF
		// 放弃本次读取

		// 回退到原本的位置
		log.Debug.Println("读到 EOF,回退")
		w.f.Seek(pos, 0)
		return
	}
	if err != nil {
		// 存在其他错误
		return
	}
	// 清楚末尾的 \n
	line = strings.TrimRight(line, "\n")
	return
}

为了方面使用,上述功能已经在 v1.1.2 加入至github.com/OhYee/goutils/file,可以直接下载yi'lai

package main

import (
	"os"
	"path"

	"github.com/OhYee/goutils/file"
	"github.com/OhYee/rainbow/errors"
	"github.com/OhYee/rainbow/log"
)

func main() {
	logFile := path.Join("./", "log.log")

	w, old, err := file.New(logFile)
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
		os.Exit(1)
	}

	log.Info.Println(old)
	err = w.Watch(func(s string) {
		log.Info.Println(s)
	})
	if err != nil {
		log.Error.Println(errors.ShowStack(err))
	}
	defer w.Close()
	for {
	}
}

参考资料