I've been heads-down building gateway plugins lately and haven't had time to blog. The upside is that I've become quite a bit more familiar with the code.
Today, let's take a look at Kong's event notification mechanism.
Kong's event notification is built on the lua-resty-worker-events library, which is itself developed by Kong.
Kong's event notification involves two aspects: one is event notification within a single node, such as API updates and plugin updates; the other is data synchronization across the cluster.
Let's start with event notification within a single node.
Inside a node, events follow the publish/subscribe pattern: components first register the events they are interested in with an event center, and when those events occur, the event center invokes the corresponding callbacks. The lua-resty-worker-events library plays the role of that event center.
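Before digging into Kong's code, here is a minimal, self-contained sketch of how lua-resty-worker-events acts as that event center. The shared dict name and the "demo"/"config_changed" source and event are made-up examples; configure, register and post_local are the library's public API:

local worker_events = require "resty.worker.events"

-- nginx.conf is assumed to declare:  lua_shared_dict process_events 1m;
-- configure() is normally called once per worker, from init_worker_by_lua*
local ok, err = worker_events.configure {
  shm      = "process_events", -- shared dict used to pass events between workers
  interval = 1,                -- polling interval, in seconds
}
if not ok then
  ngx.log(ngx.ERR, "failed to configure worker events: ", err)
end

-- subscribe: run this callback whenever a "demo"/"config_changed" event fires
worker_events.register(function(data)
  ngx.log(ngx.NOTICE, "config changed: ", data)
end, "demo", "config_changed")

-- publish: deliver the event to subscribers in the current worker only
worker_events.post_local("demo", "config_changed", "some payload")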
The event registration code lives mainly in the kong/core/router.lua file:
worker_events.register(function(data)
  if not data.schema then
    log(ngx.ERR, "[events] missing schema in crud subscriber")
    return
  end

  if not data.entity then
    log(ngx.ERR, "[events] missing entity in crud subscriber")
    return
  end

  local cache_key = dao[data.schema.table]:entity_cache_key(data.entity)
  if cache_key then
    cache:invalidate(cache_key)
  end

  if data.old_entity then
    cache_key = dao[data.schema.table]:entity_cache_key(data.old_entity)
    if cache_key then
      cache:invalidate(cache_key)
    end
  end

  if not data.operation then
    log(ngx.ERR, "[events] missing operation in crud subscriber")
    return
  end

  local entity_channel           = data.schema.table
  local entity_operation_channel = fmt("%s:%s", data.schema.table,
                                       data.operation)

  -- crud:apis
  local _, err = worker_events.post_local("crud", entity_channel, data)
  if err then
    log(ngx.ERR, "[events] could not broadcast crud event: ", err)
    return
  end

  -- crud:apis:create
  _, err = worker_events.post_local("crud", entity_operation_channel, data)
  if err then
    log(ngx.ERR, "[events] could not broadcast crud event: ", err)
    return
  end
end, "dao:crud")
From the code above we can see that when a database write happens, the subscriber mainly invalidates the corresponding cache entries (once an entry is invalidated, the next request will reload the data from the database).
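To make that invalidate-then-reload flow concrete, here is a minimal sketch of the pattern on top of a plain ngx.shared dictionary. The kong_cache dict name and the load_api_from_db loader are illustrative assumptions; Kong wraps this logic in its own cache module rather than using a shared dict directly like this:

local cjson = require "cjson.safe"

-- nginx.conf is assumed to declare:  lua_shared_dict kong_cache 10m;
local shm = ngx.shared.kong_cache

-- lazy read: serve from the shm cache, fall back to the database on a miss
local function get_api(cache_key, load_api_from_db)
  local cached = shm:get(cache_key)
  if cached then
    return cjson.decode(cached)
  end

  local row = load_api_from_db(cache_key) -- hypothetical DB loader
  if row then
    shm:set(cache_key, cjson.encode(row))
  end
  return row
end

-- invalidate: the crud subscriber simply deletes the key, so the next
-- get_api() call repopulates it from the database
local function invalidate(cache_key)
  shm:delete(cache_key)
end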
The remaining event registrations also live in this file. Looking through them, most are likewise notifications about database changes, and the main thing they do is invalidate caches. A few of them additionally notify the other nodes in the cluster, for example cluster_events:broadcast("balancer:targets", key); we will analyze these cluster events shortly.
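For completeness, a consumer of those re-posted entity channels would look roughly like this. The channel names follow the "crud"/"apis" and "crud"/"apis:create" pattern posted above, while the handler bodies are only sketches, not Kong's actual handlers:

-- react to any change on the "apis" entity
worker_events.register(function(data)
  -- data is the same table the dao:crud subscriber re-posted
  log(ngx.DEBUG, "[events] api ", data.operation, "d: ", data.entity.name)
end, "crud", "apis")

-- react to creations only
worker_events.register(function(data)
  log(ngx.DEBUG, "[events] new api created: ", data.entity.name)
end, "crud", "apis:create")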
Next, let's look at the event producers.
From the subscription above, the producers should sit in the DAO layer. If we open the kong/dao/dao.lua file, statements like the following are easy to spot in the update, insert and delete methods, which confirms that guess.
local _, err = self.events.post_local("dao:crud", "delete", {
  schema    = self.schema,
  operation = "delete",
  entity    = row,
})
In short, events within a single node are propagated using the publish/subscribe pattern.
So how are updates propagated across the cluster?
Looking at the kong/cluster_events.lua file, we can clearly see a subscribe method and a broadcast method. Publish/subscribe again? Indeed it is:
function _M:broadcast(channel, data, nbf)
  if type(channel) ~= "string" then
    return nil, "channel must be a string"
  end

  if type(data) ~= "string" then
    return nil, "data must be a string"
  end

  if nbf and type(nbf) ~= "number" then
    return nil, "nbf must be a number"
  end

  -- insert event row
  --log(DEBUG, "broadcasting on channel: '", channel, "' data: ", data,
  --           " with nbf: ", nbf and nbf or "none")

  local ok, err = self.strategy:insert(self.cluster_name, self.node_id,
                                       channel, ngx_now(), data, nbf)
  if not ok then
    return nil, err
  end

  return true
end
As we can see, publishing an event mainly amounts to inserting a row into the database.
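As a usage illustration, a caller broadcasts roughly like this. cluster_events stands for an instance of this module; the "my:channel" channel and the payloads are hypothetical, and the third argument is the optional nbf ("not before") timestamp whose effect we will see in poll() below:

-- "cluster_events" is an instance of the module in kong/cluster_events.lua;
-- the channel mirrors the balancer example mentioned above
local ok, err = cluster_events:broadcast("balancer:targets", "some target key")
if not ok then
  ngx.log(ngx.ERR, "failed to broadcast: ", err)
end

-- with an nbf: ask other nodes not to run their callbacks
-- before 10 seconds from now
ok, err = cluster_events:broadcast("my:channel", "some payload", ngx.now() + 10)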
Next, the subscription side:
function _M:subscribe(channel, cb, start_polling)
  if type(channel) ~= "string" then
    return error("channel must be a string")
  end

  if type(cb) ~= "function" then
    return error("callback must be a function")
  end

  if not self.callbacks[channel] then
    self.callbacks[channel] = { cb }
    insert(self.channels, channel)
  else
    insert(self.callbacks[channel], cb)
  end

  if start_polling == nil then
    start_polling = true
  end

  if not self.polling and start_polling then
    -- start recurring polling timer
    local ok, err = timer_at(self.poll_interval, poll_handler, self)
    if not ok then
      return nil, "failed to start polling timer: " .. err
    end

    self.polling = true
  end

  return true
end
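Paired with the broadcast("balancer:targets", key) call mentioned earlier, a subscription would look roughly like this; the callback body is only a sketch:

-- "cluster_events" is the same instance as above
cluster_events:subscribe("balancer:targets", function(data)
  -- data is the string another node passed to broadcast()
  ngx.log(ngx.DEBUG, "balancer target changed on another node: ", data)
  -- rebuild or invalidate the local balancer state here
end)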
local function poll(self)
  -- get events since last poll
  local min_at, err = self.shm:get(CURRENT_AT_KEY)
  if err then
    return nil, "failed to retrieve 'at' in shm: " .. err
  end

  if not min_at then
    return nil, "no 'at' in shm"
  end

  -- apply grace period
  min_at = min_at - self.poll_offset - 0.001

  local max_at = ngx_now()

  log(DEBUG, "polling events from: ", min_at, " to: ", max_at)

  for rows, err, page in self.strategy:select_interval(self.channels,
                                                       self.cluster_name,
                                                       min_at, max_at) do
    if err then
      return nil, "failed to retrieve events from DB: " .. err
    end

    if page == 1 then
      local ok, err = self.shm:safe_set(CURRENT_AT_KEY, max_at)
      if not ok then
        return nil, "failed to set 'at' in shm: " .. err
      end
    end

    for i = 1, #rows do
      local row = rows[i]

      if row.node_id ~= self.node_id then
        local ran, err = self.events_shm:get(row.id)
        if err then
          return nil, "failed to probe if event ran: " .. err
        end

        if not ran then
          log(DEBUG, "new event (channel: '", row.channel, "') data: '",
                     row.data, "' nbf: '", row.nbf or "none", "'")

          local exptime = self.poll_interval + self.poll_offset

          -- mark as ran before running in case of long-running callbacks
          local ok, err = self.events_shm:set(row.id, true, exptime)
          if not ok then
            return nil, "failed to mark event as ran: " .. err
          end

          local cbs = self.callbacks[row.channel]
          if cbs then
            for j = 1, #cbs do
              if not row.nbf then
                -- unique callback run without delay
                local ok, err = pcall(cbs[j], row.data)
                if not ok and not ngx_debug then
                  log(ERR, "callback threw an error: ", err)
                end

              else
                -- unique callback run after some delay
                local delay = max(row.nbf - ngx_now(), 0)

                log(DEBUG, "delaying nbf event by ", delay, "s")

                local ok, err = timer_at(delay, nbf_cb_handler, cbs[j], row)
                if not ok then
                  log(ERR, "failed to schedule nbf event timer: ", err)
                end
              end
            end
          end
        end
      end
    end
  end

  return true
end
As we can see, the subscription side essentially keeps a recurring timer that polls the database at the configured interval; whenever it finds new event rows, it runs the corresponding callbacks. (The cluster_name field does not exist in the upstream Kong source; I added it for an internal requirement at work, and it does not affect the reading.)
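One detail worth spelling out is the grace period and the deduplication that goes with it. Assuming hypothetical settings of poll_interval = 5 and poll_offset = 1, the arithmetic in poll() works out as follows:

local poll_interval = 5   -- hypothetical: poll the database every 5 seconds
local poll_offset   = 1   -- hypothetical: grace period of 1 second

-- if the previous poll recorded max_at = 100 in the shm, the next poll
-- re-reads from 100 - 1 - 0.001 = 98.999 up to ngx.now(), so a row committed
-- just before 100 but not yet visible at the previous poll is not lost
local min_at = 100 - poll_offset - 0.001

-- a row falling inside that overlap may be selected twice, but it only runs
-- once: its id stays in events_shm for poll_interval + poll_offset = 6 seconds
local exptime = poll_interval + poll_offset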
In short, Kong's event update mechanism, both inside a node and across the cluster, is essentially the publish/subscribe model.