go-zero服务注册和发现

go-zero 服务注册和发现
在没有服务注册和发现的时候, 没新上⼀个服务, 或者没部署⼀个新的节点, 都要改所有调⽤⽅的配置⽂件, 简直就是配置噩梦, 还容易配置错误分析⼀个go-zero的服务注册和发现,
, 看看rpc客户端怎么寻址到rpc服务端的
//logic调⽤的代码
regRsp, err := l.svcCtx.UserServiceRpc., in)
//rpc/
func NewUserService(cli zrpc.Client) UserService {
return &defaultUserService{
cli: cli,
}
}
// 注册
func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
//发起调⽤, 使⽤的是上⾯NewUserService⾥的zrpc.Client
client := userService.NewUserServiceClient(m.cli.Conn())
return client.Register(ctx, in)
}
//api/internal/ 中调⽤的NewUserService
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config:        c,
Model:          model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
}
}
//先看⼀下zrpc.MustNewClient 这个⽅法, 传⼊的配置⽂件中的etcd 的hosts和服务key, 跟进去看下这个⽅法
//github/tal-tech/go-zero/
//这个⽅法没啥, 继续往下⾯⾛
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
cli, err := NewClient(c, )
if err != nil {
log.Fatal(err)
}
return cli
}
//⽅法中主要⽅法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), )
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
var opts []ClientOption
if c.HasCredential() {
opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App:  c.App,
Token: c.Token,
})))
}
if c.Timeout > 0 {
opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
}
opts = append(opts, )
var client Client
var err error
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), )
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), )
}
if err != nil {
return nil, err
}
return &RpcClient{
client: client,
}, nil
}
//先看⼀下internal.BuildDiscovTarget, 这个⽅法⼊参是etcd的hosts和服务的key, 返回的是⼀个类似url的东西, 协议是DiscovScheme = "discov"
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
}
//BuildDiscovTarget返回的url类似 : discov://127.0.0.1:2379/user-service, 传⼊NewClient中
//这个函数有两个核⼼逻辑⼀个是grpc.WithBalancerName(p2c.Name)
// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, )
if err := cli.dial(target, ); err != nil {
return nil, err
}
return &cli, nil
}
//WithBalancerName 这⽅法看名字知道是负载均衡的作⽤, 通过balancerName获取的, 对应⼊参的p2c.Name
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = builder
})
}
//跟着p2c.Name进去, github/tal-tech/go-zero@v1.1.7/zrpc/internal/balancer/
//在这⾥注⼊的负载均衡, 核⼼的逻辑在Pick中, ⼤致是⼀个可选就选⼀个, 两个就选择连接数最⼩的, 两个以上就随机两个出来进⾏选择
//如何选择的逻辑在choose(c1, c2 *subConn)⽅法中, 基本上就两个选连接数⼩的那个,
func init() {
balancer.Register(newBuilder())
}
type p2cPickerBuilder struct{}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}
//....
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()
var chosen *subConn
switch s) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.s[0], nil)
case 2:
chosen = p.s[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
a := p.r.Intn(s))
b := p.r.Intn(s) - 1)
if b >= a {
b++
}
node1 = p.conns[a]
node2 = p.conns[b]
if node1.healthy() && node2.healthy() {
break
}
}
chosen = p.choose(node1, node2)
}
atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&quests, 1)
, p.buildDoneFunc(chosen), nil
}
/
/继续返回去看NewClient⽅法中的dial⽅法, 传⼊的是target, 也就是那个url, discov://127.0.0.1:2379/user-service
if err := cli.dial(target, ); err != nil {
return nil, err
}
//github/tal-tech/go-zero@v1.1.7/zrpc/
//这个⽅法本⾝没什么, 主要就是调⽤grpc.DialContext()⽅法, 这⾥就进⼊了grpc的逻辑了, 相当于通过grpc dial 了discov://127.0.0.1:2379/user-service, 继续进去看
func (c *client) dial(server string, opts ...ClientOption) error {
options := c.)
timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
conn, err := grpc.DialContext(timeCtx, server, )
if err != nil {
service := server
if errors.Is(err, context.DeadlineExceeded) {
pos := strings.LastIndexByte(server, separator)
// len(server) - 1 is the index of last char
if 0 < pos && pos < len(server)-1 {
service = server[pos+1:]
}
}
return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
server, err.Error(), service)
}
< = conn
return nil
}
///grpc@v1.29., 这个代码逻辑⽐较多, 我们到我们关⼼的部分(服务发现), 就是如何解析discov://127.0.0.1:2379/user-service 成为⼀个ip:port, 通过分析发现cc.parsedTarget.Scheme, 也就是⼀开始拼接的discov字符func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target:            target,
csMgr:            &connectivityStateManager{},
conns:            make(map[*addrConn]struct{}),
dopts:            defaultDialOptions(),
blockingpicker:    newPickerWrapper(),
czData:            new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
//.....
//发现是通过Scheme去获取的, 也就是discov,
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
//....
return cc, nil
}
///grpc@v1.29.1/
//发现get是⽤map中读的, map数据是Register⽅法注⼊的, 返回到DiscovScheme = "discov"定义的地⽅, 看看有没有调⽤Register⽅法
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme = "passthrough"
)
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
/
/github/tal-tech/go-zero@v1.1.7/zrpc/internal/
//注⼊的是discovBuilder, 继续看下discovBuilder的具体实现
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
//github/tal-tech/go-zero@v1.1.7/zrpc/internal/
//具体逻辑看函数中的知识
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
//new⼀个服务发现的客户端, ⾥⾯基本上就是个etcd的封装, etcd的逻辑在NewSubscriber⾥⾯, ⽐较简单, 就写出来了
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
//拿到服务key的所有etcd中的数据
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
//实时监听etcd数据变化, 然后通过update⽅法更新数据到grpc的client
sub.AddListener(update)
//初始化的时候调⽤⼀次
update()
return &nopResolver{cc: cc}, nil
}
func (d *discovBuilder) Scheme() string {
return DiscovScheme
}
//到这⾥客户端的服务发现就结束了
//服务注册的代码在中, MustNewServer调⽤NewServer
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
server, err := NewServer(c, register)
if err != nil {
log.Fatal(err)
}
return server
}
//NewServer调⽤的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics)) //这个⽅法进去就是调⽤discov.NewPublishe
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
//继续跟进去, 发现在KeepAlive()中会调⽤register⽅法, ⽤etcd的put⽅法注册到etcd中(client.Put)
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {
return clientv3.NoLease, err
}
lease := resp.ID
if p.id > 0 {
p.fullKey = makeEtcdKey(p.key, p.id)
} else {
p.fullKey = makeEtcdKey(p.key, int64(lease))
}
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
return lease, err
}
总结
1. go-zero的注册发现代码⽐较容易懂, ⽐较简单, 可以作为初步阅读源码的练⼿项⽬
2. 业务上基本上是够⽤的

本文发布于:2024-09-25 23:10:07,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/824858.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:发现   服务   注册   继续   逻辑   客户端
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议