javascript
springboot实现增量备份-ag凯发k8国际
springboot canal数据同步ag凯发k8国际的解决方案
一、需求
微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变更进行增量订阅&消费的业务。无论是canal实验需要还是为了增量备份、主从复制和恢复,都是需要开启mysql-binlog日志,数据目录设置到不同的磁盘分区可以降低io等待。
canal 工作原理
canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave ,向 mysql master 发送dump 协议
mysql master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
二、部署环境
1、登录mysql查看是否开启binlog,标红的log_bin默认是off关
mysql> show variables like 'log_%';
---------------------------------------- -------------------------------------------------------
| variable_name | value |
---------------------------------------- -------------------------------------------------------
| **log_bin | off** |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | off |
| log_bin_use_v1_row_events | off |
| log_builtin_as_identified_by_password | off |
| log_error | f:\tools\mysql-5.7.28-winx64\data\desktop-c1lu9iq.err |
| log_error_verbosity | 3 |
| log_output | file |
| log_queries_not_using_indexes | off |
| log_slave_updates | off |
| log_slow_admin_statements | off |
| log_slow_slave_statements | off |
| log_statements_unsafe_for_binlog | on |
| log_syslog | on |
| log_syslog_tag | |
| log_throttle_queries_not_using_indexes | 0 |
| log_timestamps | utc |
| log_warnings | 2 |
---------------------------------------- -------------------------------------------------------
19 rows in set (0.03 sec)
复制代码
2、编辑配置文件
[mysqld]
# 设置3306端口
port=3306
# 设置mysql的安装目录,按照个人的实际需要改
basedir=f:\\tools\\mysql-5.7.28-winx64 # 切记此处一定要用双斜杠\\,单斜杠我这里会出错,不过看别人的教程,有的是单斜杠。自己尝试吧
# 设置mysql数据库的数据的存放目录
datadir=f:\\tools\\mysql-5.7.28-winx64\\data # 此处同上
# 允许最大连接数
max_connections=200
# 允许连接失败的次数。这是为了防止有人从该主机试图攻击数据库系统
max_connect_errors=10
# 服务端使用的字符集默认为utf8
character-set-server=utf8
# 创建新表时将使用的默认存储引擎
default-storage-engine=innodb
# 默认使用“mysql_native_password”插件认证
default_authentication_plugin=mysql_native_password
lower_case_table_names=2
sql_mode = strict_trans_tables,no_zero_in_date,no_zero_date,error_for_division_by_zero,no_auto_create_user,no_engine_substitution
max_connections=1000
#实验重点配置
# 开启 binlog
log-bin=mysql-bin
# 选择 row 模式
binlog-format=row
# 配置 mysql replaction 需要定义,不要和 canal 的 slaveid 重复
server_id=1
[mysql]
# 设置mysql客户端默认字符集
default-character-set=utf8
[client]
# 设置mysql客户端连接服务端时默认使用的端口
port=3306
default-character-set=utf8
复制代码
3、创建mysql slave 的权限canal账户并且进行远程连接授权
create user canal identified by 'canal';
grant select, replication slave, replication client on *.* to 'canal'@'%';
-- grant all privileges on *.* to 'canal'@'%' ;
flush privileges;
复制代码
4、记得重启mysql服务
linux:
systemctl restart mysqld
window:
net stop mysql;
net start mysql;
复制代码
三、canal快速部署配置
1、修改配置conf/example/instance.properties
## mysql serverid
canal.instance.mysql.slaveid = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbusername = canal
canal.instance.dbpassword = canal
canal.instance.defaultdatabasename =
canal.instance.connectioncharset = utf-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
复制代码
2、通过启动脚本运行:sh bin/startup.sh
3、查看 server 日志和instance 的日志
$ tail -f logs/canal/canal.log
2020-05-28 13:52:03.037 [main] info com.alibaba.otter.canal.deployer.canallauncher - ## set default uncaught exception handler
2020-05-28 13:52:03.065 [main] info com.alibaba.otter.canal.deployer.canallauncher - ## load canal configurations
2020-05-28 13:52:03.072 [main] info com.alibaba.otter.canal.deployer.canalstarter - ## start the canal server.
2020-05-28 13:52:03.444 [main] info com.alibaba.otter.canal.deployer.canalcontroller - ## start the canal server[172.36.58.25(172.36.58.25):11111]
2020-05-28 13:52:04.604 [main] info com.alibaba.otter.canal.deployer.canalstarter - ## the canal server is running now ......
$ tail -f logs/example/example.log
2020-05-28 13:52:04.238 [main] warn o.s.beans.generictypeawarepropertydescriptor - invalid javabean property 'connectioncharset' being accessed! ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.abstractmysqleventparser.setconnectioncharset(java.lang.string)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.abstractmysqleventparser.setconnectioncharset(java.nio.charset.charset)]
2020-05-28 13:52:04.264 [main] info c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [canal.properties]
2020-05-28 13:52:04.265 [main] info c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [example/instance.properties]
2020-05-28 13:52:04.568 [main] info c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example
2020-05-28 13:52:04.572 [main] warn c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table filter : ^.*\..*$
2020-05-28 13:52:04.573 [main] warn c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table black filter :
2020-05-28 13:52:04.577 [main] info c.a.otter.canal.instance.core.abstractcanalinstance - start successful....
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> begin to find start position, it will be long time for reset or first position
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - prepare to find start position just show master status
2020-05-28 13:52:06.556 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> find start position successfully, entryposition[included=false,journalname=mysql-bin.000001,position=4,serverid=1,gtid=,timestamp=1590644973000] cost : 1935ms , the next step is binlog dump
复制代码
四、初步监听实验
com.alibaba.otter
canal.client
1.1.0
复制代码import java.net.inetsocketaddress;
import java.util.list;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.common.utils.addressutils;
import com.alibaba.otter.canal.protocol.message;
import com.alibaba.otter.canal.protocol.canalentry.column;
import com.alibaba.otter.canal.protocol.canalentry.entry;
import com.alibaba.otter.canal.protocol.canalentry.entrytype;
import com.alibaba.otter.canal.protocol.canalentry.eventtype;
import com.alibaba.otter.canal.protocol.canalentry.rowchange;
import com.alibaba.otter.canal.protocol.canalentry.rowdata;
public class simplecanalclientexample {
public static void main(string args[]) {
// 创建链接
canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress(addressutils.gethostip(),
11111), "example", "", "");
int batchsize = 1000;
int emptycount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalemptycount = 120;
while (emptycount < totalemptycount) {
message message = connector.getwithoutack(batchsize); // 获取指定数量的数据
long batchid = message.getid();
int size = message.getentries().size();
if (batchid == -1 || size == 0) {
emptycount ;
system.out.println("empty count : " emptycount);
try {
thread.sleep(1000);
} catch (interruptedexception e) {
}
} else {
emptycount = 0;
// system.out.printf("message[batchid=%s,size=%s] \n", batchid, size);
printentry(message.getentries());
}
connector.ack(batchid); // 提交确认
// connector.rollback(batchid); // 处理失败, 回滚数据
}
system.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printentry(list entrys) {
for (entry entry : entrys) {
if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) {
continue;
}
rowchange rowchage = null;
try {
rowchage = rowchange.parsefrom(entry.getstorevalue());
} catch (exception e) {
throw new runtimeexception("error ## parser of eromanga-event has an error , data:" entry.tostring(),
e);
}
eventtype eventtype = rowchage.geteventtype();
system.out.println(string.format("================> binlog[%s:%s] , name[%s,%s] , eventtype : %s",
entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(),
entry.getheader().getschemaname(), entry.getheader().gettablename(),
eventtype));
for (rowdata rowdata : rowchage.getrowdataslist()) {
if (eventtype == eventtype.delete) {
printcolumn(rowdata.getbeforecolumnslist());
} else if (eventtype == eventtype.insert) {
printcolumn(rowdata.getaftercolumnslist());
} else {
system.out.println("-------> before");
printcolumn(rowdata.getbeforecolumnslist());
system.out.println("-------> after");
printcolumn(rowdata.getaftercolumnslist());
}
}
}
}
private static void printcolumn(list columns) {
for (column column : columns) {
system.out.println(column.getname() " : " column.getvalue() " update=" column.getupdated());
}
}
}
复制代码
随便插入数据触发
insert into `demo`.`tb_ad`(`id`, `url`, `status`, `position`, `image`, `start_time`, `end_time`) values (1, 'https://www.baidu.com/', '1', 'web_index_lb', 'https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-vip-680.jpeg', '2020-05-22 10:58:08', '2021-06-01 10:58:14');
复制代码
从控制台中看到
empty count : 66
empty count : 67
empty count : 68
empty count : 69
empty count : 70
================> binlog[mysql-bin.000001:355] , name[demo,tb_ad] , eventtype : insert
id : 2 update=true
url : https://www.baidu.com/ update=true
status : 1 update=true
position : web_index_lb update=true
image : https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-vip-680.jpeg update=true
start_time : 2020-05-22 10:58:08 update=true
end_time : 2021-06-01 10:58:14 update=true
复制代码
五、数据监控微服务
top.javatool
canal-spring-boot-starter
1.2.1-release
复制代码
订阅数据库的增删改操作
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.stereotype.component;
import top.javatool.canal.client.annotation.canaltable;
import top.javatool.canal.client.handler.entryhandler;
@component
@canaltable(value = "t_user")
public class userhandler implements entryhandler {
private logger logger = loggerfactory.getlogger(userhandler.class);
public void insert(user user) {
logger.info("insert message {}", user);
}
public void update(user before, user after) {
logger.info("update before {} ", before);
logger.info("update after {}", after);
}
public void delete(user user) {
logger.info("delete {}", user);
}
}
复制代码
启动数据监控微服务,修改user表,观察控制台输出。
2020-05-28 16:23:22.667 info 24284 --- [l-client-thread] t.j.c.client.client.abstractcanalclient : 获取消息 message[id=23,entries=[header {
version: 1
logfilename: "mysql-bin.000001"
logfileoffset: 18380
serverid: 1
serverencode: "utf-8"
executetime: 1590654201000
sourcetype: mysql
schemaname: ""
tablename: ""
eventlength: 68
}
entrytype: transactionbegin
storevalue: " \025"
, header {
version: 1
logfilename: "mysql-bin.000001"
logfileoffset: 18505
serverid: 1
serverencode: "utf-8"
executetime: 1590654201000
sourcetype: mysql
schemaname: "demo"
tablename: "t_user"
eventlength: 88
eventtype: update
props {
key: "rowscount"
value: "1"
}
}
entrytype: rowdata
storevalue: "\b\210\002\020\002p\000b\370\003\n\033\b\000\020\004\032\002id \001(\0000\000b\00221r\aint(11)\n*\b\001\020\f\032\tuser_name \000(\0000\000b\005zeldar\fvarchar(255)\n*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000b\0010r\ntinyint(4)\n\"\b\003\020\004\032\ncountry_id \000(\0000\000b\0011r\aint(11)\n&\b\004\020[\032\bbirthday \000(\0000\000b\n1998-04-18r\004date\n7\b\005\020]\032\vcreate_time \000(\0000\000b\0231991-01-10 05:45:50r\ttimestamp\022\033\b\000\020\004\032\002id \001(\0000\000b\00221r\aint(11)\022.\b\001\020\f\032\tuser_name \000(\0010\000b\tzelda1111r\fvarchar(255)\022*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000b\0010r\ntinyint(4)\022\"\b\003\020\004\032\ncountry_id \000(\0000\000b\0011r\aint(11)\022&\b\004\020[\032\bbirthday \000(\0000\000b\n1998-04-18r\004date\0227\b\005\020]\032\vcreate_time \000(\0000\000b\0231991-01-10 05:45:50r\ttimestamp"
, header {
version: 1
logfilename: "mysql-bin.000001"
logfileoffset: 18593
serverid: 1
serverencode: "utf-8"
executetime: 1590654201000
sourcetype: mysql
schemaname: ""
tablename: ""
eventlength: 31
}
entrytype: transactionend
storevalue: "\022\0041574"
],raw=false,rawentries=[]]
2020-05-28 16:23:22.668 info 24284 --- [xecute-thread-6] t.j.canal.example.handler.userhandler : update before user{id=null, username='zelda', gender=null, countryid=null, birthday=null, createtime=null}
2020-05-28 16:23:22.668 info 24284 --- [xecute-thread-6] t.j.canal.example.handler.userhandler : update after user{id=21, username='zelda1111', gender=0, countryid=1, birthday=sat apr 18 00:00:00 cst 1998, createtime=thu jan 10 05:45:50 cst 1991}
复制代码
关于找一找教程网
本站文章仅代表作者观点,不代表本站立场,所有文章非营利性免费分享。
本站提供了软件编程、网站开发技术、服务器运维、人工智能等等it技术文章,希望广大程序员努力学习,让我们用科技改变世界。
[springboot canal数据同步ag凯发k8国际的解决方案]http://www.zyiz.net/tech/detail-137645.html
总结
以上是ag凯发k8国际为你收集整理的springboot实现增量备份_springboot canal数据同步ag凯发k8国际的解决方案的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇:
- 下一篇: