/
connection.rb
263 lines (224 loc) · 9.48 KB
/
connection.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
require File.dirname(__FILE__) + '/asymy'
module Asymy
# I'm thinking, one per connection
class Connection
def initialize(opts={})
@target = opts[:target]
@port = opts[:port]
@password = opts[:password].extend(StringX)
@database = opts[:database].extend(StringX)
@username = opts[:username].extend(StringX)
@queue = []
@state = :preauth
reco
end
# If I exec'd a statement right now, would it run right now? You probably
# don't care. Just call Connection#exec and hope.
def ready?
self.state == :ready
end
# Is there an error on the connection? Since I basically don't handle errors
# at all right now, and barely even catch them, your best bet is to give up.
def error?
self.error || false
end
# Queue up an SQL command, and, if the channel is open, send it. Takes a
# block argument that receives the results, in two arguments, fields (an array of hashes)
# and rows (an array of strings)
def exec(cmd, &block)
@queue << [cmd.extend(StringX), block, Commands::QUERY]
@fp.inject
end
def prepare(cmd, &block)
@queue << [cmd.extend(StringX), block, Commands::STMT_PREPARE]
@fp.inject
end
def execute_prepared(args, &block)
@queue << [args, block, Commands::STMT_EXECUTE]
@fp.inject
end
# no user-servicable parts below (attrs used to communicate with module)
attr_reader :password
attr_reader :database
attr_reader :username
attr_accessor :state
attr_accessor :error
attr_accessor :queue
private
# EM administrivia: the actual I/O for the connection is handled in a module mixed
# in to its own connection object. XXX factor this code out into Asymy#Connection, leave
# only stubs.
module Session
attr_accessor :bp
# EM's idiosyncratic initializer
def post_init; @framer = Framer.new; end
# receive 1-48739 bytes of data, which may contain one, two, zero, or 5.7
# MySQL packets.
def receive_data(buf)
@framer << buf
while @framer.complete?
num, packet = @framer.next_buffer
receive_packet(num, packet)
end
end
# receive a whole MySQL packet and run the state machine
def receive_packet(num, packet)
# special case errors until I waste the time to scan them to see if they're
# 4.0 or 4.1 packets. XXX
if packet[0].ord == 0xFF
self.error = packet[3..-1]
self.state = :error
end
def packet.eof?; self[0].ord == 0xfe; end
def packet.ok?; self[0].ord == 0x00; end
case self.state
when :preauth
handle_preauth(num, Packets::Greeting.new(packet))
when :auth_sent
handle_postauth(num, Packets::OK.new(packet))
# queries on a MySQL connection are synchronous. The response
# packets are:
# - ResultSet packet (which basically just says "OK")
# - Field packets (describing columns)
# - EOF (no more fields)
# - RowData packets (describing a row)
# - EOF (no more rows)
when :ready
inject
when :awaiting_result_set
# XXX just ignore for now
self.state = :awaiting_fields
when :awaiting_fields
if packet.eof?
self.state = :awaiting_rows
else
handle_field(num, Packets::Field.new(packet))
end
when :awaiting_rows
if packet.eof?
@cb.call(@fields, @rows)
@fields = nil
@rows = nil
ready!
else
# rows have a variable number of variable-length strings, and no other
# structure, so just hand the raw data off.
handle_row(num, packet)
end
when :awaiting_statement_handle
if packet.ok?
handle_statement_handle(num, Packets::PrepareOk.new(packet))
else
# XXX handle this case
@state = :error
end
when :awaiting_prepared_params
if packet.eof?
@state = :waiting_prepared_fields
else
# I cannot for the life of me figure out what I'm supposed
# to do with these --- using mysql-ruby, I can't get them
# to change based on their type. Why does MySQL send them?
# I figured it'd be to let me enforce types on the params.
end
when :awaiting_prepared_fields
if packet.eof?
@cb.call(@stmt)
@cb, @stmt, @expect_params, @expect_columns = nil, nil, nil, nil
ready!
else
# I guess I could cache these? But why bother? MySQL is just
# going to send them again. This protocol confuses and infuriates us!
end
when :error
pp self.error
else
raise "don't know how to handle"
end
end
def ready!; self.state = :ready; inject; end
def inject
return if not ready?
# this is going to get untidy real fast XXX
if(now = self.queue.slice!(0))
@cb = now[1]
case now[2]
when Commands::QUERY
self.state = :awaiting_result_set
p = Packets::Command.new
p.command = Commands::QUERY
p.arg = now[0]
send_data(p.marshall)
when Commands::STMT_PREPARE
self.state = :awaiting_statement_handle
p = Packets::Prepare.new
p.query = now[0]
send_data(p.marshall)
when Commands::STMT_EXECUTE
else
raise "wtf?"
end
end
end
def handle_preauth(num, greeting)
response = self.password.crypt(greeting.challenge_head + greeting.challenge_tail) unless self.password.empty?
response ||= "".extend(StringX)
a = Packets::Authenticate.new
a.client_flags = (Capabilities::LONG_PASSWORD |
Capabilities::LONG_FLAG |
Capabilities::CONNECT_WITH_DB |
Capabilities::LOCAL_FILES |
Capabilities::PROTOCOL_41 |
Capabilities::INTERACTIVE |
Capabilities::TRANSACTIONS |
Capabilities::SECURE_CONNECTION | # heh
Capabilities::MULTI_STATEMENTS |
Capabilities::MULTI_RESULTS)
a.charset_number = greeting.server_language
a.name = self.username
a.response = response
a.database = self.database
send_data(a.marshall(num+1))
self.state = :auth_sent
end
def handle_postauth(num, ok)
ready!
end
def handle_field(num, field)
@fields ||= []
fh = Hash.new
fh[:table] = field.table
fh[:name] = field.name
fh[:length] = field.length
fh[:type] = field.type
fh[:flags] = field.flags
fh[:decimals] = field.decimals
@fields << fh
end
def handle_row(num, row)
@rows ||= []
rv = []
while not row.empty?
rv << row.shift_lcstring
end
@rows << rv
end
def handle_statement_handle(num, ok)
@stmt = PreparedStatement.new(:backpointer => @bp,
:handle => ok.stmt_id)
@expect_params = @stmt.parameters
@expect_columns = @stmt.columns
if @expect_params > 0
@state = :awaiting_prepared_params
else
@state = :awaiting_prepared_columns
end
end
def method_missing(meth, *args); @bp.send(meth, *args); end
end
def reco
EventMachine::connect(@target, @port, Session) {|c| c.bp = self; @fp = c;}
end
public
end
end