0%

单个实体转换为字典

  • 方式一
1
2
# 将实例对象转换为字典,排除未设置值的字段
entity_dict = entity.dict(exclude_unset=True)

使用 Pydantic 模型对象的一个方法,用于将模型对象转换为字典形式。exclude_unset=True 参数是用来控制是否排除未设置的属性字段。当设置为 True 时,将排除那些未设置值的字段,即只包含已经设置值的字段。

  • 方式二:

    遍历字典(对没有设置的字段进行默认值设置,设置排查某字段放入字典)

1
2
3
for key, value in user_group_data.dict(exclude_unset=True, exclude={"team_roles"}).items():
if value is not None:
setattr(user_group, key, value)
  • 方式三
1
2
3
resourceCreate: ResourceCreate
...
resource = Resource(**resourceCreate.dict())

单个实体与实体转换

1
2
3
role_update: RoleUpdate
...
role = Role.from_orm(role_update)

列表实体转换为字典列表

  • 方式一 session.query

在直接返回 查询结构的时候,优先使用,可以直接把结果转为json(e.g. res_json = jsonable_encoder(res_data))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
db_rel_usergroup_roles = (session.query(RelUserGroupUser.user_id, Role.id, Role.name, Role.nick_name, Role.label)
.join(RelUsergroupRole, RelUserGroupUser.usergroup_id == RelUsergroupRole.usergroup_id)
.join(Role, RelUsergroupRole.role_id == Role.id)
.filter(RelUserGroupUser.user_id == user_id)
.all())
rel_usergroup_roles_info = []
for rel_usergroup_role in db_rel_usergroup_roles:
rel_usergroup_roles_info.append(Role(
id=rel_usergroup_role[1],
name=rel_usergroup_role[2],
nick_name=rel_usergroup_role[3],
label=rel_usergroup_role[4]
))
return rel_usergroup_roles_info
  • 方式二 session.exec

在需要特殊处理查询的不同字段信息时,优先使用此方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rel_usergroup_user_list = session.exec(
select(RelUserGroupUser.usergroup_id, RelUserGroupUser.user_id, UserGroup.name, UserGroup.label, UserGroup.remark)
.where(RelUserGroupUser.user_id == user_id)
.join(UserGroup, UserGroup.id == RelUserGroupUser.usergroup_id)
).all()
converted_list = []
for row in rel_usergroup_user_list:
converted_obj = RelUserGroupUserRead(
user_id=row.user_id,
usergroup_id=row.usergroup_id,
usergroup_name=row.name,
usergroup_label=row.label
)
converted_list.append(converted_obj)
return converted_list

查询过滤

  • 筛选条件为或条件

  • ```python
    resource_list = session.query(Resource).filter(or_(Resource.type == “menu”, Resource.type == “element”)).all()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    ## update操作

    如果提前有 `session.get(entity, id)`,那后续在修改相关表属性`enitty.colx = xxx` 之后,需要紧接`session.add(entity)`

    ### session的获取

    在使用 SQLModel 进行数据库操作时,可以使用上下文管理器 `with Session(engine) as session` 来创建并管理数据库会话。这样可以确保会话在使用完毕后会被正确关闭,释放数据库连接资源。

    在例如 `async def delete_user(*, user_id: int, session: Session = Depends(get_session)):`中,`session` 参数是通过 `Depends(get_session)` 进行依赖注入的,FastAPI 框架会负责在请求处理函数执行完毕后自动关闭会话。所以在请求逻辑中,你不需要手动关闭会话连接。

    ## in_语法使用



    ## session.exec vs session.query

    在SQLAlchemy中,`session.query`和`session.exec`都可以用来执行SQL查询。然而,它们之间有一些关键的区别。

    `session.query`是SQLAlchemy ORM的一部分,它返回的是模型对象。这意味着你可以使用Python的属性和方法来访问和操作返回的数据。这对于编写面向对象的代码非常有用。

    ```python
    users = session.query(User).filter(User.name == 'John').all()
    for user in users:
    print(user.id, user.name)

另一方面,session.exec是SQLAlchemy Core的一部分,它返回的是原始的SQL结果。这意味着你需要使用列名作为字符串来访问返回的数据。这对于执行复杂的SQL查询或者需要更接近SQL的性能优化可能更有用。

1
2
3
result = session.exec(select(User).where(User.name == 'John'))
for row in result:
print(row['id'], row['name'])

总的来说:

如果正在编写面向对象的代码,或者查询相对简单,那么session.query可能是更好的选择。

如果需要执行复杂的SQL查询,或者需要更接近SQL的性能优化,那么session.exec可能是更好的选择。

概念

  • LB, “Load Balancing”(负载均衡)的缩写。负载均衡是一种分布式系统中常见的策略,用于将请求均匀地分发到多个服务实例上,以避免某个实例负载过重,提高整体系统的性能、可靠性和可用性。
  • restful header中的所有key,对应的值其实都是集合,只是大多数情况下集合里面只有一个元素

spring cloud gateway

  • DefaultErrorWebExceptionHandler 是 Spring Web 的默认实现类,实现了 ErrorWebExceptionHandler 接口。它提供了默认的错误处理行为,当没有自定义的 ErrorWebExceptionHandler 实现时,Spring Boot 将使用 DefaultErrorWebExceptionHandler 来处理错误。
  • 原生的spring cloud gateway进行请求转发,所有的微服务实例与gateway服务实例在nacos中必须是同一个namesapce和同一个group。

静态路由

  • 使用spring cloud gateway通过代码方式或配置文件方式实现

动态路由

spring cloud gateway 与 nacos结合实现

实现思路:使用nacos的配置中心,将路由配置放在nacos上,写个监听器监听nacos上配置的变化,将变化后的配置更新到GateWay应用的进程内。

  • 场景一(基础)

    可以先在nacos的配置管理中,添加微服务的配置信息(端口、路由等),然后启动微服务程序(不用配置相关端口配置)

  • 场景二(predicate)

    可以控制请求的cookie、header、host、method、path、query、RemoteAddr、Weight(按照权重将请求分发到不同节点服务)

  • 场景三(filters)

    可以控制请求的AddRequestHeader、AddResponseHeader、DedupeResponseHeader、DedupeResponseHeader、CircuitBreaker(断路器)、FallbackHeaders、PrefixPath、PreserveHostHeader(加上请求者的ip信息)、RequestRateLimiter(限流)、RedirectTo、RemoveRequestHeader、RemoveResponseHeader、RewritePath(将请求参数中的路径做变换)、RewriteLocationResponseHeader、RewriteResponseHeader、SecureHeaders、SetPath、SetRequestHeader、SetResponseHeader、SetStatus、StripPrefix、Retry、RequestSize、SetRequestHostHeader、ModifyRequestBody、ModifyResponseBody、TokenRelay(配合鉴权)

  • 场景四(限流)

    • 请求同一目标地址
  • 场景五(修改请求和响应body)

  • 场景六(地址分发)

    • 有多个后端应用服务

注意

  • spring cloud gateway中如果配置文件使用lb访问服务,那需要引入lb相关依赖包

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>

跨域请求

1
2
3
4
5
6
7
8
9
10
11
12
13
gateway:
# 跨域配置
globalcors:
corsConfigurations:
'[/**]':
allowedOrigins: "*"
allowedMethods:
- GET
- POST
- PUT
- DELETE
- OPTIONS
- HEAD

参考

  • Logstash Forwarder/Beats负责在源服务器上收集日志数据,然后将数据发送到集中式的Logstash或Kafka服务器。

  • Logstash是一个灵活的数据收集引擎,它可以从多个来源(包括日志文件、标准输入、Beats等)采集数据,进行转换、过滤和解析,最后将数据发送到各种目的地,比如Elasticsearch、Kafka等。在日志处理中,Logstash常被用作数据的转换和预处理工具。

TODO

POETRY

每个使用 Poetry 的项目都会有自己的虚拟环境,这样可以避免不同项目之间的依赖冲突。因此,每个项目的依赖项都会独立存放在各自的虚拟环境中。

  • 查看安装依赖目录

    1
    poetry env info --path
  • 列出所有虚拟环境

    1
    poetry env list
  • 删除指定虚拟环境

    1
    poetry env remove <虚拟环境名称>
  • 进入 Poetry 创建的虚拟环境

    1
    poetry shell
  • 锁定并安装项目依赖项的确切版本

    1
    2
    poetry lock
    poetry install
  • 安装test相关依赖

    1
    2
    3
    [tool.poetry.group.test.dependencies]
    pytest = "^7.3.0"
    ...
    1
    poetry install --with test
  • 通过requirement添加依赖

    1
    poetry add $( cat requirements.txt )
  • build编译

    如果使用build backend非默认的poetry core,则直接使用其他编译工具命令执行编译

一个典型的使用流程如下:

  1. 在一个新项目中,首先执行 poetry install 安装依赖并生成 poetry.lock 文件。
  2. 在开发过程中,如果需要新增或更新依赖,先执行 poetry add/remove 依赖名
  3. 然后执行 poetry lock 更新 poetry.lock 文件。
  4. 再执行 poetry install 安装新的依赖。
  5. 开发或运行代码时,执行 poetry shell 进入虚拟环境。
  6. 在虚拟环境中进行开发、测试和运行。

HATCH

准备

  • 新建项目

    hatch new "Hatch Demo"

  • 已有项目初始化

    hatch new --init

  • 虚拟环境

    1
    2
    python -m venv venv
    source venv/bin/activate
  • 创建hatch虚拟环境

    1
    hatch env create
  • pycharm配置

    虚拟环境需配置成 hatch shell提示的目录

  • 更新依赖

    • 运行 hatch shell命令
  • 查看环境信息

    1
    hatch env show --ascii
  • 格式化

    1
    hatch run lint:fmt

编译

  • 方式一:

    1
    2
    3
    python -m pip install --upgrade build
    python3 -m build
    # 一旦完成应该在 dist 目录下产生两个文件:

    方式二:

    1
    hatch build

代码格式化

  • 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[tool.hatch.envs.lint.scripts]
typing = "mypy --install-types --non-interactive {args:src/jarvex_auth tests}"
style = [
"ruff {args:.}",
"black --check --diff {args:.}",
]
fmt = [
"black {args:.}",
"ruff --fix {args:.}",
"style",
]
all = [
"style",
"typing",
]
  • 执行命令hatch run lint:fmt

Flit

安装依赖

1
flit install

Reference

语法

数据结构

列表list和元组tuple

1
2
3
4
5
6
7
l = [1, 2, 'hello', 'world'] # 列表中同时含有 int 和 string 类型的元素
l
[1, 2, 'hello', 'world']

tup = ('jason', 22) # 元组中同时含有 int 和 string 类型的元素
tup
('jason', 22)
  • 列表是动态的,长度可变,可以随意的增加、删减或改变元素。列表的存储空间略大于元组,性能略逊于元组。
  • 元组是静态的,长度大小固定,不可以对元素进行增加、删减或者改变操作。元组相对于列表更加轻量级,性能稍优。
  • set是空的时候,进行和其他非空set取交集,永远是空

字典和集合

1
2
3
4
5
6
7
8
9
10
11
d1 = {'name': 'jason', 'age': 20, 'gender': 'male'}
d2 = dict({'name': 'jason', 'age': 20, 'gender': 'male'})
d3 = dict([('name', 'jason'), ('age', 20), ('gender', 'male')])
d4 = dict(name='jason', age=20, gender='male')
d1 == d2 == d3 ==d4
True

s1 = {1, 2, 3}
s2 = set([1, 2, 3])
s1 == s2
True
  • 字典在 Python3.7+ 是有序的数据结构,而集合是无序的,其内部的哈希表存储结构,保证了其查找、插入、删除操作的高效性。所以,字典和集合通常运用在对元素的高效查找、去重等场景。

字符串

1
2
3
4
5
s1 = 'hello'
s2 = "hello"
s3 = """hello"""
s1 == s2 == s3
True
  • 转义字符
1
2
3
4
5
6
7
s = 'a\nb\tc'
print(s)
a
b c

len(s)
5
  • 常用方法

    • 和其他数据结构,如列表、元组一样,字符串的索引同样从 0 开始,index=0 表示第一个元素(字符),[index:index+2] 则表示第 index 个元素到 index+1 个元素组成的子字符串。

    • 字符串是不可变的(immutable)

    • 字符串格式化

      1
      2
      print('no data available for person with id: {}, name: {}'.format(id, name)) '''最新规范'''
      print('no data available for person with id: %s, name: %s' % (id, name))'''以往规范,%s 表示字符串型,%d 表示整型'''

json

  • json.dumps() 这个函数,接受 基本数据类型,然后将其序列化为 string
  • json.loads() 这个函数,接受一个合法字符串,然后将其反序列化为基本数据类型

条件与循环

条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# y = |x|
if x < 0:
y = -x
else:
y = x
# 场景二
if condition_1:
statement_1
elif condition_2:
statement_2
...
elif condition_i:
statement_i
else:
statement_n

循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 列表
l = [1, 2, 3, 4]
for item in l:
print(item)
1
2
3
4
# 字典
d = {'name': 'jason', 'dob': '2000-01-01', 'gender': 'male'}
for k in d: # 遍历字典的键
print(k)
name
dob
gender

for v in d.values(): # 遍历字典的值
print(v)
jason
2000-01-01
male

for k, v in d.items(): # 遍历字典的键值对
print('key: {}, value: {}'.format(k, v))
key: name, value: jason
key: dob, value: 2000-01-01
key: gender, value: male

while

1
2
3
4
5
6
7
8
9
10
11
12
while True:
try:
text = input('Please enter your questions, enter "q" to exit')
if text == 'q':
print('Exit system')
break
...
...
print(response)
except as err:
print('Encountered error: {}'.format(err))
break

异常

  • 当程序中存在多个 except block 时,最多只有一个 except block 会被执行。换句话说,如果多个 except 声明的异常类型都与实际相匹配,那么只有最前面的 except block 会被执行,其他则被忽略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
try:
s = input('please enter two numbers separated by comma: ')
num1 = int(s.split(',')[0].strip())
num2 = int(s.split(',')[1].strip())
...
except ValueError as err:
print('Value Error: {}'.format(err))
except IndexError as err:
print('Index Error: {}'.format(err))
except:
print('Other error')

print('continue')
...
  • 无论发生什么情况,finally block 中的语句都会被执行,哪怕前面的 try 和 excep block 中使用了 return 语句。
1
2
3
4
5
6
7
8
9
10
import sys
try:
f = open('file.txt', 'r')
.... # some data processing
except OSError as err:
print('OS error: {}'.format(err))
except:
print('Unexpected error:', sys.exc_info()[0])
finally:
f.close()
  • 自定义异常,定义并实现了初始化函数和 str 函数(直接 print 时调用):
1
2
3
4
5
6
7
8
9
10
11
class MyInputError(Exception):
"""Exception raised when there're errors in input"""
def __init__(self, value): # 自定义异常类型的初始化
self.value = value
def __str__(self): # 自定义异常类型的 string 表达形式
return ("{} is invalid input".format(repr(self.value)))

try:
raise MyInputError(1) # 抛出 MyInputError 这个异常
except MyInputError as err:
print('error: {}'.format(err))

函数

1
2
3
def name(param1, param2, ..., paramN):
statements
return/yield value # optional

参数

  • 为了能让一个函数接受任意数量的位置参数,可以使用一个*参数

    1
    2
    3
    4
    def avg(first, *rest):
    ...
    # Sample use
    avg(1, 2)
  • 为了接受任意数量的关键字参数,使用一个以**开头的参数

    1
    2
    3
    def make_element(name, value, **attrs):
    ...
    make_element('item', 'Albatross', size='large', quantity=6)
    1
    2
    3
    def anyargs(*args, **kwargs):
    print(args) # A tuple
    print(kwargs) # A dict
  • 参数默认值不要设置 [],而是写成 None

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
      def spam(a, b=[]): # NO! ,后面如果多次调用spam方法,则b的值会一直传递使用
    ...

    ### 函数嵌套

    - 函数的嵌套能够保证内部函数的隐私。
    - 合理的使用函数嵌套,能够提高程序的运行效率

    ```python
    def f1():
    print('hello')
    def f2():
    print('world')
    f2()
    f1()

    # 输出
    hello
    world

函数变量

  • 局部变量:只在函数内部有效。一旦函数执行完毕,局部变量就会被回收

  • 全局变量:不能在函数内部随意改变全局变量的值,如果我们一定要在函数内部改变全局变量的值,就必须加上 global 这个声明

    1
    2
    3
    4
    5
    6
    7
    8
    MIN_VALUE = 1
    MAX_VALUE = 10
    def validation_check(value):
    global MIN_VALUE
    ...
    MIN_VALUE += 1
    ...
    validation_check(5)

闭包

1
2
3
4
5
6
7
8
9
10
11
12
13
def nth_power(exponent):
def exponent_of(base):
return base ** exponent
return exponent_of # 返回值是 exponent_of 函数

square = nth_power(2) # 计算一个数的平方
cube = nth_power(3) # 计算一个数的立方

print(square(2)) # 计算 2 的平方
print(cube(2)) # 计算 2 的立方
# 输出
4 # 2^2
8 # 2^3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def counter():
count = 0

def increment():
nonlocal count
count += 1
return count

return increment

# 创建一个计数器
c1 = counter()

# 调用计数器的 increment 函数,每次调用会使计数器值增加
print(c1()) # 输出: 1
print(c1()) # 输出: 2
print(c1()) # 输出: 3

由于闭包的存在,count 变量的状态会被保留下来,而不是每次调用 counter 函数都会重新初始化,这样就实现了一个持久的计数器功能。

  • 闭包的关键特点是:一个内部函数被外部函数返回,内部函数在执行时记住了外部函数的变量。它像一个”小盒子”,封闭并记住了外部变量,即使外部函数已经执行完毕。

  • 闭包的好处在于它可以将数据和功能打包在一起,形成一个独立的单元,可以在不同的地方重复使用,并且可以保持数据的状态。这种机制使得闭包在实现状态保持、回调函数、装饰器等方面非常有用。

  • 这种特性在实际编程中很有用,比如实现私有变量、缓存等。当然,过度使用也会影响代码可读性。总之,理解了闭包的工作原理,就能更好地掌握这个强大的工具。

匿名函数

  • lambda 是一个表达式(expression),并不是一个语句(statement)
  • lambda 的主体是只有一行的简单表达式,并不能扩展成一个多行的代码块
1
2
3
4
5
6
7
8
9
square = lambda x: x**2
square(3)
# 输出
9

# 等同于
def square(x):
return x**2
square(3)

函数式编程

  • 所谓函数式编程,是指代码中每一块都是不可变的(immutable),都由纯函数(pure function)的形式组成。这里的纯函数,是指函数本身相互独立、互不影响,对于相同的输入,总会有相同的输出,没有任何副作用

  • 主要提供了这么几个函数:map()、filter() 和 reduce(),通常结合匿名函数 lambda 一起使用

类,一群有着相同属性和函数的对象的集合。

OOP思想四要素: 类 对象 属性 函数

  • 类函数

  • 成员函数

  • 静态函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Document():

WELCOME_STR = 'Welcome! The context for this book is {}.'

def __init__(self, title, author, context):
print('init function called')
self.title = title
self.author = author
self.__context = context

# 类函数
@classmethod
def create_empty_book(cls, title, author):
return cls(title=title, author=author, context='nothing')

# 成员函数
def get_context_length(self):
return len(self.__context)

# 静态函数
@staticmethod
def get_welcome(context):
return Document.WELCOME_STR.format(context)


empty_book = Document.create_empty_book('What Every Man Thinks About Apart from Sex', 'Professor Sheridan Simove')


print(empty_book.get_context_length())
print(empty_book.get_welcome('indeed nothing'))

########## 输出 ##########

init function called
7
Welcome! The context for this book is indeed nothing.

构造函数

  • 每个类都有构造函数,继承类在生成对象的时候,是不会自动调用父类的构造函数的,因此你必须在 init() 函数中显式调用父类的构造函数。它们的执行顺序是 子类的构造函数 -> 父类的构造函数。

抽象函数/抽象类

  • 抽象类是一种特殊的类,它生下来就是作为父类存在的,一旦对象化就会报错。同样,抽象函数定义在抽象类之中,子类必须重写该函数才能使用。相应的抽象函数,则是使用装饰器 @abstractmethod 来表示。

  • 抽象类就是这么一种存在,它是一种自上而下的设计风范,你只需要用少量的代码描述清楚要做的事情,定义好接口,然后就可以交给不同开发人员去开发和对接。

装饰器

所谓的装饰器,其实就是通过装饰器函数,来修改原函数的一些功能,使得原函数不需要修改。

Decorators is to modify the behavior of the function through a wrapper so we don’t have to actually modify the function.

函数装饰器

  • 通常情况下,我们会把*args**kwargs,作为装饰器内部函数 wrapper() 的参数。*args**kwargs,表示接受任意数量和类型的参数,因此装饰器就可以写成下面的形式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
      def my_decorator(func):
    def wrapper(*args, **kwargs):
    print('wrapper of decorator')
    func(*args, **kwargs)
    return wrapper

    - 装饰器可以接受原函数任意类型和数量的参数,除此之外,它还可以接受自己定义的参数。

    ```python
    def repeat(num):
    def my_decorator(func):
    def wrapper(*args, **kwargs):
    for i in range(num):
    print('wrapper of decorator')
    func(*args, **kwargs)
    return wrapper
    return my_decorator

    @repeat(4)
    def greet(message):
    print(message)

    greet('hello world')

    # 输出:
    wrapper of decorator
    hello world
    wrapper of decorator
    hello world
    wrapper of decorator
    hello world
    wrapper of decorator
    hello world

类装饰器

  • 类装饰器主要依赖于函数__call_(),每当你调用一个类的示例时,函数__call__()就会被执行一次。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    class Count:
    def __init__(self, func):
    self.func = func
    self.num_calls = 0

    def __call__(self, *args, **kwargs):
    self.num_calls += 1
    print('num of calls is: {}'.format(self.num_calls))
    return self.func(*args, **kwargs)

    @Count
    def example():
    print("hello world")

    example()

    # 输出
    num of calls is: 1
    hello world

    example()

    # 输出
    num of calls is: 2
    hello world

装饰器的嵌套

  • 执行顺序从里到外

    1
    2
    3
    4
    5
    6
    7
    @decorator1
    @decorator2
    @decorator3
    def func():
    ...
    # 等同于
    decorator1(decorator2(decorator3(func)))

metaclass

协程(Asyncio)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(url))

async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
for task in tasks:
await task
'''
task遍历的另一种写法
await asyncio.gather(*tasks)
'''

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

########## 输出 ##########

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
Wall time: 3.99 s
  • 协程和多线程的区别,主要在于两点,一是协程为单线程;二是协程由用户决定,在哪些地方交出控制权,切换到下一个任务。
  • 协程的写法更加简洁清晰,把 async / await 语法和 create_task 结合来用,对于中小级别的并发需求已经毫无压力。
  • 写协程程序的时候,你的脑海中要有清晰的事件循环概念,知道程序在什么时候需要暂停、等待 I/O,什么时候需要一并执行到底。

数据隔离

  • 在协程中,通常会使用局部变量或协程上下文来存储数据,每个协程拥有自己的数据空间,不会受到其他协程的影响。协程之间可以通过参数传递或全局变量等方式来进行数据交互,但是数据的修改只会影响当前协程的数据空间,不会影响其他协程的数据。

concurrency

并发通常用于 I/O 操作频繁的场景,而并行则适用于 CPU heavy 的场景。

并发

在 Python 中,并发并不是指同一时刻有多个操作(thread、task)同时进行。相反,某个特定的时刻,它只允许有一个操作发生,只不过线程 / 任务之间会互相切换,直到完成

concurrency

图中出现了 thread 和 task 两种切换顺序的不同方式,分别对应 Python 中并发的两种形式——threading 和 asyncio。

futures实现并发

1
2
3
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_one, sites)

虽然线程的数量可以自己定义,但是线程数并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果你设置的很大,反而可能会导致速度变慢。我们往往需要根据实际的需求做一些测试,来寻找最优的线程数量。

多线程每次只能有一个线程执行的原因

事实上,Python 的解释器并不是线程安全的,为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行。当然,在执行 I/O 操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行。

并行

所谓的并行,指的是同一时刻、同时发生。Python 中的 multi-processing 便是这个意思

  • 并发通常应用于 I/O 操作频繁的场景,比如你要从网站上下载多个文件,I/O 操作的时间可能会比 CPU 运行处理的时间长得多。
  • 而并行则更多应用于 CPU heavy 的场景,比如 MapReduce 中的并行计算,为了加快运行速度,一般会用多台机器、多个处理器来完成。

futures实现并行

1
2
3
4
5
6
7
8
9
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
to_do = []
for site in sites:
future = executor.submit(download_one, site)
to_do.append(future)

for future in concurrent.futures.as_completed(to_do):
future.result()

函数 ProcessPoolExecutor() 表示创建进程池,使用多个进程并行的执行程序。不过,这里我们通常省略参数 workers,因为系统会自动返回 CPU 的数量作为可以调用的进程数。

总结

1
2
3
4
5
6
7
8
# 伪代码
if io_bound:
if io_slow:
print('Use Asyncio')
else:
print('Use multi-threading')
else if cpu_bound:
print('Use multi-processing')

GIL

Global Interpreter Lock,即全局解释器锁

python引进GIL的原因

  • 一是设计者为了规避类似于内存管理这样的复杂的竞争风险问题(race condition);
  • 二是因为 CPython 大量使用 C 语言库,但大部分 C 语言库都不是原生线程安全的(线程安全会降低性能和增加复杂度)

工作机制

check interval

CPython 中还有另一个机制,叫做 check_interval,意思是 CPython 解释器会去轮询检查线程 GIL 的锁住情况。每隔一段时间,Python 解释器就会强制当前线程去释放 GIL,这样别的线程才能有执行的机会。

assert

assert 语句,可以说是一个 debug 的好工具,主要用于测试一个条件是否满足。如果测试的条件满足,则什么也不做,相当于执行了 pass 语句;如果测试条件不满足,便会抛出异常 AssertionError,并返回具体的错误信息(optional)

语法

1
assert_stmt ::=  "assert" expression ["," expression]

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
例子1
assert 1 == 2
相当于
if __debug__:
if not expression: raise AssertionError

例子2
assert 1 == 2, 'assertion is wrong'
相当于
if __debug__:
if not expression1: raise AssertionError(expression2)

例子3(促销价格大于0元):
def apply_discount(price, discount):
updated_price = price * (1 - discount)
assert 0 <= updated_price <= price, 'price should be greater or equal to 0 and less or equal to original price'
return updated_price
  • 不要在使用 assert 时加入括号,否则无论表达式对与错,assert 检查永远不会 fail

with

在 Python 中,解决资源泄露的方式是上下文管理器(context manager)。上下文管理器,能够帮助你自动分配并且释放资源,其中最典型的应用便是 with 语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
示例一:
for x in range(10000000):
with open('test.txt', 'w') as f:
f.write('hello')
等同于
f = open('test.txt', 'w')
try:
f.write('hello')
finally:
f.close()

示例二:
some_lock = threading.Lock()
with somelock:
...
等同于
some_lock = threading.Lock()
some_lock.acquire()
try:
...
finally:
some_lock.release()
  • 基于类的上下文管理器

    当我们用类来创建上下文管理器时,必须保证这个类包括方法”__enter__()”和方法“__exit__()”。其中,方法“__enter__()”返回需要被管理的资源,方法“__exit__()”里通常会存在一些释放、清理资源的操作,比如这个例子中的关闭文件等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class FileManager:
def __init__(self, name, mode):
print('calling __init__ method')
self.name = name
self.mode = mode
self.file = None

def __enter__(self):
print('calling __enter__ method')
self.file = open(self.name, self.mode)
return self.file


def __exit__(self, exc_type, exc_val, exc_tb):
print('calling __exit__ method')
if self.file:
self.file.close()
# 使用
with FileManager('test.txt', 'w') as f:
print('ready to write to file')
f.write('hello world')

  • 基于生成器的上下文管理器

使用装饰器 contextlib.contextmanager,来定义自己所需的基于生成器的上下文管理器,用以支持 with 语句

1
2
3
4
5
6
7
8
9
10
11
12
from contextlib import contextmanager

@contextmanager
def file_manager(name, mode):
try:
f = open(name, mode)
yield f
finally:
f.close()

with file_manager('test.txt', 'w') as f:
f.write('hello world')

性能调试

cProfile


用法 tips

  • 引用规范 from your_file import function_name, class_name

  • 定义函数时,所有非默认参数将在默认参数之前

    1
    2
    3
    def my_function(arg1, arg3, arg2="default"):
    # 函数实现
    pass
  • pass 是一个用于暂时占位或作为空占位符的关键字,它确保代码能够顺利通过语法检查而不做任何实际的操作。

  • 每个Python文件都有一个特殊的变量__name__。当一个Python文件被直接运行时,__name__的值被设置为'__main__'。当一个Python文件被导入到另一个文件中时,__name__的值被设置为该文件的名字,所以用if __name__ == '__main__'来避开 import 时执行。

  • 比较和拷贝

    • '=='操作符比较对象之间的值是否相等

    • 'is'操作符,相当于比较对象之间的 ID 是否相等

      对于整型数字来说,以上a is b为 True 的结论,只适用于 -5 到 256 范围内的数字

  • 值传递、引用传递

    • 变量的赋值,只是表示让变量指向了某个对象,并不表示拷贝对象给变量;而一个对象,可以被多个变量所指向。
    • 可变对象(列表,字典,集合等等)的改变,会影响所有指向该对象的变量。
    • 对于不可变对象(字符串,整型,元祖等等),所有指向该对象的变量的值总是一样的,也不会改变。但是通过某些操作(+= 等等)更新不可变对象的值时,会返回一个新的对象。
    • 变量可以被删除,但是对象无法被删除。
  • 容器是可迭代对象,可迭代对象调用 iter() 函数,可以得到一个迭代器。迭代器可以通过 next() 函数来得到下一个元素,从而支持遍历。

  • 生成器是一种特殊的迭代器(注意这个逻辑关系反之不成立)。使用生成器,你可以写出来更加清晰的代码;合理使用生成器,可以降低内存占用、优化程序结构、提高程序速度。

TIPS:

  1. Keep in mind that, in Ignite, the concepts of a SQL table and a key-value cache are two equivalent representations of the same (internal) data structure. You can access your data using either the key-value API or SQL statements, or both.

  2. A cache is a collection of key-value pairs that can be accessed through the key-value API. A SQL table in Ignite corresponds to the notion of tables in traditional RDBMSs with some additional constraints; for example, each SQL table must have a primary key.

    A table with a primary key can be presented as a key-value cache, in which the primary key column serves as the key, and the rest of the table columns represent the fields of the object (the value).

登录

1
$IGNITE_HOME/bin/sqlline.sh --verbose=true -u jdbc:ignite:thin://ip地址/PUBLIC

已验证 kafka 2.3.1

此方案可以动态创建用户,或修改用户账号信息

SASL(Simple Authentication and Security Layer)

参考 https://kafka.apache.org/documentation/#security_sasl_scram

服务端

环境 zookeeper端口22181 ,kafka broker端口39092

无需重启zookeeper

第一步:创建 SCRAM 证书

在broker启动之前

  • 创建admin用户证书

    启动之前(必须)

    1
    bin/kafka-configs.sh --zookeeper 172.20.58.93:22181 --alter --add-config 'SCRAM-SHA-256=[password=datacanvas],SCRAM-SHA-512=[password=datacanvas]' --entity-type users --entity-name admin

    会在 zookeeper生产目录 config,上面zookeeper参数值与kafka server.properties的zookeeper connect配置一致,也和offsetExplorer的chroot path一致

  • 列出用户已有证书

    1
    bin/kafka-configs.sh --zookeeper 172.20.58.93:22181 --describe --entity-type users --entity-name alice
  • 删除用户证书

    1
    bin/kafka-configs.sh --zookeeper 172.20.58.93:22181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
  • 创建普通用户alice证书

    可在启动之前或启动之后皆可

    1
    bin/kafka-configs.sh --zookeeper 172.20.58.93:22181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice

第一步:准备kafka_server_jaas.conf文件

1
2
3
4
5
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="datacanvas";
};

第二步:设置kafka-server-start.sh

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/sasl/kafka_2.13-3.3.2/config/kafka_server_jaas.conf"
  • 注意修改路径

第三步:设置server.properties

1
2
3
4
5
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:29092
advertised.listeners=PLAINTEXT://172.20.58.93:9092,SASL_PLAINTEXT://172.20.58.93:29092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
  • listeners设置了两种协议 明文连接PLAINTEXT 和 安全连接 SASL_SSL

    • 0.0.0.0 表示 Kafka Broker 将监听所有可用的网络接口,这意味着它将接受来自任何 IP 地址的连接请求。
  • 注意zookeeer存储的位置 (/brokers)

  • 设置默认副本数 default.replication.factor=3 和 num.partitions=1

  • 测试使用log.dirs=/home/sasl/data/kafka-logs

  • ACLs相关配置

1
2
3
4
5
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 旧版本配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 这里添加ANONYMOUS为超级用户,主要为了listener for plain(如何只用sasl,可以不配置ANONYMOUS)
super.users=User:admin;User:ANONYMOUS
allow.everyone.if.no.acl.found=false

默认为true,默认情况只通过用户密码认证管控用户,acl只会对–deny-principal起效(所以默认同时使用 plain和scram,需要保持默认true。如果单独使用scram,则需要设置为false)

第四步:设置acl-config.properties

在config目录新增acl-config.properties设置 admin信息

1
2
3
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="datacanvas";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

第五步:启动kafka server

1
bin/kafka-server-start.sh -daemon ./config/server.properties

第六步:配置ACL授权

  • 授权bigdata用户可以访问主题前缀为ODS的数据,且限制消费组 GROUP-BIGDATA

    1
    bin/kafka-acls.sh --bootstrap-server 172.20.58.93:29092 --command-config /path/to/config/acl-config.properties --add --allow-principal User:bigdata --operation Read --group GROUP-BIGDATA --topic ODS --resource-pattern-type prefixed
    • –resource-pattern-type prefixed 指定ODS前缀
  • 移除权限

    1
    bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /home/sasl/kafka_2.13-3.3.2/config/acl-config.properties --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 
  • 禁止删除指定主题的权限

    1
    bin/kafka-acls.sh --bootstrap-server 172.20.58.93:29092 --command-config /home/sasl/kafka_2.13-3.3.2/config/acl-config.properties --add --deny-principal User:bigdata --operation Write --operation Delete --topic ODS --resource-pattern-type prefixed

客户端

  • PLAIN连接保持原先操作

  • SASL_PLAINTEXT

    连接配置添加用户登录信息

    1
    org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="datacanvas";

已验证Kafka 3.3.2

此方案的缺点,在sasl_plaintext模式下,不能动态创建用户,或修改用户账号信息

优点是,无需在zookeeper上配置jaas

服务端

第一步:准备kafka_server_jaas.conf文件

1
2
3
4
5
6
7
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="datacanvas"
user_admin="datacanvas"
user_qlb="qlbrtdsp";
};

第二步:设置kafka-server-start.sh

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/sasl/kafka_2.13-3.3.2/config/kafka_server_jaas.conf"
  • 注意修改路径

第三步:设置server.properties

1
2
3
4
5
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:29092
advertised.listeners=PLAINTEXT://172.20.58.93:9092,SASL_PLAINTEXT://172.20.58.93:29092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
  • listeners设置了两种协议 明文连接PLAINTEXT 和 安全连接 SASL_PLAINTEXT

    • 0.0.0.0 表示 Kafka Broker 将监听所有可用的网络接口,这意味着它将接受来自任何 IP 地址的连接请求。
  • 注意zookeeer存储的位置 (/brokers)

  • 设置默认副本数 default.replication.factor=3 和 num.partitions=1

  • 测试使用log.dirs=/home/sasl/data/kafka-logs

1
2
3
4
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 旧版本配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin;User:ANONYMOUS
allow.everyone.if.no.acl.found=false

第四步:设置acl-config.properties

在config目录新增acl-config.properties设置 admin信息

1
2
3
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="datacanvas";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

第五步:启动kafka server

1
bin/kafka-server-start.sh -daemon ./config/server.properties

第六步:配置ACL授权

  • 授权bigdata用户可以访问主题前缀为ODS的数据,且限制消费组 GROUP-BIGDATA

    1
    bin/kafka-acls.sh --bootstrap-server 172.20.58.93:29092 --command-config /home/sasl/kafka_2.13-3.3.2/config/acl-config.properties --add --allow-principal User:bigdata --operation Read --topic ODS --group GROUP-BIGDATA --resource-pattern-type prefixed
  • 禁止指定用户写入、删除主题前缀为ODS的数据

    1
    bin/kafka-acls.sh --bootstrap-server 172.20.58.93:29092 --command-config /home/sasl/kafka_2.13-3.3.2/config/acl-config.properties --add --deny-principal User:bigdata --operation Write --operation Delete --topic ODS --resource-pattern-type prefixed

客户端

  • PLAINTEXT连接保持原先操作

  • SASL_PLAINTEXT

    连接配置添加用户登录信息

    1
    org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="datacanvas";