Loading... <div class="tip inlineBlock warning"> 本文基于 **[0.2.0 版本 apisix-go-plugin-runner](https://github.com/apache/apisix-go-plugin-runner/tree/release/0.2.0)** 编写,注意信息时效 </div> ## 插件注册 ```go // Plugin represents the Plugin type Plugin interface { // Name returns the plguin name Name() string // ParseConf is the method to parse given plugin configuration. When the // configuration can't be parsed, it will be skipped. ParseConf(in []byte) (conf interface{}, err error) // Filter is the method to handle request. // It is like the `http.ServeHTTP`, plus the ctx and the configuration created by // ParseConf. // // When the `w` is written, the execution of plugin chain will be stopped. // We don't use onion model like Gin/Caddy because we don't serve the whole request lifecycle // inside the runner. The plugin is only a filter running at one stage. Filter(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) } ``` Go plugin runner 插件的 interface 有以上三个抽象方法。在启动的 `main` 包里会导入我们自定义的插件,插件的 `init()` 会调用 `RegisterPlugin()` 注册插件。 ```go // main.go import ( _ "apisix-go-plugin-runner/cmd/go-runner/plugins" ) // say.go func init() { err := plugin.RegisterPlugin(&Say{}) if err != nil { log.Fatalf("failed to register plugin say: %s", err) } } // plugin.go // RegisterPlugin register a plugin. Plugin which has the same name can't be registered twice. // This method should be called before calling `runner.Run`. func RegisterPlugin(p Plugin) error { return plugin.RegisterPlugin(p.Name(), p.ParseConf, p.Filter) } ``` 注册时会调用 `Name()` 方法检验是否已经有同名插件注册。如果没有则把 `ParseConf()` 和 `Filter()` 方法放在内存里 `pluginRegistry = pluginRegistries{opts: map[string]*pluginOpts{}}` 。 ```go func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error { log.Infof("register plugin %s", name) ... opt := &pluginOpts{ ParseConf: pc, Filter: sv, } pluginRegistry.Lock() defer pluginRegistry.Unlock() if _, found := pluginRegistry.opts[name]; found { return ErrPluginRegistered{name} } pluginRegistry.opts[name] = opt return nil } ``` ## runner 启动 apisix 启动时会根据 `config.yaml` 配置文件中定义的 `ext-plugin` 启动 runner 并监听指定路径的 sock 文件。 ```yaml // apisix 配置文件 ext-plugin: cmd: ["/bin/apisix-go-runner", "run", "--mode", "prod", "--log_level", "warn"] sock_path: /tmp ``` ```go // runner 启动 // https://github.com/apache/apisix-go-plugin-runner/blob/release/0.2.0/internal/server/server.go#L171 func Run() { sockAddr := getSockAddr() // clean up sock file created by others if err := os.RemoveAll(sockAddr); err != nil { log.Fatalf("remove file %s: %s", sockAddr, err) } // clean up sock file created by me defer func() { if err := os.RemoveAll(sockAddr); err != nil { log.Errorf("remove file %s: %s", sockAddr, err) } }() l, err := net.Listen("unix", sockAddr) if err != nil { log.Fatalf("listen %s: %s", sockAddr, err) } defer l.Close() // the default socket permission is 0755, which prevents the 'nobody' worker process // from writing to it if the APISIX is run under root. err = os.Chmod(sockAddr, 0766) if err != nil { log.Fatalf("can't change mod for file %s: %s", sockAddr, err) } done := make(chan struct{}) quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) // handle conn go func() { for { conn, err := l.Accept() ... if err != nil { log.Errorf("accept: %s", err) continue } go handleConn(conn) } }() ... } ``` ## Filter/ParseConf  根据官方的流程示意图,可以得知命中路由规则后,apisix 会向 runner 的 sock 发起 RPC 请求(这里有坑,详见下一节)。 在 runner 启动的 `handleConn(conn)` 中,根据 rpc 头中不同的 type 来调用插件实现的 `Filter()` 或是 `ParseConf()` ```go func handleConn(c net.Conn) { .... header := make([]byte, util.HeaderLen) for { ty := header[0] length := binary.BigEndian.Uint32(header) buf := make([]byte, length) n, err = c.Read(buf) bd, err := dispatchRPC(ty, buf, c) ... n, err = c.Write(out) } } func dispatchRPC(ty byte, in []byte, conn net.Conn) (*flatbuffers.Builder, error) { var err error var bd *flatbuffers.Builder switch ty { case util.RPCPrepareConf: bd, err = plugin.PrepareConf(in) case util.RPCHTTPReqCall: bd, err = plugin.HTTPReqCall(in, conn) case util.RPCTest: // Just for test bd, err = dealRPCTest(in) default: err = UnknownType{ty} } return bd, err } // 调用 Filter func HTTPReqCall(buf []byte, conn net.Conn) (*flatbuffers.Builder, error) { ... err = filter(conf, resp, req) } func filter(conf RuleConf, w *inHTTP.Response, r pkgHTTP.Request) error { for _, c := range conf { plugin := findPlugin(c.Name) plugin.Filter(c.Value, w, r) } } // 调用 ParseConf func PrepareConf(buf []byte) (*flatbuffers.Builder, error) { ... // 插件的配置是缓存在 runner 的内存里的 token, err := cache.Set(req) } func (cc *ConfCache) Set(req *pc.Req) (uint32, error) { // 每次修改配置的缓存都有加锁,不用担心并发问题 cc.lock.Lock() defer cc.lock.Unlock() ... for i := 0; i < req.ConfLength(); i++ { name := string(te.Name()) plugin := findPlugin(name) conf, err := plugin.ParseConf(v) ... } } ``` ## 坑 在详读 apisix 代码发起 rpc 部分的代码后发现,apisix 内部会实现一套 LRU Cache,根据缓存变化情况发起 ParseConf 的 rpc 调用。 ```lua function (conf, ctx, sock, entry) local lrucache_id = core.lrucache.plugin_ctx_id(ctx, entry) local token, err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call, constants.RPC_PREPARE_CONF, conf, ctx, lrucache_id) ``` 且仅在有请求命中这个插件时(不是请求 apisix 的 plugins_config Admin API 修改配置),才会视情况发起 ParseConf 和触发 Filter 的 RPC 调用。 ```lua -- 这里 ext-plugin-pre-req 和 ext-plugin-post-resp(0.5.0以上版本才有)位置响应的插件同理 -- 仅有在请求命中配有该插件的路由时才会响应 -- -- apisix/plugins/ext-plugin-post-req.lua function _M.access(conf, ctx) return ext.communicate(conf, ctx, name) end ``` > 所以不要妄想在 ParseConf 里插奇怪的逻辑,使用 Admin API 触发该方法 ## Reference https://segmentfault.com/a/1190000040549834 最后修改:2023 年 02 月 27 日 10 : 44 AM © 允许规范转载 赞赏 如果觉得我的文章对你有用,请随意赞赏 赞赏作者 支付宝微信