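I. Introduction to Canal
1. Use case
In the earlier statistics feature we fetched the statistical data through service calls, which is tightly coupled and relatively inefficient. Here I take a different approach: synchronizing database tables in real time. For example, to count daily registrations and logins, we only need to sync the member table into the statistics database and compute the statistics locally. This is more efficient and less coupled, and Canal is a good tool for this kind of database synchronization. Canal is an open-source project from Alibaba, written in pure Java. It parses a database's incremental logs to provide incremental data subscription and consumption, and currently it mainly supports MySQL.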
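2. Setting up the Canal environment
Canal is built on the MySQL binlog, so binlog writing must be enabled in MySQL. Start the MySQL service: service mysql start (or systemctl start mysqld.service)
(2.1) Check whether binlog is enabled: show variables like 'log_bin';
(2.2) If the status is OFF, binlog is not enabled. Enable it as follows: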
1. Edit the MySQL configuration file my.cnf
vi /etc/my.cnf
Append the following:
log-bin=mysql-bin #binlog file name
binlog_format=ROW #use row mode
server_id=1 #MySQL instance id; must not be the same as canal's slaveId
2. Restart MySQL:
service mysql restart
3. Log in to the mysql client and check the log_bin variable
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
If the status is ON, binlog is enabled.
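The three settings appended to my.cnf can also be sanity-checked before restarting MySQL. The following is only a minimal sketch: the class `BinlogConfigCheck` and its methods are hypothetical helpers, not part of Canal or MySQL, and the parser handles just simple `key=value` lines with `#` comments. It relies on the fact that MySQL treats dashes and underscores in option names as interchangeable.

```java
import java.util.HashMap;
import java.util.Map;

class BinlogConfigCheck {
    /** Parses "key=value" lines, skipping blank lines, [section] headers and # comments. */
    static Map<String, String> parse(String cnfText) {
        Map<String, String> options = new HashMap<>();
        for (String raw : cnfText.split("\\R")) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop a trailing comment
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            // Normalize log-bin / log_bin to a single spelling
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value);
        }
        return options;
    }

    /** True if the text enables binlog, uses ROW format and sets a server id. */
    static boolean readyForCanal(String cnfText) {
        Map<String, String> options = parse(cnfText);
        return options.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(options.get("binlog_format"))
                && options.containsKey("server_id");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog-bin=mysql-bin\nbinlog_format=ROW\nserver_id=1\n";
        System.out.println(readyForCanal(cnf)); // true
    }
}
```

The authoritative check remains the `show variables like 'log_bin'` query on the running server, since the file may differ from what MySQL actually loaded.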
(2.3) Add the following user and privileges in MySQL so that it can be accessed remotely (host %)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3. Download and install the Canal server
Download address:
(1) After downloading, put canal.deployer-1.1.4.tar.gz into a directory and extract it
cd /usr/local/canal
tar zxvf canal.deployer-1.1.4.tar.gz
(2) Edit the configuration file: vi conf/example/instance.properties
# change to your own database address
canal.instance.master.address=192.168.44.132:3306
# change to your own database username and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# change to the rule for the tables to synchronize, for example to synchronize a single table
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=guli_ucenter.ucenter_member
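Note: canal.instance.filter.regex lists the tables that MySQL data parsing pays attention to, written as Perl regular expressions. Multiple expressions are separated by commas (,), and the escape character must be written as a double backslash (\\). Common examples:
All tables: .* or .*\\..*
All tables in the canal schema: canal\\..*
Tables in the canal schema whose names start with canal: canal\\.canal.*
A single table in the canal schema: canal.test1
Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
Caution: this filter only takes effect for row-mode data (in mixed/statement mode the SQL is not parsed, so the table name cannot be extracted reliably for filtering).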
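The double backslash can be seen in action with a small experiment. This is a sketch under assumptions: `FilterRegexDemo` is a hypothetical class, and `java.util.regex` stands in for Canal's own Perl-style matcher, so edge cases may differ. It loads a properties line the way a `.properties` file is read, shows that `\\` collapses to a single `\`, and then tests `schema.table` names against the comma-separated rules.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

class FilterRegexDemo {
    /** Reads one properties line and returns the value of canal.instance.filter.regex. */
    static String loadFilter(String propertiesLine) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesLine));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    /** True if "schema.table" fully matches at least one comma-separated rule. */
    static boolean accepts(String filter, String schemaDotTable) {
        for (String rule : filter.split(",")) {
            if (Pattern.matches(rule.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The file text canal\\..* is written with four backslashes in Java source
        String filter = loadFilter("canal.instance.filter.regex=canal\\\\..*,mysql.test1");
        System.out.println(filter);                          // canal\..*,mysql.test1
        System.out.println(accepts(filter, "canal.test1"));  // true
        System.out.println(accepts(filter, "canalx.test1")); // false
        System.out.println(accepts(filter, "mysql.test1"));  // true
        System.out.println(accepts(filter, "mysql.test2"));  // false
    }
}
```

With a single backslash in the file, the properties loader would drop it and the dot would match any character instead of the literal separator between schema and table.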
(3) Start Canal with the script in the bin directory: sh bin/startup.sh
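After starting the server, it can be useful to confirm that its port accepts connections before writing any client code. The sketch below is a plain TCP reachability check and says nothing about whether the instance itself is healthy. `CanalPortCheck` is a hypothetical helper, and the address 192.168.44.132 with port 11111 is simply the one the client code in section 4.5 connects to.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class CanalPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host is unknown
            return false;
        }
    }

    public static void main(String[] args) {
        // Address and port taken from the client code in section 4.5
        System.out.println(isReachable("192.168.44.132", 11111, 2000));
    }
}
```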
4. Implementing data synchronization: the canal_client module
4.1 Create the canal_client module
4.2 Add the required dependencies
<dependencies>
    <!-- No versions are declared here; they are assumed to come from a parent POM's dependencyManagement -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- mysql database operations -->
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
    </dependency>
</dependencies>
4.3 Create the application.properties configuration file
# server port
server.port=10000
# service name
spring.application.name=canal-client
# environment: dev, test, prod
spring.profiles.active=dev
# mysql database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/yunxun?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
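The `%2B` in the datasource URL is the percent-encoded form of `+`, so the parameter stands for `serverTimezone=GMT+8`. The sketch below only demonstrates the decoding rule with the JDK's `URLDecoder`. That the MySQL driver decodes its URL parameters in the same way is an assumption here and is not verified by this code. `TimezoneParamDemo` is a hypothetical class name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

class TimezoneParamDemo {
    /** URL-decodes a parameter value using UTF-8. */
    static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("GMT%2B8")); // GMT+8
        System.out.println(decode("GMT+8"));   // GMT 8  (a raw + decodes to a space)
    }
}
```

Writing the value as `GMT%2B8` therefore avoids the plus sign being read as a space.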
4.4 Create the CanalApplication startup class
package com.atguigu.canal;

import com.atguigu.canal.client.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

    // Called once the application context has started; run() loops until the process stops
    @Override
    public void run(String... strings) throws Exception {
        canalClient.run();
    }
}
4.5 Write the canal client class (boilerplate)
package com.atguigu.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {

    // SQL statements generated from binlog events, waiting to be executed locally
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    public void run() {
        // Connect to the "example" instance of the Canal server on port 11111
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.44.132", 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // Subscribe to every table in every schema
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // Fetch up to batchSize entries without acknowledging them
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // Nothing new; wait before polling again
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    // Execute whatever SQL was queued for this batch
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can see the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

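    // Drains the statements that are currently queued and executes them one by one
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
            // execute() ignores null, which poll() returns if the queue is already empty
            this.execute(sql);
        }
    }
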
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                // The new column values become the SET clause.
                // Values are concatenated unescaped, so a value containing ' breaks the statement.
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                // The primary key of the old row image identifies the row to update
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // Quote the key value so that string primary keys also work
                        sql.append(column.getName() + "='" + column.getValue() + "'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                // A deleted row only has a "before" image; its primary key identifies the row
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // Quote the key value so that string primary keys also work
                        sql.append(column.getName() + "='" + column.getValue() + "'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                // An inserted row only has an "after" image
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                // Column names
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                // Column values, quoted but not escaped
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    // Executes one statement against the local (statistics) database
    public void execute(String sql) {
        Connection con = null;
        try {
            if (null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
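The client above builds each statement by concatenating column values between single quotes, so a value such as O'Brien produces invalid SQL. One possible alternative, sketched here under assumptions, is to generate the statement with `?` placeholders and pass the values separately. `ParameterizedInsert` is a hypothetical helper, and a plain `Map` of column names to values stands in for Canal's column list so that the example runs without the Canal library.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

class ParameterizedInsert {
    final String sql;      // statement text with ? placeholders
    final Object[] params; // values in placeholder order

    ParameterizedInsert(String sql, Object[] params) {
        this.sql = sql;
        this.params = params;
    }

    /** Builds "insert into t (a,b) VALUES (?,?)" plus the matching parameter array. */
    static ParameterizedInsert build(String table, Map<String, String> columns) {
        StringJoiner names = new StringJoiner(",", "(", ")");
        StringJoiner marks = new StringJoiner(",", "(", ")");
        List<Object> values = new ArrayList<>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            names.add(column.getKey());
            marks.add("?");
            values.add(column.getValue());
        }
        String sql = "insert into " + table + " " + names + " VALUES " + marks;
        return new ParameterizedInsert(sql, values.toArray());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the column order stable
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("nickname", "O'Brien");
        ParameterizedInsert stmt = build("ucenter_member", row);
        System.out.println(stmt.sql); // insert into ucenter_member (id,nickname) VALUES (?,?)
        // With commons-dbutils the pair could then be executed as, for example:
        // new QueryRunner(dataSource).update(stmt.sql, stmt.params);
    }
}
```

The queue would then need to hold statement and parameter pairs instead of plain strings, and update and delete statements could be built in the same way.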