0%

静态编译:编译时就把所有用到的Java代码全都编译成字节码,是一次性编译。

动态编译:在Java程序运行时才把需要的Java代码的编译成字节码,是按需编译。

从JDK1.6开始,引入了Java代码重写过的编译器接口,使得我们可以在运行时编译Java源代码,然后再通过类加载器将编译好的类加载进JVM,这种在运行时编译代码的操作就叫做动态编译。


【Java动态编译】动态编译的应用_牛客博客

Java动态性(1) - 动态编译(DynamicCompile)

节点 端口号 协议 使用 说明
zookeeper 2181 zkCli.sh -server zookeeper1:2181 客户端接入
2888,3888 N/A 集群内部通讯
HDFS Namenode 9000 HDFS hdfs dfs -ls hdfs://namenode1:9000/ 客户端接入
50070 HTTP http://namenode1:50070/ 集群监控
HDFS SecondaryNamenode 50090 HTTP http://namenode1:50090/ secondary监控
HDFS Datanode 50010 N/A 客户端接入/其他节点接入
50020 N/A
50075 HTTP http://datanode1:50075/ 节点监控
HBase Master 16000 hbase-client-1.x.x.jar RegionServer接入
16010 HTTP http://namenode1:16010/ 集群监控
HBase RegionServer 16020 N/A 客户端接入
16030 HTTP http://datanode1:16030/ 节点监控

目录结构

  • flink-annotations: Flink自定义的一些注解,用于配置、控制编译等功能。
  • flink-clients: Flink客户端,用于向Flink集群提交任务、查询状态等。其中org.apache.flink.client.cli.CliFrontend就是执行./flink run的入口。
  • flink-connectors: Flink连接器,相当于Flink读写外部系统的客户端。这些连接器指定了外部存储如何作为Flink的source或sink。例如对于kafka来说,flink-connector-kafka-xx定义了FlinkKafkaConsumer和FlinkKafkaProducer类分别作为Flink的source和sink,实现了对kafka消费和生产的功能。从图二可以看出,flink 1.9目前支持的外部存储有Cassandra、ES、Kafka、Hive等一些开源外部存储。
  • flink-container: Flink对docker和kubernetes的支持。
  • flink-contrib: 社区开发者提供的一些新特性。
  • flink-core: Flink核心的API、类型的定义,包括底层的算子、状态、时间的实现,是Flink最重要的部分。Flink内部的各种参数配置也都定义在这个模块的configuration中。(这部分代码还没怎么看过,就不细讲了)。
  • flink-dist: Flink编译好之后的jar包会放在这个文件夹下,也就是网上下载的可执行的版本。其中也包括集群启动、终止的脚本,集群的配置文件等。
  • flink-docs: 这个模块并不是Flink的文档,而是Flink文档生成的代码。其中org.apache.flink.docs.configuration.ConfigOptionsDocGenerator是配置文档的生成器,修改相关配置的key或者默认值,重新运行这个类就会更新doc文件夹下的html文件。同样org.apache.flink.docs.rest.RestAPIDocGenerator是Flink RestAPI文档的生成器。
  • flink-fliesystems: Flink对各种文件系统的支持,包括HDFS、Azure、AWS S3、阿里云OSS等分布式文件系统。
  • flink-formats: Flink对各种格式的数据输入输出的支持。包括Json、CSV、Avro等常用的格式。
  • flink-java: Flink java的API,就是写flink应用时用到的map、window、keyBy、State等类或函数的实现。
  • flink-jepsen: 对Flink分布式系统正确性的测试,主要验证Flink的容错机制。
  • flink-libraries: Flink的高级API,包括CEP(复杂事件处理)、Gelly图处理库等。
  • flink-mesos: Flink对mesos集群管理的支持。
  • flink-metrics: Flink监控上报。支持上报到influxdb、prometheus等监控系统。具体的使用配置可以在flink-core模块的org.apache.flink.configuration.MetricOptions中找到。
  • flink-python: Flink对python的支持,目前还比较弱。
  • flink-queryable-state: Flink对可查询状态的支持,其中flink-queryable-state-runtime子模块实现了StateClientProxy和StateServer。这两部分都运行在TaskManager上,StateClientProxy负责接收外部请求,StateServe负责管理内部的queryable state。flink-queryable-state-client-java子模块实现了QueryableStateClient,作为外部系统访问queryable state的客户端。
  • flink-runtime: flink运行时核心代码,在第二节细说。
  • flink-runtime-web: Flink Web Dashboard的实现。默认启动standalone集群后,访问http://localhost:8081 出现的界面。
  • flink-scala: Flink scala的API。
  • flink-scala-shell: Flink提供的scala命令行交互接口。
  • flink-state-backends: flink状态存储的方式,目前这个模块中只有RocksDBStateBackend,未来可能会支持更多种的状态存储,以适应不同的业务场景。MemoryStateBackend和FsStateBackend的实现并不在这个目录下,而是在flink-runtime目录下。
  • flink-streaming-java: Flink Streaming的java API。
  • flink-streaming-scala: Flink Streaming的scala API。
  • flink-table: Flink Table API,在第三小节中细说。
  • flink-yarn: Flink对yarn集群管理的支持。

  • flink-runtime模块是Flink最核心的模块之一,实现了Flink的运行时框架,如JobManager、TaskManager、ResourceManager、Scheduler、Checkpoint Coordinator

  • flink-table模块属于Flink的上层API,包括java和scala版本的table-api,以及SQL的解析和SQL的执行。

    随着Flink SQL越来越受重视,flink-table从flink-libraries中移了出来,成为了独立的一级目录。Flink 1.9中,阿里把blink-planner开源了出来,这样整个flink-table中就有了2个planner。从长期来看,流批的统一是一个趋势,因此blink-planner只使用了StreamTableEnvironment中相关的API,而没有使用BatchTableEnvironment,将批当做一个有限的流来处理,希望通过这种方式实现流和批的统一。由于blink-table-planner更好的支持流批统一,且性能更好,在未来的版本中,很有可能完全替代flink-table-planner的功能,而flink-table-planner可能将会被移除。

mysql数据备份

方案二:双主机HA部署

前提:准备两个机器master1(172.20.3.113)和master2(172.20.3.114),且分别安装了mysql,其中IP地址根据生产具体ip进行替换

一、配置my.cnf信息

  • 配置/etc/my.cnf文件(从mysql5.7开始不会自动生成my.cnf文件,所以需要手动创建)my.cnf文件内容大致如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    [mysql]
    default-character-set=utf8 #设置mysql客户端默认字符集
    [mysqld]
    port = 3306 #可自行更改端口
    basedir=/usr/local/mysql
    datadir=/usr/local/mysql/data
    max_connections = 500 #最大连接数
    log_bin=mysql-bin
    server_id = 1 #机器1设置为1,机器2设置为2
    binlog_format=ROW
    auto-increment-increment = 2 #字段变化增量值
    auto-increment-offset = 1 #机器1设置为1,机器2设置为2
    slave-skip-errors = all #忽略所有复制产生的错误
    gtid_mode=ON
    enforce-gtid-consistency=ON

    character-set-server = utf8
    default-storage-engine = INNODB
    lower_case_table_names = 1
    • [mysql]代表我们使用mysql命令登录mysql数据库时的默认设置

    • [mysqld]代表数据库自身的默认设置

      注意:机器1和机器2只有server-id不同和auto-increment-offset不同,其他必须相同。

      部分配置项解释如下:

      binlog_format= ROW:指定mysql的binlog日志的格式,日志中会记录成每一行数据被修改的形式,然后在 slave 端再对相同的数据进行修改。

      auto-increment-increment= 2:表示自增长字段每次递增的量,其默认值是1。它的值应设为整个结构中服务器的总数,本案例用到两台服务器,所以值设为2。

      auto-increment-offset= 2:用来设定数据库中自动增长的起点(即初始值),因为这两能服务器都设定了一次自动增长值2,所以它们的起点必须得不同,这样才能避免两台服务器数据同步时出现主键冲突。

      注:另外还可以在my.cnf配置文件中,添加“binlog_do_db=数据库名”配置项(可以添加多个)来指定要同步的数据库。如果配置了这个配置项,如果没添加在该配置项后面的数据库,则binlog不记录它的事件。

  • 切换到datacanvas用户进行mysql启动服务 (建议)

    1
    /usr/local/mysql/support-files/mysql.server start

    或者在已经创建软连接的前提下,切换到root用户,并启动mysql服务

    1
    service mysql restart
  • 客户端登录

    1
    /usr/local/mysql/bin/mysql -uroot -p

    设置可远程登录root用户

    1
    2
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
    FLUSH PRIVILEGES;

    注意:上面的密码’123456’修改成真实的root密码

开始设置双主备份

  • 在master1上操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    先在master2上执行,
    show master status;(获取master_log_file和master_log_pos信息)

    在master1上执行
    change master to master_host='172.20.3.114',master_port=3306,master_user='rt',master_password='rt123',master_log_file='mysql-bin.000003',master_log_pos=194;

    start slave;

    show slave status\G
  • 在master2上操作

    1
    2
    3
    4
    5
    6
    7
    8
    先在master1上执行,
    show master status;(获取master_log_file和master_log_pos信息)
    在master2上执行
    change master to master_host='172.20.3.113',master_port=3306,master_user='rt',master_password='rt123',master_log_file='mysql-bin.000004',master_log_pos=194;

    start slave;

    show slave status\G

二、keepalived安装配置

需要在master1和master2的机器上安装keepalived服务,安装过程大致如下:

  • 通过地址https://pkgs.org/download/keepalived下载相应的安装版本,然后解压的相关目录。

  • 源码的安装一般由3个步骤组成:配置(configure)、编译(make)、安装( make install)

    1
    ./configure --prefix=/usr/local/keepalived

    如果提示错误信息

    1
    2
    3
    configure: error: 
    !!! OpenSSL is not properly installed on your system. !!!
    !!! Can not include OpenSSL headers files. !!!

    需要安装yum install openssl openssl-devel(RedHat系统),
    再次执行./configure –prefix=/usr/local/keepalived

  • 在安装目录执行make && make install进行编译安装

  • keepalived配置文件,默认情况下keepalived启动时会去/etc/keepalived目录下加载配置文件keepalived.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
! Configuration File forkeepalived
global_defs {
notification_email {
[email protected]
}
notification_email_from [email protected]
smtp_server 127.0.0.1
smtp_connect_timeout 30
router_id MYSQL_HA #标识,双主相同
}
vrrp_instance VI_1 {
state BACKUP #两台都设置BACKUP
interface eth0 #网卡名称
virtual_router_id 51 #主备相同
priority 100 #优先级,另一台改为90
advert_int 1
nopreempt #不抢占,只在优先级高的机器上设置即可,优先级低的机器不设置
authentication {
auth_type PASS #鉴权,默认通过
auth_pass 1111 # 鉴权访问密码
}
virtual_ipaddress {
172.20.3.200 #虚拟ip
}
}

virtual_server 172.20.3.200 3306 {
delay_loop 2 #每个2秒检查一次real_server状态
lb_algo wrr #LVS算法
lb_kind DR #LVS模式
persistence_timeout 60 #会话保持时间
protocol TCP
real_server 172.20.3.113 3306 {
weight 1 #指定了当前主机的权重
notify_down /usr/local/keepalived/kill_keepalived.sh #检测到服务down后执行的脚本
TCP_CHECK {
connect_timeout 10 #连接超时时间
delay_before_retry 3 #重连间隔时间
connect_port 3306 #健康检查端口
}
}
real_server 172.20.3.114 3306 {
weight 2
notify_down /usr/local/keepalived/kill_keepalived.sh #检测到服务down后执行的脚本
TCP_CHECK {
connect_timeout 10
delay_before_retry 3
connect_port 3306
}
}
}

注意:参数priority两个服务器配置不同,其中virtual_ipaddress是虚拟ip,之后项目可通过访问 172.20.3.200:3306进行访问双主mysql机群。

上述配置中会涉及/usr/local/keepalived/kill_keepalived.sh,分别在两台服务器上编写kill_keepalived.sh脚本内容:

1
2
#!/bin/bash
pkill keepalived

然后给脚本加权限

1
chmod +x /usr/local/keepalived/kill_keepalived.sh
  • 启动keepalived服务
    1
    service keepalived start
    如果启动失败,尝试输入pkill -9 keepalived,然后再尝试重启

三、访问双主mysql集群

两台机器的mysql和keepalived配置完成之后,即可在项目中,通过访问虚拟ip地址(172.20.3.200:3306)进行mysql集群的访问。

mysql数据备份

方案一:定期备份数据库数据文件

一、编写shell脚本

脚本文件backup_mysql.sh信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#用户名
username=root
#密码
password=填写密码
#将要备份的数据库
database_name=填写需要备份的数据库

#保存备份文件最多个数
count=30
#备份保存路径
backup_path=/data/mysql_backup
#日期
date_time=`date +%Y-%m-%d-%H-%M`

#如果文件夹不存在则创建
if [ ! -d $backup_path ];
then
mkdir -p $backup_path;
fi
#开始备份
mysqldump -u $username -p$password $database_name > $backup_path/$database_name-$date_time.sql
#开始压缩
cd $backup_path
tar -zcvf $database_name-$date_time.tar.gz $database_name-$date_time.sql
#删除源文件
rm -rf $backup_path/$database_name-$date_time.sql
#更新备份日志
echo "create $backup_path/$database_name-$date_time.tar.gz" >> $backup_path/dump.log

#找出需要删除的备份
delfile=`ls -l -crt $backup_path/*.tar.gz | awk '{print $9 }' | head -1`

#判断现在的备份数量是否大于阈值
number=`ls -l -crt $backup_path/*.tar.gz | awk '{print $9 }' | wc -l`

if [ $number -gt $count ]
then
#删除最早生成的备份,只保留count数量的备份
rm $delfile
#更新删除文件日志
echo "delete $delfile" >> $backup_path/dump.log
fi

该脚本实现的功能:备份指定数据库的数据信息到指定目录,并只保存指定数量的最新文件。

注意:脚本中需要补全脚本中的passworddatabase_name信息,可修改备份保存路径backup_path,以及最多保存的备份文件数量count

编写完脚本信息之后,需要给脚本赋予可执行权限 chmod +x backup_mysql.sh

二、设定定时任务crontab

运行crontab -e命令,打开一个可编辑的文本,输入0 1 * * * /path/to/backup_mysql.sh 保本并退出即添加完成。

注意:其中0 1 * * *,表示每天凌晨1点进行备份操作,可自行修改1的值(范围0~23)

其中路径信息/path/to/backup_mysql.sh需要修改为实际的脚本路径。

应用场景

风险控制
对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。

策略营销
用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。

运维监控
灵活配置多指标、多依赖来实现更复杂的监控模式。

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。

由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。

Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。

Watermark的产生和Apache Flink内部处理逻辑如下图所示:

产生方式

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

what ?

State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在aggregation过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。

why ?

与批计算相比,State是流计算特有的,批计算没有failover机制,要么成功,要么重新计算。流计算在 大多数场景 下是增量计算,数据逐条处理(大多数场景),每次计算是在上一次计算结果之上进行处理的,这样的机制势必要将上一次的计算结果进行存储(生产模式要持久化),另外由于 机器,网络,脏数据等原因导致的程序错误,在重启job时候需要从成功的检查点(checkpoint,后面篇章会专门介绍)进行state的恢复。增量计算,Failover这些机制都需要state的支撑。

how ?

存储实现

  • 基于内存的HeapStateBackend - 在debug模式使用,不 建议在生产模式下应用;

  • 基于HDFS的FsStateBackend - 分布式文件持久化,每次读写都产生网络IO,整体性能不佳;

  • 基于RocksDB的RocksDBStateBackend - 本地文件+异步HDFS持久化;

    Apache Flink版本选择用RocksDB+HDFS的方式进行State的存储,State存储分两个阶段,首先本地存储到RocksDB,然后异步的同步到远程的HDFS。 这样而设计既消除了HeapStateBackend的局限(内存大小,机器坏掉丢失等),也减少了纯分布式存储的网络IO开销。

  • 还有一个是基于Niagara(Alibaba内部实现)NiagaraStateBackend - 分布式持久化- 在Alibaba生产环境应用;

分类

通过算子和数据层面划分

  • 算子类state

    KeyedState - 这里面的key是我们在SQL语句中对应的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段组成的Row的字节数组,每一个key都有一个属于自己的State,key与key之间的State是不可见的

  • 数据类state

    OperatorState - Apache Flink内部的Source Connector的实现中就会用OperatorState来记录source数据读取的offset。

checkpoint

checkpoint是使Flink 能从故障恢复的一种内部机制。检查点是 Flink 应用状态的一个一致性副本,包括了输入的读取位点。在发生故障时,Flink 通过从检查点加载应用程序状态来恢复,并从恢复的读取位点继续处理,就好像什么事情都没发生一样。Flink的状态存储在Flink的内部,这样做的好处就是不再依赖外部系统,降低了对外部系统的依赖,在Flink的内部,通过自身的进程去访问状态变量.同时会定期的做checkpoint持久化,把checkpoint存储在一个分布式的持久化系统中,如果发生故障,就会从最近的一次checkpoint中将整个流的状态进行恢复.