https://github.com/mysqljs/mysql是一个用Node.js写的mysql驱动,代码简洁,明白。
一个mysql驱动要完成的功能是,建立到mysql服务器的tcp连接,然后按照mysql的protocol把数据发送给服务器,服务器收到数据以后,解析出要执行的命令和sql,执行sql,把结果按照protocol写回到客户端。
mysqljs/mysql的对象主要有
- Connection 管理到服务器的连接
- Protocol 处理发送到服务器的数据和服务器返回的数据
- Sequence/Query 代表一个query
- ComQueryPacket和其他Packet 代表一个发送给服务器的消息
- PacketWriter 把Packet转换为buffer交给Protocol
- Parser 解析Protocol从服务器获得的buffer
建立连接
建立连接的代码
1 | var mysql = require('mysql'); |
lib/Connection.js
1 | function Connection(options) { |
Connection对象的_socket是一个到mysql的tcp链接,代码在
lib/Connection.js
1 | Connection.prototype.connect = function connect(options, callback) { |
上面的this._protocol是一个Protocol对象,Protocol对象是一个stream
lib/protocol/Protocol.js
1 | Util.inherits(Protocol, Stream); |
执行query
connection.query(‘select * from users’)实际创建了一个query,然后调用Protocol._enqueue
1 | Connection.prototype.query = function query(sql, values, cb) { |
Connection.createQuery返回了一个Query对象,Query对象继承自Sequence对象
lib/protocol/sequences/Query.js
1 | Util.inherits(Query, Sequence); |
Sequence是一个EventEmmiter
lib/protocol/sequences/Sequence.js
1 | Util.inherits(Sequence, EventEmitter); |
Connection.createQuery创建的query被添加到Protocol对象的一个内部queue中,然后调用query.start
lib/protocol/Protocol.js
1 | Protocol.prototype._enqueue = function(sequence) { |
Protocol有一个enqueue事件,在有新的query被enqueue时会触发
lib/protocol/Protocol.js
1 | Protocol.prototype._enqueue = function(sequence) { |
在建立的连接的时候,Connection对象会给这个事件添加监听函数
lib/Connection.js
1 | Connection.prototype.connect = function connect(options, callback) { |
this._handleProtocolEnqueue函数仅仅是把这个事件在Connection对象上再emit一次,这样就能允许Connection对象的调用者监听Connection的enqueue事件
1 | Connection.prototype._handleProtocolEnqueue = function _handleProtocolEnqueue(sequence) { |
在Protocol._enqueue方法中,监听了Query对象(Sequence对象)的很多事件
lib/protocol/Protocol.js
1 | Protocol.prototype._enqueue = function(sequence) { |
Protocol在_enqueue方法中调用了Protocol.prototype._startSequence,Protocol.prototype._startSequence的定义
lib/protocol/Protocol.js
1 | Protocol.prototype._startSequence = function(sequence) { |
Sequeuce.start一般由子类实现,所以可以看Query.start
query按照protocol转换为buffer
lib/protocol/sequences/Query.js
1 | Query.prototype.start = function() { |
这个方法创建了一个ComQueryPacket,然后通过packet事件通知其他组件。Query是一个EventEmitter,所以可以通过事件和Query对象通信。
lib/protocol/packets/ComQueryPacket.js
1 | module.exports = ComQueryPacket; |
ComQueryPacket类仅仅有两个成员,command和sql
根据mysql文档
A COM_QUERY is used to send the server a text-based query that is executed immediately.
Query.prototype.start方法emit的packet事件,会触发在Protocol.prototype._enqueue中设置的handler
1 | Protocol.prototype._enqueue = function(sequence) { |
我们看到这里handler调用了self._emitPacket(packet);
1 | Protocol.prototype._emitPacket = function(packet) { |
_emitPacket调用了packet.write
1 | ComQueryPacket.prototype.write = function(writer) { |
packet.write调用了PacketWriter的writeUnsignedNumber和writeString方法
PacketWriter定义
lib/protocol/PacketWriter.js
1 | module.exports = PacketWriter; |
writeUnsignedNumber方法把一个number写到内部的一个buffer,this._buffer
1 | PacketWriter.prototype.writeUnsignedNumber = function(bytes, value) { |
writeString方法把一个string写到this._buffer
接下来Protocol emit了data事件
1 | this.emit('data', packetWriter.toBuffer(this._parser)); |
这里调用了packetWriter.toBuffer
1 | PacketWriter.prototype.toBuffer = function toBuffer(parser) { |
这个方法调整this._buffer的内容,具体步骤是,如果this._buffer的长度超过了MAX_PACKET_LENGTH,那么把this._buffer分成几个packet,每个packet的最大长度是MAX_PACKET_LENGTH。同时在每个packet前添加4个字节的内容,前三个字节代表packet的长度,第四个字节代表packet的序号。然后返回调整后的this._buffer
Protocol emit的data事件会触发在Connection中设置的handler
1 | this._protocol.on('data', function(data) { |
这里把packet写到connection._socket中,服务器就会收到packet。
当mysql服务器收到客户端的数据的时候,先读取4个字节,获得packet的长度和序号,然后读取第一个packet,接下来又读取4个字节,获得下一个packet的长度和序号,依次类推,最终读取完客户端发送的数据。
MAX_PACKET_LENGTH的定义是
1 | var MAX_PACKET_LENGTH = Math.pow(2, 24) - 1; // 大概16M |
明显一般的packet都不会超过16M,所以服务器收到的大部分packet都类似于
1 | 三个字节长度 - 1个字节序号 - packet内容 |
服务器根据长度读取到packet的内容,然后从内容中解析出,要执行的command的类型,比如0x03,和对应的sql。
服务器返回数据
服务器返回数据的时候,会往Connection._socket中写数据,因此会触发Connection的data事件
在Connection的connect方法中,设置了Connection.socket的data事件的handler
1 | this._socket.on('data', wrapToDomain(connection, function (data) { |
服务器返回的数据交给Parser,Parser解析出服务器返回的查询结果
1 | Protocol.prototype.write = function(buffer) { |