上一篇文章简单介绍了一下Consul-template基本用法,本篇主要深入看一下consul-template的源码。
Consul-template的整个流程还是比较清晰的,不过代码中大量运用了goroutine、channel和goto等高级特性,如果不仔细看的话,有些地方可能理不清楚。
下图即是consul-template的整个执行流程。整个流程开有多个单独的goroutine,分别是监听信号重新加载配置或者停止、监听consul kv存储的变化发送更新通知、监听数据的更新通知以及渲染模板并执行命令等
初始化资源
consul-template启动后首先解析配置,并初始化相关资源
//cli.go
//dry:dry模式下会将渲染内容展示在stdout中,不会改变生成的文件,方便验证模板内容是否正确
//once:是否只渲染一下,可用于调试
runner, err := manager.NewRunner(config, dry, once)
if err != nil {
return logError(err, ExitCodeRunnerError)
}
go runner.Start()
//省略信号监控goroutine
```
```go
//runner.go
//NewRunner中调用init方法
func (r *Runner) init() error {
//...省略配置解析,异常捕获等代码
//根据配置文件创建链接consul的客户端
clients, err := newClientSet(r.config)
//创建监听器
watcher, err := newWatcher(r.config, clients, r.once)
//解析配置的模板
for _, ctmpl := range *r.config.Templates {
tmpl, err := template.NewTemplate(&template.NewTemplateInput{
//模板源文件的路径
Source: config.StringVal(ctmpl.Source),
//模板内容 和上面的路径必须保证有一个存在
Contents: config.StringVal(ctmpl.Contents),
})
}
//省略部分初始化代码
if *r.config.Dedup.Enabled {
if r.once {
} else {
//如果开启了de-dup属性的话,这里会创建de-dup管理器
r.dedup, err = NewDedupManager(r.config.Dedup, clients, r.brain, r.templates)
if err != nil {
return err
}
}
}
return nil
}
启动de-dup管理器和数据监控goroutine
de-dup主要是为了优化性能,具体可参考上一篇基本用法。
func (r *Runner) Start() {
//启动 de-duplication 管理器
var dedupCh <-chan struct{}
if r.dedup != nil {
if err := r.dedup.Start(); err != nil {
r.ErrCh <- err
return
}
dedupCh = r.dedup.UpdateCh()
}
if err := r.Run(); err != nil {
r.ErrCh <- err
return
}
for {
NEXT_Q:
for _, t := range r.templates {
if _, ok := r.quiescenceMap[t.ID()]; ok {
continue NEXT_Q
}
for _, c := range r.templateConfigsFor(t) {
if *c.Wait.Enabled {
r.quiescenceMap[t.ID()] = newQuiescence(
r.quiescenceCh, *c.Wait.Min, *c.Wait.Max, t)
continue NEXT_Q
}
}
if *r.config.Wait.Enabled {
r.quiescenceMap[t.ID()] = newQuiescence(
r.quiescenceCh, *r.config.Wait.Min, *r.config.Wait.Max, t)
continue NEXT_Q
}
}
OUTER:
select {
case view := <-r.watcher.DataCh():
r.Receive(view.Dependency(), view.Data())
//循环读取数据
for {
select {
case view := <-r.watcher.DataCh():
r.Receive(view.Dependency(), view.Data())
default:
break OUTER
}
}
case <-dedupCh:
//接收到de-dup消息
log.Printf("[INFO] (runner) watcher triggered by de-duplication manager")
break OUTER
}
//开始渲染数据
if err := r.Run(); err != nil {
r.ErrCh <- err
return
}
}
}
渲染模板
consul-template的模板语法其实是采用的golang模板的模板语法,通过自定义函数,来进行数据注入
//runner.go
//go runner.Start()
func (r *Runner) Run() error {
var newRenderEvent, wouldRenderAny, renderedAny bool
runCtx := &templateRunCtx{
depsMap: make(map[string]dep.Dependency),
}
//渲染模板
for _, tmpl := range r.templates {
//渲染单个模板
event, err := r.runTemplate(tmpl, runCtx)
if err != nil {
return err
}
}
//渲染模板完毕执行命令
var errs []error
for _, t := range runCtx.commands {
command := config.StringVal(t.Exec.Command)
env := t.Exec.Env.Copy()
env.Custom = append(r.childEnv(), env.Custom...)
if _, err := spawnChild(&spawnChildInput{
//省略
}); err != nil {
s := fmt.Sprintf("failed to execute command %q from %s", command, t.Display())
errs = append(errs, errors.Wrap(err, s))
}
}
return nil
}
//runner.go
func (r *Runner) runTemplate(tmpl *template.Template, runCtx *templateRunCtx) (*RenderEvent, error) {
// 检查本示例节点是否是leader节点
isLeader := true
if r.dedup != nil {
isLeader = r.dedup.IsLeader(tmpl)
}
//尝试渲染模板
result, err := tmpl.Execute(&template.ExecuteInput{
Brain: r.brain,
Env: r.childEnv(),
})
if err != nil {
return nil, errors.Wrap(err, tmpl.Source())
}
//检查模板渲染所需要的���据是否都满足了,如果不满足,则加入监控列表
missing, used := result.Missing, result.Used
for _, d := range used.List() {
if isLeader && !r.watcher.Watching(d) {
missing.Add(d)
}
if _, ok := runCtx.depsMap[d.String()]; !ok {
runCtx.depsMap[d.String()] = d
}
}
if l := unwatched.Len(); l > 0 {
for _, d := range unwatched.List() {
if isLeader || !d.CanShare() {
//注意此处将调用goroutine监控consul kv的变化
r.watcher.Add(d)
}
}
return event, nil
}
//如果开启了de-duplication模式,并且本示例为leader节点,则更新consul中模板渲染的结果,便于其他节点使用
if r.dedup != nil && isLeader {
if err := r.dedup.UpdateDeps(tmpl, used.List()); err != nil {
log.Printf("[ERR] (runner) failed to update dependency data for de-duplication: %v", err)
}
}
//如果开启了quiescence特性,则检查一定时间内是否已经更新过了,如果更新过了,不再更新,直接返回
if q, ok := r.quiescenceMap[tmpl.ID()]; ok {
q.tick()
event.ForQuiescence = true
return event, nil
}
// 对于每一个模板,将其渲染后的数据写入文件,并存储需要后续执行的命令
for _, templateConfig := range r.templateConfigsFor(tmpl) {
result, err := renderer.Render(&renderer.RenderInput{
Backup: config.BoolVal(templateConfig.Backup),
Contents: result.Output,
CreateDestDirs: config.BoolVal(templateConfig.CreateDestDirs),
Dry: r.dry,
DryStream: r.outStream,
Path: config.StringVal(templateConfig.Destination),
Perms: config.FileModeVal(templateConfig.Perms),
})
if result.DidRender {
//省略模板渲染后执行后续命令的代码
}
}
return event, nil
}
//renderer/renderer.go
func Render(i *RenderInput) (*RenderResult, error) {
existing, err := ioutil.ReadFile(i.Path)
if err != nil && !os.IsNotExist(err) {
return nil, errors.Wrap(err, "failed reading file")
}
//读取上次渲染后的结果,比较和本次结果是否一致,一致的话就不再重复写入
if bytes.Equal(existing, i.Contents) {
return &RenderResult{
DidRender: false,
WouldRender: true,
Contents: existing,
}, nil
}
if i.Dry {
//开启dry模式的话,不写入文件,只打印结果
fmt.Fprintf(i.DryStream, "> %s\n%s", i.Path, i.Contents)
} else {
//否则,确保原子写入文件
if err := AtomicWrite(i.Path, i.CreateDestDirs, i.Contents, i.Perms, i.Backup); err != nil {
return nil, errors.Wrap(err, "failed writing file")
}
}
return &RenderResult{
DidRender: true,
WouldRender: true,
Contents: i.Contents,
}, nil
}
//template/template.go
type ExecuteInput struct {
// Brain 存储模板渲染所需要的数据
Brain *Brain
Env []string
}
type Brain struct {
sync.RWMutex
//判断是否收到数据更新
receivedData map[string]struct{}
//存储具体的数据
data map[string]interface{}
}
//解析渲染模板:一、需要模板 二需要数据
func (t *Template) Execute(i *ExecuteInput) (*ExecuteResult, error) {
if i == nil {
i = &ExecuteInput{}
}
var used, missing dep.Set
//consul-template使用的go自带的模板渲染引擎
tmpl := template.New("")
//自定义分隔符
tmpl.Delims(t.leftDelim, t.rightDelim)
//自定义函数,包括需要渲染的数据都在自定义函数里面
tmpl.Funcs(funcMap(&funcMapInput{
t: tmpl,
brain: i.Brain,
env: i.Env,
used: &used,
missing: &missing,
}))
//解析模板
tmpl, err := tmpl.Parse(t.contents)
//开始渲染,返回结果放在b中,这里需要传的数据为nil,是因为数据都在自定义函数里面
var b bytes.Buffer
if err := tmpl.Execute(&b, nil); err != nil {
return nil, errors.Wrap(err, "execute")
}
return &ExecuteResult{
Used: &used,
Missing: &missing,
Output: b.Bytes(),
}, nil
}
func funcMap(i *funcMapInput) template.FuncMap {
return template.FuncMap{
"datacenters": datacentersFunc(i.brain, i.used, i.missing),
"file": fileFunc(i.brain, i.used, i.missing),
//所有的值都放在这里面,因此在模板中定义时需要添加上 {{ key xxx}}
//keyFunc这个函数主要就是从brain中取出数据
"key": keyFunc(i.brain, i.used, i.missing),
//省略其它自定义函数
}
}
监控consul-kv
这里监控consul的数据,采用的consul的阻塞get方法,默认会等待1分钟。如果有数据更新,立即返回;否则会超时返回。
//watch/watcher.go
func (w *Watcher) Add(d dep.Dependency) (bool, error) {
//创建监听视图
v, err := NewView(&NewViewInput{
Dependency: d,
Clients: w.clients,
MaxStale: w.maxStale,
Once: w.once,
RetryFunc: retryFunc,
VaultGrace: w.vaultGrace,
})
if err != nil {
return false, errors.Wrap(err, "watcher")
}
w.depViewMap[d.String()] = v
//开启单独的poll
go v.poll(w.dataCh, w.errCh)
return true, nil
}
// watch/view.go
func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
var retries int
for {
doneCh := make(chan struct{}, 1)
successCh := make(chan struct{}, 1)
fetchErrCh := make(chan error, 1)
//开启单独的goroutine从consul中获取数据
go v.fetch(doneCh, successCh, fetchErrCh)
WAIT:
select {
//省略接收到数据后的一些充值操作
}
}
}
//watch/view.go
func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
for {
case <-v.stopCh:
return
default:
}
data, rm, err := v.dependency.Fetch(v.clients, &dep.QueryOptions{
AllowStale: allowStale, //此变量决定是否从consul的flower节点拉取数据
WaitTime: defaultWaitTime,//defaultWaitTime = 60 * time.Second 默认超时时间为1分钟,不可以修改
WaitIndex: v.lastIndex,//consul通过此字段来判断客户端和consul的数据是否同步
VaultGrace: v.vaultGrace,
})
select {
//通知收到了数据
case successCh <- struct{}{}:
default:
}
//省略内容
return
}
}
//dependency/kv_get.go
func (d *KVGetQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
//这里会阻塞一分钟,在这一分钟内,如果数据有更新,会立即返回,否则会超时返回
pair, qm, err := clients.Consul().KV().Get(d.key, opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
rm := &ResponseMetadata{
LastIndex: qm.LastIndex,
LastContact: qm.LastContact,
Block: d.block,
}
value := string(pair.Value)
return value, rm, nil
}