0%

使用 SQL 进行数据分析的过程中,join 是经常要使用的操作。

在离线场景中,join 的数据集是有边界的,可以缓存数据有边界的数据集进行查询,有Nested Loop/Hash Join/Sort Merge Join 等多表 join;

而在实时场景中,join 两侧的数据都是无边界的数据流,所以缓存数据集对长时间 job 来说,存储和查询压力很大,另外双流的到达时间可能不一致,造成 join 计算结果准确度不够;因此,Flink SQL 提供了多种 join 方法,来帮助用户应对各种 join 场景。

regular join

regular join 是最通用的 join 类型,不支持时间窗口以及时间属性,任何一侧数据流有更改都是可见的,直接影响整个 join 结果。如果有一侧数据流增加一个新纪录,那么它将会把另一侧的所有的过去和将来的数据合并在一起,因为 regular join 没有剔除策略,这就影响最新输出的结果; 正因为历史数据不会被清理,所以 regular join 支持数据流的任何更新操作。

对于 regular join 来说,更适合用于离线场景和小数据量场景。

  • 语法
    1
    2
    3
    4
    SELECT columns
    FROM t1 [AS <alias1>]
    [LEFT/INNER/FULL OUTER] JOIN t2
    ON t1.column1 = t2.key-name1

interval join

相对于 regular join,interval Join 则利用窗口的给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 join 不可见并可以被清理掉,这样就能修正 regular join 因为没有剔除数据策略带来 join 结果的误差以及需要大量的资源。

但是使用interval join,需要定义好时间属性字段,可以是计算发生的 Processing Time,也可以是根据数据本身提取的 Event Time;如果是定义的是 Processing Time,则Flink 框架本身根据系统划分的时间窗口定时清理数据;如果定义的是 Event Time,Flink 框架分配 Event Time 窗口并根据设置的 watermark 来清理数据。

  • 语法1

    1
    2
    3
    4
    SELECT columns
    FROM t1 [AS <alias1>]
    [LEFT/INNER/FULL OUTER] JOIN t2
    ON t1.column1 = t2.key-name1 AND t1.timestamp BETWEEN t2.timestamp AND BETWEEN t2.timestamp + + INTERVAL '10' MINUTE;
  • 语法2

    1
    2
    3
    4
    SELECT columns
    FROM t1 [AS <alias1>]
    [LEFT/INNER/FULL OUTER] JOIN t2
    ON t1.column1 = t2.key-name1 AND t2.timestamp <= t1.timestamp and t1.timestamp <= t2.timestamp + + INTERVAL10' MINUTE ;

temproal table join

interval Join 提供了剔除数据的策略,解决资源问题以及计算更加准确,这是有个前提:join 的两个流需要时间属性,需要明确时间的下界,来方便剔除数据;

显然,这种场景不适合维度表的 join,因为维度表没有时间界限,对于这种场景,Flink 提供了 temproal table join 来覆盖此类场景。

在 regular join和interval join中,join 两侧的表是平等的,任意的一个表的更新,都会去和另外的历史纪录进行匹配,temproal table 的更新对另一表在该时间节点以前的记录是不可见的。

  • 语法
    1
    2
    3
    4
    SELECT columns
    FROM t1 [AS <alias1>]
    [LEFT] JOIN t2 FOR SYSTEM_TIME AS OF t1.proctime [AS <alias2>]
    ON t1.column1 = t2.key-name1

lookup join

  • 维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件

reference:

Flink SQL 实战:双流 join 场景应用-阿里云开发者社区

Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化

table vs view

Temporary tables are always stored in memory and only exist for the duration of the Flink session they are created within. These tables are not visible to other sessions. They are not bound to any catalog or database but can be created in the namespace of one. Temporary tables are not dropped if their corresponding database is removed.

Tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue.

grammer

  • NOT ENFORCED

If you know that the data conforms to these constraints, you can use the NOT ENFORCED capability to help achieve two goals:

  • Improve performance, primarily in insert, update, and delete operations on the table
  • Reduce space requirements that are associated with enforcing a primary key or unique constraint

data type

字符串类型:

  • ⭐ CHAR、CHAR(n):定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。
  • ⭐ VARCHAR、VARCHAR(n)、STRING:可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。

二进制字符串类型:

  • ⭐ BINARY、BINARY(n):定长二进制字符串,n 代表定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。
  • ⭐ VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。

精确数值类型:

  • ⭐ DECIMAL、DECIMAL(p)、DECIMAL(p, s)、DEC、DEC(p)、DEC(p, s)、NUMERIC、NUMERIC(p)、NUMERIC(p, s):固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样,p 代表数值位数(长度),取值范围 [1, 38];s 代表小数点后的位数(精度),取值范围 [0, p]。如果不指定,p 默认为 10,s 默认为 0。
  • ⭐ TINYINT:-128 到 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。
  • ⭐ SMALLINT:-32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。
  • ⭐ INT、INTEGER:-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。
  • ⭐ BIGINT:-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。

有损精度数值类型:

日期、时间类型:

  • ⭐ DATE:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]

  • ⭐ TIME、TIME(p):由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的的时间的数据类型,精度高达纳秒,取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。

  • ⭐ TIMESTAMP、TIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。

    Timestamp precision can range from 0 (seconds) to 9 (nanoseconds). The default precision is 6.

  • ⭐ TIMESTAMP WITH TIME ZONE、TIMESTAMP(p) WITH TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。

  • ⭐ TIMESTAMP_LTZ、TIMESTAMP_LTZ(p):由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。

  • ⭐ TIMESTAMP_LTZ 与 TIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。

  • ⭐ INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND:interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。INTERVAL 子句总共涉及到的语法种类如下 Flink SQL 案例所示。

注意:但是在 Flink SQL 中,无法直接去掉时间字段的小数秒部分而保留时间类型。

其它类型

  1. 布尔类型:BOOLEAN
  2. ⭐ NULL 类型:NULL
  3. ⭐ Raw 类型:RAW(‘class’, ‘snapshot’) 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器

Flink on YARN模式

在这种模式下Flink的资源由YARN来进行管理,Flink服务被提交到YARN的ResourceManager后,YARN的NodeManager会为Flink生成对应的容器,Flink再将JobManager和TaskManager实例部署到容器中。在这种情况下Flink可以通过JobManager所需要的slots数量来动态的调整TaskManager的资源,达到了资源的可拓展性。Flink官方也推荐正式的生产环境使用这种部署模式。
在YARN上,又分为三种部署模式:

Session Mode

共享JobManager和TaskManager,所有提交的任务都在一个集群中运行,集群的生命周期独立于任务,任务的开始、结束不影响集群的生命周期。类似于上面的Standalone-cluster模式,任务与任务之间不隔离,共享同一套资源。

Per-Job Mode

为每个任务创建单独的JobManager和TaskManager集群,每个任务之间互相隔离互不干扰,集群的生命周期随着任务的生命周期结束而结束。这种模式的优点就是任务独占一个集群,资源的隔离性好。

Application Mode

一个Application可以存在多个任务,这时YARN为每个Application创建集群,Application中的任务共享该集群,资源的隔离是Application级别的,集群的生命周期随着Application的生命周期结束。这种模式更像是Session Mode和Pre-Job Mode的折中方案,既做到了资源的隔离,又提高了任务之间资源的利用率。

interaction

two way to submit job on yarn

first way:yarn session

(Start a long-running Flink cluster on YARN)这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交.

ps:所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业.适用于本地测试或者开发

mode one: 客户端模式

可以启动多个yarn session,一个yarn session模式对应一个JobManager,并按照需求提交作业,同一个Session中可以提交多个Flink作业。如果想要停止Flink Yarn Application,需要通过yarn application -kill命令来停止.

1
bin/yarn-session.sh -n 2 -jm 1024 -tm 4096 -s 6
  • YarnSessionClusterEntrypoint进程

    代表本节点可以命令方式提交job,而且可以不用指定-m参数。

    • 本节点提交任务

      bin/flink run ~/flink-demo-wordcount.jar

    • 如果需要在其他主机节点提交任务

      bin/flink run -m vmhome10.com:43258 examples/batch/WordCount.jar

  • FlinkYarnSessionCli进程

    代表yarn-session集群入口,实际就是jobmanager节点,也是yarn的ApplicationMaster节点。

mode two: 分离式模式

JobManager的个数只能是一个,同一个Session中可以提交多个Flink作业。如果想要停止Flink Yarn Application,需要通过yarn application -kill命令来停止。通过-d指定分离模式.

1
./bin/yarn-session.sh -nm test3 -d

在所有的节点只会出现一个 YarnSessionClusterEntrypoint进程

直接在YARN上提交运行Flink作业(Run a Flink job on YARN),这种方式的好处是一个任务会对应一个job,即没提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。

ps:适用于生产环境,可启动多个yarn session (bin/yarn-session.sh -nm ipOrHostName)

1
./bin/flink run -m addressOfJobmanager -yn 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

注意使用参数-m yarn-cluster提交到yarn集群。

  • 运行到指定的yarn session可以指定 -yid,–yarnapplicationId Attach to running YARN session来附加到到特定的yarn session上运行

reference

Flink on yarn部署模式 - 简书

flink on yarn模式下两种提交job方式 - 我是属车的 - 博客园

Flink-On-Yarn的部署模式

Hive-site里面的配置!!!

level-1

get started

1
2
nohup hive --service metastore &
nohup hive --service hiveserver2 &

beeline

1)metadata :hive元数据,即hive定义的表名,字段名,类型,分区,用户这些数据。一般存储关系型书库mysql中,在测试阶段也可以用hive内置Derby数据库。

(2)metastore :hivestore服务端。主要提供将DDL,DML等语句转换为MapReduce,提交到hdfs中。

(3)hiveserver2:hive服务端。提供hive服务。客户端可以通过beeline,jdbc(即用java代码链接)等多种方式链接到hive。

(4)beeline:hive客户端链接到hive的一个工具。可以理解成mysql的客户端。如:navite cat 等。

2 连接hive:
(1)./bin/hive
通过 ./bin/hive 启动的hive服务,第一步会先启动metastore服务,然后在启动一个客户端连接到metastore。此时metastore服务端和客户端都在一台机器上,别的机器无法连接到metastore,所以也无法连接到hive。这种方式不常用,一直只用于调试环节。

(2) ./bin/hive –service metastore
通过hive –service metastore 会启动一个 hive metastore服务默认的端口号为:9083。metastore服务里面配置metadata相关的配置。此时可以有多个hive客户端在hive-site.xml配置hive.metastore.uris=thrift://ipxxx:9083 的方式链接到hive。motestore 虽然能使hive服务端和客户端分别部署到不同的节点,客户端不需要关注metadata的相关配置。但是metastore只能通过只能通过配置hive.metastore.uris的方式连接,无法通过jdbc的方式访问。

(3)./bin/hiveserver2
hiveserver2 会启动一个hive服务端默认端口为:10000,可以通过beeline,jdbc,odbc的方式链接到hive。hiveserver2启动的时候会先检查有没有配置hive.metastore.uris,如果没有会先启动一个metastore服务,然后在启动hiveserver2。如果有配置hive.metastore.uris。会连接到远程的metastore服务。这种方式是最常用的。部署在图如下:

  • 登录bin/beeline,可以启动客户端链接到hiveserver2。执行beeline后在控制输入 !connect jdbc:hive2://localhost:10000/default root 123 就可以链接到 hiveserver2了;default表示链接到default database, root 和123 分别为密码。注意这里的密码不是mysql的密码,是hive中的用户
1
2
连接库
!connect jdbc:hive2://localhost:10000/default root 123

hive中几种分割符

分隔符

\n 每行一条记录
^A 分隔列(八进制 \001)
^B 分隔ARRAY或者STRUCT中的元素,或者MAP中多个键值对之间分隔(八进制 \002)
^C 分隔MAP中键值对的“键”和“值”(八进制 \003)

用到了系统默认分隔符。通常下面2中情况我们需要需要用到分隔符

1,制作table的输入文件,有时候我们需要输入一些特殊的分隔符

2,把hive表格导出到本地时,系统默认的分隔符是^A,这个是特殊字符,直接cat或者vim是看不到的

分隔符在HIVE中的用途

分隔符 描述
\n 对于文本文件来说,每行都是一条记录,因此换行符可以分隔记录
^A(Ctrl+A) 用于分隔字段(列)。在CREATE TABLE语句中可以使用八进制编码\001表示
^B(Ctrl+B) 用于分隔ARRAY或者STRUCT中的元素,或用于MAP中键-值对之间的分隔。在CREATE TABLE语句中可以使用八进制编码\002表示
^C(Ctrl+C) 用于MAP中键和值之间的分隔。在CREATE TABLE语句中可以使用八进制编码\003表示

Hive 中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性:列分隔符(通常为空格、”\t”、”\x001″)、行分隔符(”\n”)以及读取文件数据的方法。由于在加载数据的过程中,不需要从用户数据格式到 Hive 定义的数据格式的转换,因此,Hive 在加载的过程中不会对数据本身进行任何修改,而只是将数据内容复制或者移动到相应的 HDFS 目录中。

我们可以在create表格的时候,选择如下,表格加载input的文件的时候就会按照下面格式匹配

1
2
3
4
5
6
row format delimited 
fields terminated by '\001'
collection items terminated by '\002'
map keys terminated by '\003'
lines terminated by '\n'
stored as textfile;

如何查看和修改分割符,特殊符号

  1. 查看隐藏字符的方法

1.1,cat -A filename

img

1.2,vim filename后 命令模式下输入

1
2
set list显示特殊符号
set nolist 取消显示特殊符号
  1. 修改隐藏字符的方法

首先按照1.2打开显示特殊符号。进入INSERT模式

1
2
3
4
ctrl + V 可以输入 ^符号
ctrl + a 可以输入A---'\001'
ctrl + b 可以输入A---'\002'
ctrl + c 可以输入A---'\003'

注意:虽然键盘上你能找到^和A但直接输入时不行的,必须按照上面的方法输入。

第一行是特殊符号颜色蓝色,第二行直接输入不是特殊符号。

img

特殊号直接cat是不可以看见的,但是第二行是可见的,所以不是特殊符号。

img

1
2
3
4
5
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\u0001'
COLLECTION ITEMS TERMINATED BY '\u0002'
MAP KEYS TERMINATED BY '\u0003'
\u0001是ASCII编码值,对应java代码中的"\001"

意义如下:

(1)FIELDS,字段之间的分隔符是’\u0001’

(2)COLLECTION ITEMS,多个集合之间的分隔符是’\u0002’,例如(kv1,kv2,kv3)这种多个键值对之间的分隔符就是’\u0002’

(3)MAP KEYS,单个map的k和v之间的分隔符是\u0003\,例如kv1里,k \u0003 v

查看orc文件

1
hive --orcfiledump <hdfs-location-of-orc-file>

修改字段类型

1
2
ALTER TABLE <table-name> CHANGE <old-col-name> <new-col-name> <data-type>;
ALTER TABLE employee CHANGE e_id e_id INT;

建表

  • Create ORC table

  • Login to the web console

  • Launch Hive by typing hive in the web console. Run the below commands in Hive.

  • Use your database by using the below command. ${env:USER} gets replaced by your username automatically:

    1
    use ${env:USER};
  • To create an ORC file format:

    1
    2
    3
    4
    5
    CREATE TABLE orc_table (
    first_name STRING,
    last_name STRING
    )
    STORED AS ORC;
  • To insert values in the table:

    1
    INSERT INTO orc_table VALUES ('John','Gill');
  • To retrieve all the values in the table:

    1
    SELECT * FROM orc_table;

其它

查看hive进程

1
jps -ml  | grep Hive

level-2

内部表vs外部表

内部表

  1. 按照表数据的生命周期,可以将表分为内部表和外部表两类;
  2. 内部表也叫管理表或临时表,该类型表的生命周期时由hive控制的,默认情况下数据都存放在/user/hive/warehouse/下面;
  3. 删除表时数据会被删除;
  4. 以下命令创建的就是内部表,可见前面两篇文章中创建的表都是内部表

外部表

  • 创建表的SQL语句中加上external,创建的就是外部表了;
  • 外部表的数据生命周期不受Hive控制;
  • 删除外部表的时候不会删除数据;
  • 外部表的数据,可以同时作为多个外部表的数据源共享使用;
  • 接下来开始实践,下面是建表语句:
  • 在实际生产业务系统开发中,外部表是我们主要应用的表类型

cpu

us:用户态使用的cpu时间比
sy:系统态使用的cpu时间比
ni:用做nice加权的进程分配的用户态cpu时间比
id:空闲的cpu时间比
wa:cpu等待磁盘写入完成时间
hi:硬中断消耗时间
si:软中断消耗时间
st:虚拟机偷取时间

Yet Another Resource Negotiator

YARN 看做一个云操作系统,它负责为应用程序启 动 ApplicationMaster(相当于主线程),然后再由 ApplicationMaster 负责数据切分、任务分配、 启动和监控等工作,而由 ApplicationMaster 启动的各个 Task(相当于子线程)仅负责自己的计 算任务。当所有任务计算完成后,ApplicationMaster 认为应用程序运行完成,然后退出。

概念

contrainer

容器(Container)这个东西是 Yarn 对资源做的一层抽象。就像我们平时开发过程中,经常需要对底层一些东西进行封装,只提供给上层一个调用接口一样,Yarn 对资源的管理也是用到了这种思想。

Yarn 将CPU核数,内存这些计算资源都封装成为一个个的容器(Container)。    

  • 容器由 NodeManager 启动和管理,并被它所监控。
  • 容器被 ResourceManager 进行调度。

ResourceManager

负责资源管理的,整个系统有且只有一个 RM ,来负责资源的调度。它也包含了两个主要的组件:定时调用器(Scheduler)以及应用管理器(ApplicationManager)。

  1. 定时调度器(Scheduler):从本质上来说,定时调度器就是一种策略,或者说一种算法。当 Client 提交一个任务的时候,它会根据所需要的资源以及当前集群的资源状况进行分配。注意,它只负责向应用程序分配资源,并不做监控以及应用程序的状态跟踪。
  2. 应用管理器(ApplicationManager):同样,听名字就能大概知道它是干嘛的。应用管理器就是负责管理 Client 用户提交的应用。上面不是说到定时调度器(Scheduler)不对用户提交的程序监控嘛,其实啊,监控应用的工作正是由应用管理器(ApplicationManager)完成的。

ApplicationMaster

每当 Client 提交一个 Application 时候,就会新建一个 ApplicationMaster 。由这个 ApplicationMaster 去与 ResourceManager 申请容器资源,获得资源后会将要运行的程序发送到容器上启动,然后进行分布式计算。

ps: 大数据分布式计算的思想,大数据难以移动(海量数据移动成本太大,时间太长),那就把容易移动的应用程序发布到各个节点进行计算。

NodeManager

NodeManager 是 ResourceManager 在每台机器的上代理,负责容器的管理,并监控他们的资源使用情况(cpu,内存,磁盘及网络等),以及向 ResourceManager/Scheduler 提供这些资源使用报告。

命令

submit application to yarn

  1. Client 向 Yarn 提交 Application,这里我们假设是一个 MapReduce 作业。
  2. ResourceManager 向 NodeManager 通信,为该 Application 分配第一个容器。并在这个容器中运行这个应用程序对应的 ApplicationMaster。
  3. ApplicationMaster 启动以后,对 作业(也就是 Application) 进行拆分,拆分 task 出来,这些 task 可以运行在一个或多个容器中。然后向 ResourceManager 申请要运行程序的容器,并定时向 ResourceManager 发送心跳。
  4. 申请到容器后,ApplicationMaster 会去和容器对应的 NodeManager 通信,而后将作业分发到对应的 NodeManager 中的容器去运行,这里会将拆分后的 MapReduce 进行分发,对应容器中运行的可能是 Map 任务,也可能是 Reduce 任务。
  5. 容器中运行的任务会向 ApplicationMaster 发送心跳,汇报自身情况。当程序运行完成后, ApplicationMaster 再向 ResourceManager 注销并释放容器资源。

停止应用

yarn application -kill appID

参数配置

每个job提交到yarn上执行时,都会分配Container容器去运行,而这个容器需要资源才能运行,这个资源就是Cpu和内存。

CPU资源调度

目前的CPU被Yarn划分为虚拟CPU,这是yarn自己引入的概念,因为每个服务器的Cpu计算能力不一样,有的机器可能是 其他机器的计算能力的2倍,然后可以通过多配置几个虚拟内存弥补差异。在yarn中,cpu的相关配置如下。

yarn.nodemanager.resource.cpu-vcores

表示该节点服务器上yarn可以使用的虚拟的CPU个数,默认是8,推荐配置与核心个数相同,如果节点CPU的核心个数不足8个,需要调小这个值,yarn不会智能的去检测物理核数。如果机器性能较好,可以配置为物理核数的2倍。

yarn.scheduler.minimum-allocation-vcores

表示单个任务最小可以申请的虚拟核心数,默认为1

yarn.sheduler.maximum-allocation-vcores

表示单个任务最大可以申请的虚拟核数,默认为4;如果申请资源时,超过这个配置,会抛出 InvalidResourceRequestException

Memory资源调度

yarn一般允许用户配置每个节点上可用的物理资源,可用指的是将机器上内存减去hdfs的,hbase的等等剩下的可用的内存。

yarn.nodemanager.resource.memory-mb

设置该节点上yarn可使用的内存,默认为8G,如果节点内存不足8G,要减少这个值,yarn不会智能的去检测内存资源,一般这个值式yarn的可用内存资源。

yarn.scheduler.minmum-allocation-mb

单个任务最小申请物理内存量,默认是1024M,根据自己业务设定

yarn.scheduler.maximum-allocation-mb

单个任务最大可以申请的物理内存量,默认为8291M

二、如果设置这几个参数
如果一个服务器是32核,虚拟后为64核,128G内存,我们该如何设置上面的6个参数呢?即如何做到资源最大化利用

生产上我们一般要预留15-20%的内存,那么可用内存就是128*0.8=102.4G,去除其他组件的使用,我们设置成90G就可以了。

1、yarn.sheduler.maximum-allocation-vcores
1.
一般就设置成4个,cloudera公司做过性能测试,如果CPU大于等于5之后,CPU的利用率反而不是很好。这个参数可以根据生成服务器决定,比如公司服务器很富裕,那就直接设置成1:1;设置成32,如果不是很富裕,可以直接设置成1:2。我们以1:2来计算。

2、yarn.scheduler.minimum-allocation-vcores
1.
如果设置vcoure = 1,那么最大可以跑64/1=64个container,如果设置成这样,最小container是64/4=16个。

3、yarn.scheduler.minmum-allocation-mb

如果设置成2G,那么90/2=45最多可以跑45个container,如果设置成4G,那么最多可以跑24个;vcore有些浪费。

4、yarn.scheduler.maximum-allocation-mb

这个要根据自己公司的业务设定,如果有大任务,需要5-6G内存,那就设置为8G,那么最大可以跑11个container。

TIPS

配置指定用户启停

start-yarn.shstop-yarn.sh添加如下配置

1
2
YARN_RESOURCEMANAGER_USER=xxx
YARN_NODEMANAGER_USER=xxx

ps: 如果某个节点nodemanager没有拉起,需要执行./yarn --daemon start nodemanager

在资源够用的情况,无法发布新的应用

修改capacity-scheduler.xml

1
2
3
4
5
6
7
8
9
10
<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<!--<value>0.1</value>-->
<value>0.8</value>
<description>
Maximum percent of resources in the cluster which can be used to run
application masters i.e. controls number of concurrent running
applications.
</description>
</property>


reference:

https://zhuanlan.zhihu.com/p/54192454

当设计一款产品或者平台的时候,可以划分为两层,即底层实现和上层抽象。

实时数仓和传统数仓的对比主要可以从四个方面考虑:

  • 第一个是分层方式,离线数仓为了考虑到效率问题,一般会采取空间换时间的方式,层级划分会比较多;则实时数仓考虑到实时性问题,一般分层会比较少,另外也减少了中间流程出错的可能性。
  • 第二个是事实数据存储方面,离线数仓会基于 HDFS,实时数仓则会基于消息队列(如 Kafka)。
  • 第三个是维度数据存储,实时数仓会将数据放在 KV 存储上面。
  • 第四个是数据加工过程,离线数仓一般以 Hive、Spark 等批处理为主,而实时数仓则是基于实时计算引擎如 Storm、Flink 等,以流处理为主。

实时数仓主要有两个要点。首先是分层设计上,一般也是参考离线数仓的设计,通常会分为

ODS操作数据层

DWD明细层

DWS汇总层(轻度汇总/高度汇总)

ADS应用层

可能还会分出一层DIM维度数据层。另外分层设计上也有不同的思路,比如可以将DWS和ADS归为DM数据集市层

  • ODS(Operational Data Store): 贴源层

    这一层又叫做贴源层,最为接近数据源的一层,需要存储的数据量是最大的,存储的数据也是最原始。对众多数据源而言,他们的数据格式基本不一致,经过统一规格化后可以得到规整的数据,将数据源中的数据经过抽取、清洗、传输后装入ODS层。

  • DWD(Data Warehouse Detail):数据明细层

    业务层与数据仓库的隔离层,主要对ODS层做一些数据清洗和规范化的操作,并且可以按照不同的行为维度对数据进行划分,例如本文对数据源就进行了划分,主要分为浏览、曝光、点击、交易等不同的维度,这些不同的维度能够对上层调用方提供更细粒度的数据服务。

  • DWS(Data WareHouse Servce):数据服务层

    对各个域进行了适度汇总,主要以数据域+业务域的理念建设公共汇总层,与离线数仓不同的是,实时数仓的汇总层分为轻度汇总层和高度汇总层,例如将轻度汇总层数据写入 ADS,用于前端产品复杂的OLAP查询场景,满足自助分析和产出报表的需求。

  • ADS(Application Data Store):应用数据服务层

    主要是为了具体需求而构建的应用层,通过 RPC 框架对外提供服务,例如本文中提到的数据报表分析与展示、监控告警、流量调控、开放平台等应用。

  • DIM(Dimension):维表

    在实时计算中非常重要,也是重点维护的部分,维表需要实时更新,且下游基于最新的维表进行计算


reference:

实时数仓 | 你想要的数仓分层设计与技术选型 - 云+社区 - 腾讯云

  • 实时数仓是指在数据仓库的基础上,增加了实时数据处理和分析功能,可以在数据生成后立即进行处理和分析,并及时反馈给业务用户。实时数仓的建设需要关注以下几个问题:
  1. 数据源接入:实时数仓需要及时地处理和分析数据,因此数据源的接入必须实时或近实时,需要考虑数据源的多样性和高并发性。
  2. 数据质量:数据的准确性和完整性对于实时数仓的建设尤为重要,需要对数据进行清洗、过滤、校验等操作,保证数据的质量。
  3. 数据集成:实时数仓需要整合多个数据源的数据,因此需要进行数据集成和融合,确保数据的一致性和准确性。
  4. 数据模型设计:数据模型是实时数仓建设的核心,需要设计合理的数据模型,确保数据的可扩展性和灵活性。
  5. 实时计算和分析:实时数仓需要具备实时计算和分析能力,需要使用实时计算引擎和分析工具,对数据进行实时处理和分析。
  6. 数据安全:实时数仓中的数据可能涉及敏感信息,需要采取一定的安全措施,保证数据的安全性和隐私性。
  7. 系统运维:实时数仓需要进行系统运维和监控,包括数据备份和恢复、系统调优、故障处理等方面,确保系统的稳定性和可靠性。

综上所述,实时数仓的建设需要关注数据源接入、数据质量、数据集成、数据模型设计、实时计算和分析、数据安全以及系统运维等多个方面,需要根据实际需求和场景进行综合考虑和规划。

存储架构

Clickhouse 存储中的最小单位是 DataPart,写入链路为了提升吞吐,放弃了部分写入实时可见性,即数据攒批写入,一次批量写入的数据会落盘成一个 DataPart.

它不像 Druid 那样一条一条实时摄入。但 ClickHouse 把数据延迟攒批写入的工作交给来客户端实现,比如达到 10 条记录或每过 5s 间隔写入,换句话说就是可以在用户侧平衡吞吐量和时延,如果在业务高峰期流量不是太大,可以结合实际场景将参数调小,以达到极致的实时效果。

查询架构

计算能力方面

Clickhouse 采用向量化函数和 aggregator 算子极大地提升了聚合计算性能,配合完备的 SQL 能力使得数据分析变得更加简单、灵活。

数据扫描方面

ClickHouse 是完全列式的存储计算引擎,而且是以有序存储为核心,在查询扫描数据的过程中,首先会根据存储的有序性、列存块统计信息、分区键等信息推断出需要扫描的列存块,然后进行并行的数据扫描,像表达式计算、聚合算子都是在正规的计算引擎中处理。从计算引擎到数据扫描,数据流转都是以列存块为单位,高度向量化的。

高并发服务方面

Clickhouse 的并发能力其实是与并行计算量和机器资源决定的。如果查询需要扫描的数据量和计算复杂度很大,并发度就会降低,但是如果保证单个 query 的 latency 足够低(增加内存和 cpu 资源),部分场景下用户可以通过设置合适的系统参数来提升并发能力,比如 max_threads 等。其他分析型系统(例如 Elasticsearch)的并发能力为什么很好,从 Cache 设计层面来看,ES 的 Cache 包括 Query Cache, Request Cache,Data Cache,Index Cache,从查询结果到索引扫描结果层层的 Cache 加速,因为 Elasticsearch 认为它的场景下存在热点数据,可能被反复查询。反观 ClickHouse,只有一个面向 IO 的 UnCompressedBlockCache 和系统的 PageCache,为了实现更优秀的并发,我们很容易想到在 Clickhouse 外面加一层 Cache,比如 redis,但是分析场景下的数据和查询都是多变的,查询结果等 Cache 都不容易命中,而且在广投业务中实时查询的数据是基于 T 之后不断更新的数据,如果外挂缓存将降低数据查询的时效性。

技巧

唯一键约束

1
2
3
4
5
6
7
8
9
10
CREATE TABLE IF NOT EXISTS qilu.t_01(
C1 String,
C2 String,
C3 String,
C4 Date,
PRIMARY KEY (C1) # 要设置主键
) engine=ReplacingMergeTree() # 引擎要用ReplacingMergeTree
ORDER BY C1; # 要设置排序

optimize table t_01 FINAL; # 要强制合并分区

Change Data Capture(变更数据获取)

核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

应用场景

  • 数据同步,用于备份,容灾;
  • 数据分发,一个数据源分发给多个下游;
  • 数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

分类

主要分为基于查询基于 Binlog 两种方式

传统 CDC ETL

性能点

大数据领域的 4 类场景:

B batch 离线计算

A Analytical 交互式分析

S Servering 高并发的在线服务

T Transaction 事务隔离机制

离线计算通常在计算层,所以应该重点考虑 A、S 和 T

考虑点

  • 保证端到端的数据一致性,包括维度一致性以及全流程数据一致性;

  • 实时流处理过程中数据到达顺序无法预知时,如何保证双流 join 时数据能及时关联同时不造成数据堵塞;

  • Oracle

    1
    2
    1.Oracle 是第三方厂商维护的,不允许对线上系统有过多的侵入,容易造成监听故障甚至系统瘫痪,
    2.归档日志是在开启那一刻起才开始生成的,之前的存量数据难以进入 kafka,但是后来实时数据又必须依赖前面的计算结果

实时数仓方案

Lambda 架构

目前主流的一套实时数仓架构,存在离线和实时两条链路。实时部分以消息队列的方式实时增量消费,一般以 Flink+Kafka 的组合实现,维度表存在关系型数据库或者 HBase;离线部分一般采用 T+1 周期调度分析历史存量数据,每天凌晨产出,更新覆盖前一天的结果数据,计算引擎通常会选择 Hive 或者 Spark。

Kappa 架构

相较于 Lambda 架构,它移除了离线生产链路,思路是通过传递任意想要的 offset(偏移量)来达到重新消费处理历史数据的目的。优点是架构相对简化,数据来源单一,共用一套代码,开发效率高;缺点是必须要求消息队列中保存了存量数据,而且主要业务逻辑在计算层,比较消耗内存资源。

OLAP 变体架构

是 Kappa 架构的进一步演化,它的思路是将聚合分析计算由 OLAP 引擎承担,减轻实时计算部分的聚合处理压力。优点是自由度高,可以满足数据分析师的实时自助分析需求,减轻了计算引擎的处理压力;缺点是必须要求消息队列中保存存量数据,且因为是将计算部分的压力转移到了查询层,对查询引擎的吞吐和实时摄入性能要求较高。

数据湖架构

存储、计算和查询,分别由三个独立产品负责,分别是数据湖、Flink 和 Clickhouse。数仓分层存储和维度表管理均由数据湖承担,Flink SQL 负责批流任务的 SQL 化协同开发,Clickhouse 实现变体的事务机制,为用户提供离线分析和交互查询。CDC 到消息队列这一链路将来是完全可以去掉的,只需要 Flink CDC 家族中再添加 Oracle CDC 一员。未来,实时数仓架构将得到极致的简化并且性能有质的提升。