基于consul实现watch机制

基于consul实现watch机制

前言

consul常常被用来做服务注册与服务发现,而它的watch机制则可以用来监控数据的更新,实时获取最新的数据。另外,在监控到数据变化后,还可以调用外部处理程序,该处理程序可以是任何可执行文件或HTTP调用,具体说明可见官网。

当前consul支持的watch类型如下所示:

  • key 监听一个consul kv中的key
  • keyprefix 监听consul kv中的key的前缀
  • services 监听有效服务的变化
  • nodes 监听节点的变化
  • service 监听服务的变化
  • checks 监听check的变化
  • event 监听自定义事件的变化

从以上可以看出consul提供非常丰富的监听类型,通过这些类型我们可以实时观测到consul整个集群中的变化,从而实现一些特别的需求,比如:实时更新、服务告警等功能。

基于Golang 实现watch 对服务变化的监控

consul官方提供了Golang版的watch包。其实际上也是对watch机制进行了一层封装,最终代码实现的只是对consul HTTP API 的 endpoints的使用,不涉及数据变化后的相关处理,封装程度不够。

接下来我将基于一个封装了相关处理函数的工具包来解决这个问题,详细代码可通过 https://github.com/longpi1/consul-tool 下载查看。

1.客户端client.go 用于初始化consul相关配置以及封装consul的api库的基础操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
package backends

import (
"encoding/json"
"fmt"
"github.com/hashicorp/consul/api"
errors "github.com/longpi1/consul-tool/pkg/error"
"github.com/longpi1/consul-tool/pkg/log"
"strings"
"sync"
"time"
)

// Option is a functional option: a function that mutates a *Config.
// NewConfig applies every supplied Option, in order, to the Config it builds.
type Option func(opt *Config)

// NewConfig builds a Config populated with defaults (the consul client's
// default configuration, an empty watcher registry and a logger from
// log.NewLogger) and then applies the given options in the order supplied.
func NewConfig(opts ...Option) *Config {
	cfg := new(Config)
	cfg.conf = api.DefaultConfig()
	cfg.watchers = make(map[string]*watcher)
	cfg.logger = log.NewLogger()

	for i := range opts {
		opts[i](cfg)
	}

	return cfg
}

// Config holds the consul client settings together with the set of registered
// watchers. The embedded RWMutex is taken by the watcher accessor methods
// (getWatcher, addWatcher, removeWatcher, ...) to guard the watchers map.
type Config struct {
sync.RWMutex
// logger receives the watcher lifecycle messages.
logger log.Logger
// kv is the consul KV handle; it is assigned by Init.
kv *api.KV
// conf is the consul client configuration passed to api.NewClient.
conf *api.Config
// watchers maps an absolute (prefixed) path to its watcher.
watchers map[string]*watcher
// prefix is prepended to keys by absPath.
prefix string
}

// watcherLoop keeps the watcher registered for path running until it is
// removed from the registry.
//
// Each iteration runs the watcher; when run returns an error the loop logs it
// and sleeps three seconds. After every run the registry is consulted again:
// if the watcher is gone the loop ends, otherwise it reconnects.
func (c *Config) watcherLoop(path string) {
	c.logger.Info("watcher start...", "path", path)

	current := c.getWatcher(path)
	if current == nil {
		c.logger.Error("watcher not found", "path", path)
		return
	}

	for {
		err := current.run(c.conf.Address, c.conf)
		if err != nil {
			c.logger.Warn("watcher connect error", "path", path, "error", err)
			time.Sleep(3 * time.Second)
		}

		// Re-read the registry: StopWatch removes the entry to end this loop.
		current = c.getWatcher(path)
		if current == nil {
			break
		}

		c.logger.Warn("watcher reconnect...", "path", path)
	}

	c.logger.Info("watcher stop", "path", path)
}

// Reset signals every registered watcher to stop and then re-creates the
// consul client by calling Init. It returns whatever Init returns.
func (c *Config) Reset() error {
	for _, active := range c.getAllWatchers() {
		active.stop()
	}

	return c.Init()
}

// Init creates a consul API client from the stored configuration and keeps
// its KV handle for later Put/Get/Delete calls. A client construction
// failure is returned wrapped with "init fail".
func (c *Config) Init() error {
	cli, err := api.NewClient(c.conf)
	if err != nil {
		return fmt.Errorf("init fail: %w", err)
	}

	c.kv = cli.KV()

	return nil
}

// Put stores value under path (resolved against the configured prefix).
//
// The value is JSON-encoded; if encoding fails, its default fmt "%v"
// rendering is stored instead. A failed write is returned wrapped with
// "put fail".
func (c *Config) Put(path string, value interface{}) error {
	payload, marshalErr := json.Marshal(value)
	if marshalErr != nil {
		// Best-effort fallback: store the plain textual form of the value.
		payload = []byte(fmt.Sprintf("%v", value))
	}

	pair := &api.KVPair{Key: c.absPath(path), Value: payload}
	if _, err := c.kv.Put(pair, nil); err != nil {
		return fmt.Errorf("put fail: %w", err)
	}

	return nil
}

// Get looks up the value stored for the given key segments.
//
// The segments are joined onto the configured prefix by absPath. Get lists
// the non-empty keys under the prefix and uses the first listed key that is
// a path-prefix of the requested path. Any path segments remaining after
// that key are appended to the returned KV's key.
//
// Get always returns a non-nil *KV; failures are reported through its err
// field:
//   - a listing failure is wrapped with "get list fail";
//   - a read failure is wrapped with "get fail";
//   - errors.ErrKeyNotFound is set when no listed key matches (including an
//     empty listing) or when consul reports no pair for the matched key.
func (c *Config) Get(keys ...string) (ret *KV) {
	var (
		path   = c.absPath(keys...) + "/"
		fields []string
	)

	ret = &KV{}
	ks, err := c.list()
	if err != nil {
		ret.err = fmt.Errorf("get list fail: %w", err)
		return
	}

	// Assume "not found" until a listed key matches, so that an empty
	// listing is reported as an error instead of an empty, error-free KV.
	ret.err = errors.ErrKeyNotFound

	for _, k := range ks {
		if !strings.HasPrefix(path, k+"/") {
			continue
		}
		field := strings.TrimSuffix(strings.TrimPrefix(path, k+"/"), "/")
		if len(field) != 0 {
			fields = strings.Split(field, "/")
		}

		ret.key = strings.TrimSuffix(strings.TrimPrefix(path, c.prefix+"/"), "/")

		kvPair, _, err := c.kv.Get(k, nil)
		switch {
		case err != nil:
			ret.err = fmt.Errorf("get fail: %w", err)
		case kvPair == nil:
			// The key vanished between the listing and the read; do not
			// dereference the nil pair.
			ret.err = errors.ErrKeyNotFound
		default:
			ret.value = kvPair.Value
			ret.err = nil
		}
		break
	}

	if len(fields) == 0 {
		return
	}
	ret.key += "/" + strings.Join(fields, "/")
	return
}

// Delete removes the key at path (resolved against the configured prefix).
// A failed delete is returned wrapped with "delete fail".
func (c *Config) Delete(path string) error {
	key := c.absPath(path)
	if _, err := c.kv.Delete(key, nil); err != nil {
		return fmt.Errorf("delete fail: %w", err)
	}

	return nil
}

// Watch starts watching path (resolved against the configured prefix) and
// invokes handler for each key whose value changes.
//
// It returns an error wrapped with "watch fail" when the watcher cannot be
// created, or the error from addWatcher when the path is already registered.
// On success the watch loop runs in its own goroutine.
func (c *Config) Watch(path string, handler func(*KV)) error {
	// Build the watcher for the absolute path.
	w, err := newWatcher(c.absPath(path))
	if err != nil {
		return fmt.Errorf("watch fail: %w", err)
	}

	// Install the callback fired when something under the path changes.
	w.setHybridHandler(c.prefix, handler)

	// Register the watcher so the loop (and StopWatch) can find it.
	if err = c.addWatcher(path, w); err != nil {
		return err
	}

	// Run the watch loop in the background.
	go c.watcherLoop(path)

	return nil
}

// StopWatch stops watching the given paths.
//
// With no arguments every registered watcher is stopped and removed. With
// arguments, each named watcher is removed from the registry, signalled to
// stop, and StopWatch then waits until its plan reports IsStopped. Paths
// that have no registered watcher are logged and skipped.
func (c *Config) StopWatch(path ...string) {
	if len(path) == 0 {
		c.cleanWatcher()
		return
	}

	for _, p := range path {
		wp := c.getWatcher(p)
		if wp == nil {
			c.logger.Info("watcher already stop", "path", p)
			continue
		}

		c.removeWatcher(p)
		wp.stop()
		// Poll with a short sleep rather than spinning: an empty loop body
		// would pin a CPU core for as long as the plan takes to stop.
		for !wp.IsStopped() {
			time.Sleep(10 * time.Millisecond)
		}
	}
}

// absPath joins keys with "/" and prepends the configured prefix.
//
// When no keys are given, or the first key is empty, the prefix alone is
// returned. When the prefix is empty, the joined keys are returned without a
// leading slash.
func (c *Config) absPath(keys ...string) string {
	if len(keys) == 0 || keys[0] == "" {
		return c.prefix
	}

	joined := strings.Join(keys, "/")
	if c.prefix == "" {
		return joined
	}

	return c.prefix + "/" + joined
}

// list returns the keys found under the configured prefix whose values are
// non-empty. An error from the KV listing call is returned unchanged.
func (c *Config) list() ([]string, error) {
	pairs, _, err := c.kv.List(c.prefix, nil)
	if err != nil {
		return nil, err
	}

	keys := make([]string, 0, len(pairs))
	for i := range pairs {
		if len(pairs[i].Value) == 0 {
			continue
		}
		keys = append(keys, pairs[i].Key)
	}

	return keys, nil
}

// WithPrefix returns an Option that sets the key prefix of the Config.
func WithPrefix(prefix string) Option {
	return Option(func(cfg *Config) {
		cfg.prefix = prefix
	})
}

// WithAddress returns an Option that sets the Address field of the Config's
// consul client configuration.
func WithAddress(address string) Option {
	return Option(func(cfg *Config) {
		cfg.conf.Address = address
	})
}

// Withlogger returns an Option that replaces the logger used by the Config.
func Withlogger(logger log.Logger) Option {
	return Option(func(cfg *Config) {
		cfg.logger = logger
	})
}


// CheckWatcher reports whether path is already being watched: it returns
// errors.ErrAlreadyWatch when a watcher is registered for the path and nil
// otherwise.
func (c *Config) CheckWatcher(path string) error {
	c.RLock()
	defer c.RUnlock()

	_, watching := c.watchers[c.absPath(path)]
	if !watching {
		return nil
	}

	return errors.ErrAlreadyWatch
}

// getWatcher returns the watcher registered for path, or nil when there is
// none. The registry is read under the read lock.
func (c *Config) getWatcher(path string) *watcher {
	c.RLock()
	defer c.RUnlock()

	key := c.absPath(path)
	found := c.watchers[key]

	return found
}

// addWatcher registers w for path under the write lock. It returns
// errors.ErrAlreadyWatch, leaving the registry untouched, when a watcher is
// already registered for that path.
func (c *Config) addWatcher(path string, w *watcher) error {
	c.Lock()
	defer c.Unlock()

	key := c.absPath(path)
	if _, exists := c.watchers[key]; exists {
		return errors.ErrAlreadyWatch
	}
	c.watchers[key] = w

	return nil
}

// removeWatcher drops the registry entry for path under the write lock.
// It does not stop the watcher itself.
func (c *Config) removeWatcher(path string) {
	c.Lock()
	defer c.Unlock()

	key := c.absPath(path)
	delete(c.watchers, key)
}

// cleanWatcher signals every registered watcher to stop and empties the
// registry, all under the write lock.
func (c *Config) cleanWatcher() {
	c.Lock()
	defer c.Unlock()

	for key, active := range c.watchers {
		active.stop()
		delete(c.watchers, key)
	}
}

// getAllWatchers returns a snapshot slice of every registered watcher, taken
// under the read lock. Map iteration order is unspecified, so the slice
// order is too.
func (c *Config) getAllWatchers() []*watcher {
	c.RLock()
	defer c.RUnlock()

	snapshot := make([]*watcher, 0, len(c.watchers))
	for key := range c.watchers {
		snapshot = append(snapshot, c.watchers[key])
	}

	return snapshot
}


2.watcher.go实现对watch机制相关函数的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package backends

import (
"bytes"
"fmt"
"strings"
"sync"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)

// newWatcher creates a watcher for the KV prefix path.
//
// The plan type used here is "keyprefix"; other targets such as services or
// nodes can be watched by changing the type. The supported types are:
//
//	key       - watch a specific KV pair
//	keyprefix - watch a prefix in the KV store
//	services  - watch the list of available services
//	nodes     - watch the list of nodes
//	service   - watch the instances of a service
//	checks    - watch the value of health checks
//	event     - watch for custom user events
//
// An error from watch.Parse is returned unchanged.
func newWatcher(path string) (*watcher, error) {
	params := map[string]interface{}{
		"type":   "keyprefix",
		"prefix": path,
	}

	plan, err := watch.Parse(params)
	if err != nil {
		return nil, err
	}

	w := &watcher{
		Plan:       plan,
		lastValues: make(map[string][]byte),
		err:        make(chan error, 1),
	}

	return w, nil
}

// newServiceWatcher creates a watcher whose plan has type "service" and
// targets serviceName. An error from watch.Parse is returned unchanged.
func newServiceWatcher(serviceName string) (*watcher, error) {
	params := map[string]interface{}{
		"type":    "service",
		"service": serviceName,
	}

	plan, err := watch.Parse(params)
	if err != nil {
		return nil, err
	}

	w := &watcher{
		Plan:       plan,
		lastValues: make(map[string][]byte),
		err:        make(chan error, 1),
	}

	return w, nil
}

// watcher wraps a consul watch.Plan together with the last value seen for
// each key, so that the handler is only invoked for real changes. The
// embedded RWMutex guards lastValues (see getValue and updateValue).
type watcher struct {
sync.RWMutex
*watch.Plan
// lastValues caches the most recent non-empty value per key path.
lastValues map[string][]byte
// hybridHandler is invoked when something under the watched path changes;
// run installs it on the plan.
hybridHandler watch.HybridHandlerFunc
// stopChan is created by run and closed by stop to make run return.
stopChan chan struct{}
// err carries the result of the plan's RunWithConfig call back to run.
err chan error
}

// getValue returns the cached value for path, or nil when none is cached.
// The cache is read under the read lock.
func (w *watcher) getValue(path string) []byte {
	w.RLock()
	defer w.RUnlock()

	cached := w.lastValues[path]

	return cached
}

// updateValue records value as the latest value for path. An empty value
// removes the cache entry instead of storing it. The cache is modified under
// the write lock.
func (w *watcher) updateValue(path string, value []byte) {
	w.Lock()
	defer w.Unlock()

	if len(value) > 0 {
		w.lastValues[path] = value
		return
	}

	delete(w.lastValues, path)
}

// setHybridHandler installs the callback that the watch plan invokes when
// data under the watched path changes.
//
// For every KV pair delivered by the plan, the key is made relative to
// prefix and compared with the last value cached for it. Pairs whose value
// is unchanged (or empty both before and now) are skipped. For the others,
// the cache is updated and handler is called with a KV describing the change.
//
// Payloads that are not api.KVPairs are ignored rather than causing a panic.
// Each handler call receives its own *KV, so a handler may keep the pointer
// without it being overwritten by a later change.
func (w *watcher) setHybridHandler(prefix string, handler func(*KV)) {
	w.hybridHandler = func(bp watch.BlockingParamVal, data interface{}) {
		kvPairs, ok := data.(api.KVPairs)
		if !ok {
			// Not a KV payload (nil, or data from a non-KV plan type).
			return
		}

		for _, k := range kvPairs {
			path := strings.TrimSuffix(strings.TrimPrefix(k.Key, prefix+"/"), "/")
			v := w.getValue(path)

			if len(k.Value) == 0 && len(v) == 0 {
				continue
			}

			if bytes.Equal(k.Value, v) {
				continue
			}

			w.updateValue(path, k.Value)
			handler(&KV{key: path, value: k.Value})
		}
	}
}

// run starts the watch plan and blocks until it ends or the watcher is
// stopped.
//
// A fresh stop channel is created and the stored handler is installed on the
// plan before the plan is started in its own goroutine. When the stop channel
// is closed, the plan is stopped and nil is returned. When the plan's
// RunWithConfig call returns first, its result is wrapped with "run fail";
// note that the wrapped error is non-nil even if that result was nil.
func (w *watcher) run(address string, conf *api.Config) error {
	w.stopChan = make(chan struct{})
	w.Plan.HybridHandler = w.hybridHandler

	go func(result chan<- error) {
		result <- w.RunWithConfig(address, conf)
	}(w.err)

	select {
	case <-w.stopChan:
		w.Stop()
		return nil
	case runErr := <-w.err:
		return fmt.Errorf("run fail: %w", runErr)
	}
}

// stop signals a running watcher to return from run by closing its stop
// channel.
//
// It is safe to call more than once and before run has been called: closing
// a nil or already-closed channel would panic, so both cases are guarded.
// The watcher's lock serialises concurrent stop calls so the check and the
// close happen together.
func (w *watcher) stop() {
	w.Lock()
	defer w.Unlock()

	if w.stopChan == nil {
		// run has not created the channel yet. Stop the plan directly so a
		// later run ends promptly instead of blocking.
		// NOTE(review): relies on watch.Plan.Stop being idempotent — confirm.
		w.Stop()
		return
	}

	select {
	case <-w.stopChan:
		// Already closed by an earlier stop call.
	default:
		close(w.stopChan)
	}
}

3.main.go,初始化consul配置信息后,实现对test路径下的Key进行监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"github.com/longpi1/consul-tool/internal/backends"
"log"
"os"
"os/signal"
"syscall"
"time"
)

// main demonstrates the tool: it initialises the consul client with the
// "kvTest" prefix, watches the key "test", writes and reads that key, and
// then waits for a termination signal before stopping the watch.
func main() {
	// Initialise the consul configuration.
	cli := backends.NewConfig(backends.WithPrefix("kvTest"))
	if err := cli.Init(); err != nil {
		log.Fatalln(err)
	}
	// Watch the consul key "test".
	err := cli.Watch("test", func(r *backends.KV) {
		log.Printf("该key: %s 已经更新", r.Key())
	})
	if err != nil {
		log.Fatalln(err)
	}
	// Write the key.
	if err := cli.Put("test", "value"); err != nil {
		log.Fatalln(err)
	}
	// Read the key back.
	if ret := cli.Get("test"); ret.Err() != nil {
		log.Fatalln(ret.Err())
	} else {
		println(ret.Value())
	}

	c := make(chan os.Signal, 1)
	// Subscribe to the signals that should end the program.
	signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	for {
		s := <-c
		log.Printf("exit with signal %s", s.String())
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			// Stop watching the path before exiting.
			cli.StopWatch("test")
			time.Sleep(time.Second * 2)
			// Unregister the channel instead of closing it: the signal
			// package may still send on a registered channel.
			signal.Stop(c)
			return
		case syscall.SIGHUP:
			// SIGHUP is ignored; keep waiting for the next signal.
		default:
			signal.Stop(c)
			return
		}
	}
}

参考链接

  1. JackBai233,使用Consul的watch机制监控注册的服务变化
  2. 风车,深入Consul Watch功能

基于consul实现watch机制
https://blog.longpi1.com/2022/12/04/基于consul实现watch机制/