fluent-plugin-mysql, a plugin for Fluentd
The mysql_bulk output plugin performs high-performance bulk inserts and supports ON DUPLICATE KEY UPDATE.
fluent-plugin-mysql-bulk has been merged into this repository.
The mysql plugin is deprecated; use mysql_bulk instead.
param | value |
---|---|
host | Database host (default: 127.0.0.1) |
port | Database port (default: 3306) |
database | Database name (required) |
username | User name (required) |
password | Password (default: blank) |
column_names | Columns for the bulk insert, comma-separated (required) |
key_names | Record keys that supply the column values; ${time} is a placeholder for Time.at(time).strftime("%Y-%m-%d %H:%M:%S") (default: column_names) |
json_key_names | Keys whose values are stored as JSON, comma-separated |
table | Table for the bulk insert (required) |
on_duplicate_key_update | Enable ON DUPLICATE KEY UPDATE (true or false) |
on_duplicate_key_operations | An array of 'column,operator' strings, where operator is the desired update operator |
aggregate_data | Enable data aggregation (true or false) |
aggregate_key_list | List of columns to include in the aggregation key |
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
flush_interval 10s
</match>
Assume the following input arrives:
mysql.input: {"user_name":"toyama","created_at":"2014/01/03 21:35:15","updated_at":"2014/01/03 21:35:15","dummy":"hogehoge"}
mysql.input: {"user_name":"toyama2","created_at":"2014/01/03 21:35:21","updated_at":"2014/01/03 21:35:21","dummy":"hogehoge"}
mysql.input: {"user_name":"toyama3","created_at":"2014/01/03 21:35:27","updated_at":"2014/01/03 21:35:27","dummy":"hogehoge"}
Then the resulting table looks like this:
+-----+-----------+---------------------+---------------------+
| id | user_name | created_at | updated_at |
+-----+-----------+---------------------+---------------------+
| 1 | toyama | 2014-01-03 21:35:15 | 2014-01-03 21:35:15 |
| 2 | toyama2 | 2014-01-03 21:35:21 | 2014-01-03 21:35:21 |
| 3 | toyama3 | 2014-01-03 21:35:27 | 2014-01-03 21:35:27 |
+-----+-----------+---------------------+---------------------+
The plugin issues a single bulk INSERT of this form:
INSERT INTO users (id,user_name,created_at,updated_at) VALUES (NULL,'toyama','2014/01/03 21:35:15','2014/01/03 21:35:15'),(NULL,'toyama2','2014/01/03 21:35:21','2014/01/03 21:35:21')
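To make the shape of that statement concrete, here is a minimal Ruby sketch of how a multi-row INSERT could be assembled from buffered records. It is not the plugin's actual code: `sql_literal` and `build_bulk_insert` are hypothetical helper names, and the quoting is deliberately naive (real code should rely on the database driver's escaping).

```ruby
# Hypothetical helpers for illustration; not taken from the plugin source.

# Naive SQL literal quoting. Missing keys become NULL.
def sql_literal(value)
  return "NULL" if value.nil?
  "'" + value.to_s.gsub("\\") { "\\\\" }.gsub("'") { "\\'" } + "'"
end

# Build one multi-row INSERT from a list of record hashes.
# Keys that are not listed in column_names (such as "dummy") are ignored.
def build_bulk_insert(table, column_names, records)
  rows = records.map do |record|
    values = column_names.map { |column| sql_literal(record[column]) }
    "(#{values.join(',')})"
  end
  "INSERT INTO #{table} (#{column_names.join(',')}) VALUES #{rows.join(',')}"
end

records = [
  { "user_name" => "toyama",  "created_at" => "2014/01/03 21:35:15",
    "updated_at" => "2014/01/03 21:35:15", "dummy" => "hogehoge" },
  { "user_name" => "toyama2", "created_at" => "2014/01/03 21:35:21",
    "updated_at" => "2014/01/03 21:35:21", "dummy" => "hogehoge" }
]

puts build_bulk_insert("users", %w[id user_name created_at updated_at], records)
```

Because the records carry no id key, the sketch emits NULL for that column, which lets an auto-increment column assign the value.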
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_key_operations ["user_name,=","updated_at,="]
flush_interval 60s
</match>
Assume the following input arrives:
mysql.input: {"id":"1" ,"user_name":"toyama7","created_at":"2014/01/03 21:58:03","updated_at":"2014/01/03 21:58:03"}
mysql.input: {"id":"2" ,"user_name":"toyama7","created_at":"2014/01/03 21:58:06","updated_at":"2014/01/03 21:58:06"}
mysql.input: {"id":"3" ,"user_name":"toyama7","created_at":"2014/01/03 21:58:08","updated_at":"2014/01/03 21:58:08"}
mysql.input: {"id":"10","user_name":"toyama7","created_at":"2014/01/03 21:58:18","updated_at":"2014/01/03 21:58:18"}
Then the resulting table looks like this:
+-----+-----------+---------------------+---------------------+
| id | user_name | created_at | updated_at |
+-----+-----------+---------------------+---------------------+
| 1 | toyama7 | 2014-01-03 21:35:15 | 2014-01-03 21:58:03 |
| 2 | toyama7 | 2014-01-03 21:35:21 | 2014-01-03 21:58:06 |
| 3 | toyama7 | 2014-01-03 21:35:27 | 2014-01-03 21:58:08 |
| 10 | toyama7 | 2014-01-03 21:58:18 | 2014-01-03 21:58:18 |
+-----+-----------+---------------------+---------------------+
If a row with the same id already exists, its user_name and updated_at are updated; created_at keeps its original value.
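The following Ruby sketch shows one way the 'column,operator' entries could translate into an ON DUPLICATE KEY UPDATE clause. The mapping is an assumption made for illustration, not the plugin's confirmed behavior: `=` is taken to mean "overwrite with the incoming value", and any other operator (such as `+`) is taken to combine the stored value with the incoming one. `build_on_duplicate_clause` is a hypothetical name.

```ruby
# Hypothetical helper; the operator-to-SQL mapping is an assumption.
def build_on_duplicate_clause(operations)
  assignments = operations.map do |operation|
    column, operator = operation.split(",", 2)
    if operator == "="
      # Overwrite the stored value with the incoming one.
      "#{column} = VALUES(#{column})"
    else
      # Combine the stored value with the incoming one, e.g. clicks + VALUES(clicks).
      "#{column} = #{column} #{operator} VALUES(#{column})"
    end
  end
  " ON DUPLICATE KEY UPDATE #{assignments.join(', ')}"
end

puts build_on_duplicate_clause(["user_name,=", "updated_at,="])
puts build_on_duplicate_clause(["clicks,+"])
```

Under these assumptions, the clause would be appended to the bulk INSERT statement, so inserting and updating happen in a single query.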
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
key_names id,user,created_date,updated_date
table users
flush_interval 10s
</match>
Assume the following input arrives:
mysql.input: {"user":"toyama","created_date":"2014/01/03 21:35:15","updated_date":"2014/01/03 21:35:15","dummy":"hogehoge"}
mysql.input: {"user":"toyama2","created_date":"2014/01/03 21:35:21","updated_date":"2014/01/03 21:35:21","dummy":"hogehoge"}
mysql.input: {"user":"toyama3","created_date":"2014/01/03 21:35:27","updated_date":"2014/01/03 21:35:27","dummy":"hogehoge"}
Then the resulting table looks like this:
+-----+-----------+---------------------+---------------------+
| id | user_name | created_at | updated_at |
+-----+-----------+---------------------+---------------------+
| 1 | toyama | 2014-01-03 21:35:15 | 2014-01-03 21:35:15 |
| 2 | toyama2 | 2014-01-03 21:35:21 | 2014-01-03 21:35:21 |
| 3 | toyama3 | 2014-01-03 21:35:27 | 2014-01-03 21:35:27 |
+-----+-----------+---------------------+---------------------+
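The key_names mapping above is positional: the n-th key name supplies the value for the n-th column. A minimal Ruby sketch of that lookup follows; `row_values` is a hypothetical helper, not a function from the plugin.

```ruby
# Hypothetical helper: pick one value per column by looking up the
# key at the same position in key_names. Unlisted keys are ignored
# and missing keys yield nil.
def row_values(key_names, record)
  key_names.map { |key| record[key] }
end

column_names = %w[id user_name created_at updated_at]
key_names    = %w[id user created_date updated_date]

record = {
  "user"         => "toyama",
  "created_date" => "2014/01/03 21:35:15",
  "updated_date" => "2014/01/03 21:35:15",
  "dummy"        => "hogehoge"
}

# Pair each column with the value taken from its mapped key.
column_names.zip(row_values(key_names, record)).each do |column, value|
  puts "#{column}: #{value.inspect}"
end
```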
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at
key_names id,user,${time}
table users
flush_interval 10s
</match>
Assume the following input arrives:
2014-01-03 21:35:15+09:00: mysql.input: {"user":"toyama","dummy":"hogehoge"}
2014-01-03 21:35:21+09:00: mysql.input: {"user":"toyama2","dummy":"hogehoge"}
2014-01-03 21:35:27+09:00: mysql.input: {"user":"toyama3","dummy":"hogehoge"}
Then the created_at column is set from the time attribute of the Fluentd event:
+-----+-----------+---------------------+
| id | user_name | created_at |
+-----+-----------+---------------------+
| 1 | toyama | 2014-01-03 21:35:15 |
| 2 | toyama2 | 2014-01-03 21:35:21 |
| 3 | toyama3 | 2014-01-03 21:35:27 |
+-----+-----------+---------------------+
As described above, the ${time} placeholder formats the event time with Time.at(time).strftime("%Y-%m-%d %H:%M:%S").
This uses the default timezone of the Fluentd server.
To use a specific timezone instead, use the include_time_key feature.
This is useful when the Fluentd server and MySQL use different timezones.
Various timezone formats are supported; see:
http://docs.fluentd.org/articles/formatter-plugin-overview
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
include_time_key yes
timezone +00
time_format %Y-%m-%d %H:%M:%S
time_key created_at
column_names id,user_name,created_at
key_names id,user,created_at
table users
flush_interval 10s
</match>
Assume the following input arrives (the Fluentd server uses the JST +09 timezone):
2014-01-03 21:35:15+09:00: mysql.input: {"user":"toyama","dummy":"hogehoge"}
2014-01-03 21:35:21+09:00: mysql.input: {"user":"toyama2","dummy":"hogehoge"}
2014-01-03 21:35:27+09:00: mysql.input: {"user":"toyama3","dummy":"hogehoge"}
Then the created_at column is set from the time attribute of the Fluentd event, converted to the +00 (UTC) timezone:
+-----+-----------+---------------------+
| id | user_name | created_at |
+-----+-----------+---------------------+
| 1 | toyama | 2014-01-03 12:35:15 |
| 2 | toyama2 | 2014-01-03 12:35:21 |
| 3 | toyama3 | 2014-01-03 12:35:27 |
+-----+-----------+---------------------+
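The difference between the two time-handling approaches can be reproduced in plain Ruby. The sketch below uses only core Time methods; `format_event_time` and `TIME_FORMAT` are hypothetical names introduced for illustration, and the code does not show how the plugin applies the timezone setting internally.

```ruby
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Format an epoch timestamp in a fixed UTC offset such as "+09:00".
def format_event_time(epoch, utc_offset)
  Time.at(epoch).getlocal(utc_offset).strftime(TIME_FORMAT)
end

# Event time of the first sample record, as seconds since the epoch.
event_time = Time.new(2014, 1, 3, 21, 35, 15, "+09:00").to_i

# What ${time} does: the result depends on the timezone of the machine running this.
puts Time.at(event_time).strftime(TIME_FORMAT)

# An explicit offset gives the same result on every machine.
puts format_event_time(event_time, "+09:00") # 2014-01-03 21:35:15
puts format_event_time(event_time, "+00:00") # 2014-01-03 12:35:15
```

The last line matches the created_at value in the table above: the same instant, shifted nine hours back from JST to UTC.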
<match mysql.input>
@type mysql_bulk
host localhost
database test_app_development
username root
password hogehoge
table users
column_names id,user_name,created_at,clicks
aggregate_data true
aggregate_key_list user_name,created_at
on_duplicate_key_update true
on_duplicate_key_operations ["clicks,+"]
flush_interval 10s
</match>
Assume the following input is received:
mysql.input: {"user_name":"toyama","created_at":"2014/01/03 21:35:15","clicks":1,"dummy":"hogehoge"}
mysql.input: {"user_name":"toyama2","created_at":"2014/01/03 21:35:21","clicks":1,"dummy":"hogehoge"}
mysql.input: {"user_name":"toyama","clicks":1,"dummy":"hogehoge"}
mysql.input: {"user_name":"toyama","clicks":1,"dummy":"hogehoge"}
Then the resulting table looks like this:
+-----+-----------+---------------------+---------------------+
| id | user_name | created_at | clicks |
+-----+-----------+---------------------+---------------------+
| 1 | toyama | 2014-01-03 21:35:15 | 3 |
| 2 | toyama2 | 2014-01-03 21:35:21 | 1 |
+-----+-----------+---------------------+---------------------+
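As a rough illustration of aggregation, the Ruby sketch below groups buffered records by the columns in aggregate_key_list and sums a numeric column before anything is sent to MySQL. This is an assumption about the general idea, not the plugin's implementation; `aggregate` is a hypothetical helper, and the sketch does not model how the database later merges rows through ON DUPLICATE KEY UPDATE.

```ruby
# Hypothetical helper: sum one numeric column across records that
# share the same values for every column in key_list.
def aggregate(records, key_list, sum_column)
  totals = Hash.new(0)
  records.each do |record|
    key = key_list.map { |name| record[name] }
    totals[key] += record[sum_column].to_i
  end
  totals
end

records = [
  { "user_name" => "toyama",  "created_at" => "2014/01/03 21:35:15", "clicks" => 1 },
  { "user_name" => "toyama2", "created_at" => "2014/01/03 21:35:21", "clicks" => 1 },
  { "user_name" => "toyama",  "clicks" => 1 },
  { "user_name" => "toyama",  "clicks" => 1 }
]

aggregate(records, %w[user_name created_at], "clicks").each do |key, clicks|
  puts "#{key.inspect}: #{clicks}"
end
```

In this sketch the two records without created_at share their own key, separate from the first toyama record, so fewer rows reach the database than events were received.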
bundle install
rake test
Divide bulk inserts into batches (for example, 1000 rows per insert)
- Fork it
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request
Copyright (c) 2016 Hiroshi Toyama. See LICENSE for details.