java dynamic compile
静态编译:编译时就把所有用到的Java代码全都编译成字节码,是一次性编译。
动态编译:在Java程序运行时才把需要的Java代码的编译成字节码,是按需编译。
从JDK1.6开始,引入了Java代码重写过的编译器接口,使得我们可以在运行时编译Java源代码,然后再通过类加载器将编译好的类加载进JVM,这种在运行时编译代码的操作就叫做动态编译。
hbase point
节点 | 端口号 | 协议 | 使用 | 说明 |
---|---|---|---|---|
zookeeper | 2181 | zkCli.sh -server zookeeper1:2181 | 客户端接入 | |
2888,3888 | N/A | 集群内部通讯 | ||
HDFS Namenode | 9000 | HDFS | hdfs dfs -ls hdfs://namenode1:9000/ | 客户端接入 |
50070 | HTTP | http://namenode1:50070/ | 集群监控 | |
HDFS SecondaryNamenode | 50090 | HTTP | http://namenode1:50090/ | secondary监控 |
HDFS Datanode | 50010 | N/A | 客户端接入/其他节点接入 | |
50020 | N/A | |||
50075 | HTTP | http://datanode1:50075/ | 节点监控 | |
HBase Master | 16000 | hbase-client-1.x.x.jar | RegionServer接入 | |
16010 | HTTP | http://namenode1:16010/ | 集群监控 | |
HBase RegionServer | 16020 | N/A | 客户端接入 | |
16030 | HTTP | http://datanode1:16030/ | 节点监控 |
flink module
目录结构
- flink-annotations: Flink自定义的一些注解,用于配置、控制编译等功能。
- flink-clients: Flink客户端,用于向Flink集群提交任务、查询状态等。其中org.apache.flink.client.cli.CliFrontend就是执行./flink run的入口。
- flink-connectors: Flink连接器,相当于Flink读写外部系统的客户端。这些连接器指定了外部存储如何作为Flink的source或sink。例如对于kafka来说,flink-connector-kafka-xx定义了FlinkKafkaConsumer和FlinkKafkaProducer类分别作为Flink的source和sink,实现了对kafka消费和生产的功能。从图二可以看出,flink 1.9目前支持的外部存储有Cassandra、ES、Kafka、Hive等一些开源外部存储。
- flink-container: Flink对docker和kubernetes的支持。
- flink-contrib: 社区开发者提供的一些新特性。
- flink-core: Flink核心的API、类型的定义,包括底层的算子、状态、时间的实现,是Flink最重要的部分。Flink内部的各种参数配置也都定义在这个模块的configuration中。(这部分代码还没怎么看过,就不细讲了)。
- flink-dist: Flink编译好之后的jar包会放在这个文件夹下,也就是网上下载的可执行的版本。其中也包括集群启动、终止的脚本,集群的配置文件等。
- flink-docs: 这个模块并不是Flink的文档,而是Flink文档生成的代码。其中org.apache.flink.docs.configuration.ConfigOptionsDocGenerator是配置文档的生成器,修改相关配置的key或者默认值,重新运行这个类就会更新doc文件夹下的html文件。同样org.apache.flink.docs.rest.RestAPIDocGenerator是Flink RestAPI文档的生成器。
- flink-fliesystems: Flink对各种文件系统的支持,包括HDFS、Azure、AWS S3、阿里云OSS等分布式文件系统。
- flink-formats: Flink对各种格式的数据输入输出的支持。包括Json、CSV、Avro等常用的格式。
- flink-java: Flink java的API,就是写flink应用时用到的map、window、keyBy、State等类或函数的实现。
- flink-jepsen: 对Flink分布式系统正确性的测试,主要验证Flink的容错机制。
- flink-libraries: Flink的高级API,包括CEP(复杂事件处理)、Gelly图处理库等。
- flink-mesos: Flink对mesos集群管理的支持。
- flink-metrics: Flink监控上报。支持上报到influxdb、prometheus等监控系统。具体的使用配置可以在flink-core模块的org.apache.flink.configuration.MetricOptions中找到。
- flink-python: Flink对python的支持,目前还比较弱。
- flink-queryable-state: Flink对可查询状态的支持,其中flink-queryable-state-runtime子模块实现了StateClientProxy和StateServer。这两部分都运行在TaskManager上,StateClientProxy负责接收外部请求,StateServe负责管理内部的queryable state。flink-queryable-state-client-java子模块实现了QueryableStateClient,作为外部系统访问queryable state的客户端。
- flink-runtime: flink运行时核心代码,在第二节细说。
- flink-runtime-web: Flink Web Dashboard的实现。默认启动standalone集群后,访问http://localhost:8081 出现的界面。
- flink-scala: Flink scala的API。
- flink-scala-shell: Flink提供的scala命令行交互接口。
- flink-state-backends: flink状态存储的方式,目前这个模块中只有RocksDBStateBackend,未来可能会支持更多种的状态存储,以适应不同的业务场景。MemoryStateBackend和FsStateBackend的实现并不在这个目录下,而是在flink-runtime目录下。
- flink-streaming-java: Flink Streaming的java API。
- flink-streaming-scala: Flink Streaming的scala API。
- flink-table: Flink Table API,在第三小节中细说。
- flink-yarn: Flink对yarn集群管理的支持。
flink-runtime模块是Flink最核心的模块之一,实现了Flink的运行时框架,如JobManager、TaskManager、ResourceManager、Scheduler、Checkpoint Coordinator
flink-table模块属于Flink的上层API,包括java和scala版本的table-api,以及SQL的解析和SQL的执行。
随着Flink SQL越来越受重视,flink-table从flink-libraries中移了出来,成为了独立的一级目录。Flink 1.9中,阿里把blink-planner开源了出来,这样整个flink-table中就有了2个planner。从长期来看,流批的统一是一个趋势,因此blink-planner只使用了StreamTableEnvironment中相关的API,而没有使用BatchTableEnvironment,将批当做一个有限的流来处理,希望通过这种方式实现流和批的统一。由于blink-table-planner更好的支持流批统一,且性能更好,在未来的版本中,很有可能完全替代flink-table-planner的功能,而flink-table-planner可能将会被移除。
mysql HA & keepalived
mysql数据备份
方案二:双主机HA部署
前提:准备两个机器master1(172.20.3.113)和master2(172.20.3.114),且分别安装了mysql,其中IP地址根据生产具体ip进行替换
一、配置my.cnf信息
配置/etc/my.cnf文件(从mysql5.7开始不会自动生成my.cnf文件,所以需要手动创建)my.cnf文件内容大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19[mysql]
default-character-set=utf8 #设置mysql客户端默认字符集
[mysqld]
port = 3306 #可自行更改端口
basedir=/usr/local/mysql
datadir=/usr/local/mysql/data
max_connections = 500 #最大连接数
log_bin=mysql-bin
server_id = 1 #机器1设置为1,机器2设置为2
binlog_format=ROW
auto-increment-increment = 2 #字段变化增量值
auto-increment-offset = 1 #机器1设置为1,机器2设置为2
slave-skip-errors = all #忽略所有复制产生的错误
gtid_mode=ON
enforce-gtid-consistency=ON
character-set-server = utf8
default-storage-engine = INNODB
lower_case_table_names = 1[mysql]代表我们使用mysql命令登录mysql数据库时的默认设置
[mysqld]代表数据库自身的默认设置
注意:机器1和机器2只有server-id不同和auto-increment-offset不同,其他必须相同。
部分配置项解释如下:
binlog_format= ROW:指定mysql的binlog日志的格式,日志中会记录成每一行数据被修改的形式,然后在 slave 端再对相同的数据进行修改。
auto-increment-increment= 2:表示自增长字段每次递增的量,其默认值是1。它的值应设为整个结构中服务器的总数,本案例用到两台服务器,所以值设为2。
auto-increment-offset= 2:用来设定数据库中自动增长的起点(即初始值),因为这两能服务器都设定了一次自动增长值2,所以它们的起点必须得不同,这样才能避免两台服务器数据同步时出现主键冲突。
注:另外还可以在my.cnf配置文件中,添加“binlog_do_db=数据库名”配置项(可以添加多个)来指定要同步的数据库。如果配置了这个配置项,如果没添加在该配置项后面的数据库,则binlog不记录它的事件。
切换到datacanvas用户进行mysql启动服务 (建议)
1
/usr/local/mysql/support-files/mysql.server start
或者在已经创建软连接的前提下,切换到root用户,并启动mysql服务
1
service mysql restart
客户端登录
1
/usr/local/mysql/bin/mysql -uroot -p
设置可远程登录root用户
1
2GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;注意:上面的密码’123456’修改成真实的root密码
开始设置双主备份
在master1上操作
1
2
3
4
5
6
7
8
9先在master2上执行,
show master status;(获取master_log_file和master_log_pos信息)
在master1上执行
change master to master_host='172.20.3.114',master_port=3306,master_user='rt',master_password='rt123',master_log_file='mysql-bin.000003',master_log_pos=194;
start slave;
show slave status\G在master2上操作
1
2
3
4
5
6
7
8先在master1上执行,
show master status;(获取master_log_file和master_log_pos信息)
在master2上执行
change master to master_host='172.20.3.113',master_port=3306,master_user='rt',master_password='rt123',master_log_file='mysql-bin.000004',master_log_pos=194;
start slave;
show slave status\G
二、keepalived安装配置
需要在master1和master2的机器上安装keepalived服务,安装过程大致如下:
通过地址https://pkgs.org/download/keepalived下载相应的安装版本,然后解压的相关目录。
源码的安装一般由3个步骤组成:配置(configure)、编译(make)、安装( make install)
1
./configure --prefix=/usr/local/keepalived
如果提示错误信息
1
2
3configure: error:
!!! OpenSSL is not properly installed on your system. !!!
!!! Can not include OpenSSL headers files. !!!需要安装yum install openssl openssl-devel(RedHat系统),
再次执行./configure –prefix=/usr/local/keepalived在安装目录执行
make && make install
进行编译安装keepalived配置文件,默认情况下keepalived启动时会去/etc/keepalived目录下加载配置文件keepalived.conf
1 | ! Configuration File forkeepalived |
注意:参数priority两个服务器配置不同,其中virtual_ipaddress是虚拟ip,之后项目可通过访问 172.20.3.200:3306进行访问双主mysql机群。
上述配置中会涉及/usr/local/keepalived/kill_keepalived.sh,分别在两台服务器上编写kill_keepalived.sh脚本内容:
1 | #!/bin/bash |
然后给脚本加权限
1 | chmod +x /usr/local/keepalived/kill_keepalived.sh |
- 启动keepalived服务如果启动失败,尝试输入
1
service keepalived start
pkill -9 keepalived
,然后再尝试重启
三、访问双主mysql集群
两台机器的mysql和keepalived配置完成之后,即可在项目中,通过访问虚拟ip地址(172.20.3.200:3306)进行mysql集群的访问。
mysql backup plan
mysql数据备份
方案一:定期备份数据库数据文件
一、编写shell脚本
脚本文件backup_mysql.sh信息如下:
1 | #用户名 |
该脚本实现的功能:备份指定数据库的数据信息到指定目录,并只保存指定数量的最新文件。
注意:脚本中需要补全脚本中的password和database_name信息,可修改备份保存路径backup_path,以及最多保存的备份文件数量count。
编写完脚本信息之后,需要给脚本赋予可执行权限 chmod +x backup_mysql.sh
二、设定定时任务crontab
运行crontab -e命令,打开一个可编辑的文本,输入0 1 * * * /path/to/backup_mysql.sh
保本并退出即添加完成。
注意:其中0 1 * * *
,表示每天凌晨1点进行备份操作,可自行修改1的值(范围0~23)
其中路径信息/path/to/backup_mysql.sh
需要修改为实际的脚本路径。
flink cdc
flink cep
flink watermark
Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。
由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。
Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。
Watermark的产生和Apache Flink内部处理逻辑如下图所示:
产生方式
Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
flink state
what ?
State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在aggregation过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。
why ?
与批计算相比,State是流计算特有的,批计算没有failover机制,要么成功,要么重新计算。流计算在 大多数场景 下是增量计算,数据逐条处理(大多数场景),每次计算是在上一次计算结果之上进行处理的,这样的机制势必要将上一次的计算结果进行存储(生产模式要持久化),另外由于 机器,网络,脏数据等原因导致的程序错误,在重启job时候需要从成功的检查点(checkpoint,后面篇章会专门介绍)进行state的恢复。增量计算,Failover这些机制都需要state的支撑。
how ?
存储实现
基于内存的HeapStateBackend - 在debug模式使用,不 建议在生产模式下应用;
基于HDFS的FsStateBackend - 分布式文件持久化,每次读写都产生网络IO,整体性能不佳;
基于RocksDB的RocksDBStateBackend - 本地文件+异步HDFS持久化;
Apache Flink版本选择用RocksDB+HDFS的方式进行State的存储,State存储分两个阶段,首先本地存储到RocksDB,然后异步的同步到远程的HDFS。 这样而设计既消除了HeapStateBackend的局限(内存大小,机器坏掉丢失等),也减少了纯分布式存储的网络IO开销。
还有一个是基于Niagara(Alibaba内部实现)NiagaraStateBackend - 分布式持久化- 在Alibaba生产环境应用;
分类
通过算子和数据层面划分
算子类state
KeyedState - 这里面的key是我们在SQL语句中对应的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段组成的Row的字节数组,每一个key都有一个属于自己的State,key与key之间的State是不可见的
数据类state
OperatorState - Apache Flink内部的Source Connector的实现中就会用OperatorState来记录source数据读取的offset。
checkpoint
checkpoint是使Flink 能从故障恢复的一种内部机制。检查点是 Flink 应用状态的一个一致性副本,包括了输入的读取位点。在发生故障时,Flink 通过从检查点加载应用程序状态来恢复,并从恢复的读取位点继续处理,就好像什么事情都没发生一样。Flink的状态存储在Flink的内部,这样做的好处就是不再依赖外部系统,降低了对外部系统的依赖,在Flink的内部,通过自身的进程去访问状态变量.同时会定期的做checkpoint持久化,把checkpoint存储在一个分布式的持久化系统中,如果发生故障,就会从最近的一次checkpoint中将整个流的状态进行恢复.