剑客
关注科技互联网

ZooKeeper的协议与客户端逻辑分析

ZooKeeper的协议与客户端逻辑分析

(图文无关)

最近在做让VLCP的中心数据存储支持ZooKeeper的工作。ZooKeeper是广泛使用的分布一致性服务,但是它的文档真的很少,尤其是缺少介绍wire protocol的文档。分析了官方的Java和C的binding之后,现在整理一下相关的内容。

ZooKeeper的wire protocol基于一个叫做jute的库。这个库,从查到的资料来看,最早是Hadoop用的,但后来被抛弃了,结果现在只有ZooKeeper还在用这个缺少维护的库了。库的功能不复杂,源代码来看一目了然,在ZooKeeper当中最重要的功能就是将结构体转换成二进制流,方法是生成相应结构体对应的转换代码(Java和C的), 转换的规则基本是按照Java的序列化的方式:大端格式,int 4字节,long 8字节,bool 1字节,没有额外字段。两种特殊的数据类型,ustring是一个4字节int(大端格式)表示字符串长度,跟着相应数量的字节,没有结尾的0字符;vector是一个4自己int(大端格式)表示数组大小,跟着相应数量的内部类型。

ZooKeeper的jute结构体定义可以参看源代码中src目录下的zookeeper.jute。

然而做到这种程度,只能说是定义了结构体,并没有如我们以前的文章 如何设计与实现一个自定义的二进制协议 中说的那样,能够正确让消息进行分片和解析。在ZooKeeper中,jute的结构体还要配合许多外围代码进行分步骤的解析,代码实现的真的不是很出色,也怪不得社区中关于这个问题总是怨声连天。对于不同版本之间的兼容性实现的也非常简单粗暴,比如说后续版本为ConnectRequest和ConnectResponse增加了一个字段,实现上就是通过hack进行的(没有把这个字段写到jute文件里面)

为了能够正确分片ZooKeeper的消息,ZooKeeper用了一个非常简单的方法:每个消息体的开头是一个4字节的长度,代表这个消息体剩余部分的字节数。这样从第一个消息体开始,所有的消息体都可以正确拆分出长度。解析出长度之后,整个消息体只需要从消息头开始从前向后解析就可以了。

整个连接客户端的第一个包必须是ConnectRequest,相应的服务器端回复的是ConnectResponse。jute中的定义如下:

    class ConnectRequest {
        int protocolVersion;
        long lastZxidSeen;
        int timeOut;
        long sessionId;
        buffer passwd;
    }
    class ConnectResponse {
        int protocolVersion;
        int timeOut;
        long sessionId;
        buffer passwd;
    }

用namedstruct(参见 namedstruct: 二进制结构体的正则表达式 – 网络与SDN – 知乎专栏 )改写:

ConnectRequest = nstruct(
        (int32, 'protocolVersion'),
        (int64, 'lastZxidSeen'),
        (int32, 'timeOut'),
        (int64, 'sessionId'),
        (z_buffer, 'passwd'),
        (boolean, 'readOnly'),
        name = 'ConnectRequest',
        base = _TypedZooKeeperRequest,
        criteria = lambda x: x.zookeeper_type == CONNECT_PACKET,
        init = packvalue(CONNECT_PACKET, 'zookeeper_type')
    )

_ConnectResponseOptional = nstruct(
        name = '_ConnectResponseOptional',
        padding = 1,
        inline = False
    )

_ConnectResponseReadOnly = nstruct(
        (boolean, 'readOnly'),
        name = '_ConnectResponseReadOnly',
        base = _ConnectResponseOptional,
        criteria = lambda x: x._realsize() > 0
    )

ConnectResponse = nstruct(
        (int32, 'protocolVersion'),
        (int32, 'timeOut'),
        (int64, 'sessionId'),
        (z_buffer, 'passwd'),
        (_ConnectResponseOptional,),
        name = 'ConnectResponse',
        base = _TypedZooKeeperReply,
        criteria = lambda x: x.zookeeper_type == CONNECT_PACKET,
        init = packvalue(CONNECT_PACKET, 'zookeeper_type'),
        lastextra = True
    )

注意到我们改写后多了一个readOnly的字段,这是协议中真实存在的字段,为了兼容以前的版本,没有写进jute,而是在Java代码当中进行了hack,加入了这个字段的支持。根据客户端是否支持该字段、服务器是否支持该字段,就会分成四种情况。

这两个包实现客户端与服务端的协商。ZooKeeper建立连接分两种情况:新建一个session,和失去网络连接后恢复到之前的session(包括重新连接到另一台服务器并恢复到原来的session)。这两种情况的区分通过sessionId进行,如果传递sessionId = 0,则建立新的session,服务器返回新的sessionId和相应的passwd,passwd是一个用于重新恢复session的密码,永远是16个字节;恢复session时,则传递原来的sessionId和passwd。

吐槽: ConnectRequest的protocolVersion字段,本来应当是设计为用来协商协议版本,以实现不同版本协议的请求有不同格式的。但它并没有用上,在现在的client和server的代码中都没有检查这个字段是否是0以外的值,这意味着无法通过这个字段实现ConnectRequest本身的格式变动——否则,新版本的client与旧版本的server进行协商的时候,旧版本server会直接忽略这个版本号,然后按旧版本的格式解析ConnectRequest。这就是为什么增加的readOnly字段需要通过hack的方式来得到支持。

ConnectRequest在新建session的时候,一定要将passwd设置为16个字节的字符串,否则无法正常连接——这简直毫无道理……原因在于server代码实现中,直接将client传递过来的passwd字节数组用随机数进行填充,而没有创建新的16字节数组。review代码的人应该拖出去续了。

passwd的算法是将sessionId与一个程序中固定的64位数异或,作为随机数种子生成16位随机字节。不是很安全。

ConnectRequest和ConnectResponse是唯一没有请求头部和响应头部的消息,这导致相应的代码增加了许多额外的逻辑,简直不明白为什么要自己给自己添堵……

除协商消息以外的其他消息,请求均以RequestHeader开头,响应均以ReplyHeader开头:

    class RequestHeader {
        int xid;
        int type;
    }
    class ReplyHeader {
        int xid;
        long zxid;
        int err;
    }
RequestHeader = nstruct(
        (zk_xid, 'xid'),
        (zk_request_type, 'type'),
        name = 'RequestHeader',
        base = _TypedZooKeeperRequest,
        criteria = lambda x: x.zookeeper_type != CONNECT_PACKET,
        init = packvalue(HEADER_PACKET, 'zookeeper_type'),
        classifier = lambda x: x.type
    )

ReplyHeader = nstruct(
        (int32, 'xid'),
        (int64, 'zxid'),
        (zk_err, 'err'),
        name = 'ReplyHeader',
        base = _TypedZooKeeperReply,
        criteria = lambda x: x.zookeeper_type != CONNECT_PACKET,
        init = packvalue(HEADER_PACKET, 'zookeeper_type')
    )

xid用于将请求和应答相对应,type用来表示请求的类型,响应中,zxid代表zookeeper当前状态的一个序号,这个序号单调递增且在不同服务器之间一致,可以当作是时间的一种代替;err代表错误类型,0的情况下没有出错。正的xid代表普通的请求,负的xid代表特殊请求,其中-1表示Watch事件(从服务器推送给客户端),-2表示ping请求,-4表示Auth请求,-8表示SetWatches请求。

吐槽: 以type和xid两者分工来表示消息体的类型,意义完全不明……reply当中没有type字段,这意味着为了正确解析消息,必须提前记住每个xid发送的是什么类型的请求。特殊类型的消息xid是特殊值,这意味着无法将Reply正确对应到相应的请求(比如发出多个AuthPacket的时候,其中一个失败了,会比较难确定失败的是哪一个)

ZooKeeper的普通请求(以正的xid表示)永远按请求的顺序返回。负的请求可以超前返回。ZooKeeper server在client不活动或者失去连接、而协商的sessionTimeout时间到达时会使会话超时,因此client必须不断发送ping请求来保持连接活动。

大部分请求都是一个请求格式与一个回应格式相对应。一部分请求的回应格式是空的(只有ReplyHeader)。当请求参数错误时,有时也会返回空的回应,需要配合消息长度以及err的值来判断。

在连接协商完成时,client可能需要发送AuthPacket来进行身份认证,认证会影响到节点的访问权限(ACL)。在重建session时,需要发送SetWatches请求来重新建立之前的Watch,防止Watch消息丢失。官方的client随着ConnectRequest一起发送这些包来降低连接建立的延迟。如果AuthPacket认证失败,整个session都会进入AUTH_FAIL状态,不能继续使用。当Watch很多的时候,SetWatches请求包可能超过了单个请求的最大限制(4MB),需要拆分成多个SetWatches请求。

CreateRequest请求用于创建节点,需要注意的是创建节点的时候一定要指定ACL,这通常是在官方的binding当中自动完成的:如果没有特殊需要可以指定为

perms = ZOO_PERM_ALL(0x1f),Id = (scheme = ‘world’, id = ‘anyone’)

其他请求都比较直白,不再详细描述。

新版本当中一个比较重要的功能是支持批量事务型操作,通过MultiRequest进行。MultiRequest请求的结构是首先有消息体长度和RequestHeader(和其他请求一样),然后对于事务型操作中的每个请求,首先是一个公共的MultiHeader:

MultiHeader = nstruct(
        (zk_request_type, 'type'),
        (boolean, 'done'),
        (zk_err, 'err'),
        name = 'MultiHeader',
        padding = 1
    )

然后按照相应类型附加相应的请求体。只支持create、delete、setdata和check四种操作,前三种和相应的请求相同,第四个表示检查相应的节点版本是否一致(即是否在上次获取的版本之后进行了修改),如果不一致则让整个multi操作失败,实现一个乐观锁的逻辑。

在所有的请求之后必须附加一个额外的MultiHeader,它的done字段为True,type = -1, err = -1 。

应答有相似的格式,首先是MultiHeader,然后分别跟着相应请求的应答体,最后是额外的MultiHeader。

吐槽: MultiHeader中没有长度字段,这对于将来的扩展非常致命,如果client和server支持的Mutli操作的类型不一致,其中一方将无法正确解析对方的请求或应答,因为对于未知类型的Multi操作,无法判断后续结构体的长度;这样如果高版本的client使用了新支持的类型去请求了旧的server,要么server崩溃,要么server无法返回相应数量的Multi结果,从而让client崩溃。

额外的MultiHeader完完全全不是必要的——从消息体总长度中就可以判断出Multi操作的数量,只要解析到没有剩余字节就行了。

只支持写入而不支持读取究竟是要做什么……同时读出多个字段,保证这些字段的值是一致的,这个功能很重要。

WatcherEvent是唯一的不需要客户端请求、而是由服务器端推送的消息。对于GetData、Exists、GetChildren、GetChildren2请求,可以要求服务器在相应的key的值发生相应变化(即:改变或删除;创建;子节点创建或删除)时推送提示消息,告知client已经发生了相应的变化。WatcherEvent包括了路径、客户端状态和改变类型三个字段:

_WatcherEvent = nstruct(
        (zk_watch_event, 'type'),  # event type
        (zk_client_state, 'state'), # state of the Keeper client runtime
        (ustring, 'path'),
        name = '_WatcherEvent',
        padding = 1
    )

吐槽: state只有ZOO_SYNC_CONNECTED_STATE一种可能返回的状态。它是不可能返回ZOO_DISCONNECTED_STATE的——断开连接的服务器当然不能推送消息。它也不可能返回ZOO_CONNECTED_READONLY_STATE——因为Readonly状态的服务器是不可能有状态改变的。AUTH_FAIL和EXPIRED自然也不可能。

ZooKeeper的wire protocol差不多就介绍到这里,VLCP中实现了一个新的ZooKeeper的Python binding,可以用协程来支持异步编程,近期应该可以release。

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址