flowchart TD
A["QuerySet"] --> B["Query"]
B --> C["SQLCompiler"]
C --> D["SQL String"]
subgraph "Query 对象"
E["模型信息"]
F["WHERE 条件"]
G["SELECT 字段"]
H["JOIN 信息"]
I["ORDER BY"]
J["GROUP BY"]
end
subgraph "SQLCompiler"
K["as_sql"]
L["get_select"]
M["get_from_clause"]
N["compile"]
end
B --> E
B --> F
B --> G
B --> H
B --> I
B --> J
C --> K
C --> L
C --> M
C --> N
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#e8f5e8
style D fill:#fff3e0
def add_q(self, q_object):
    """Add a Q object to this query's WHERE conditions.

    The Q object is resolved via ``self._add_q`` (passing
    ``self.used_aliases``). If the resulting clause is truthy it is
    combined into ``self.where`` with the ``AND`` connector.
    """
    # _add_q returns a pair; only the first item (the clause) is used here.
    clause, _ = self._add_q(q_object, self.used_aliases)
    if clause:
        self.where.add(clause, AND)
添加关联查询
1 2 3 4 5 6 7 8 9
def add_select_related(self, fields):
    """Record the ``select_related`` fields on this query.

    Each entry of *fields* is split on ``LOOKUP_SEP`` and merged into a
    nested dict (one level per path segment). The resulting dict is
    stored on ``self.select_related``.
    """
    field_dict = {}
    for field in fields:
        # Walk down (creating as needed) one nested dict per path segment.
        node = field_dict
        for part in field.split(LOOKUP_SEP):
            node = node.setdefault(part, {})
    self.select_related = field_dict
# 7. 构建 SELECT 子句
out_cols = []
col_idx = 1
for _, (s_sql, s_params), alias in self.select + extra_select:
    if alias:
        s_sql = '%s AS %s' % (s_sql, self.connection.ops.quote_name(alias))
    elif with_col_aliases:
        s_sql = '%s AS %s' % (s_sql, 'Col%d' % col_idx)
        col_idx += 1
    params.extend(s_params)
    out_cols.append(s_sql)

result.append(', '.join(out_cols))

# 8. 添加 FROM 子句
result.append('FROM')
result.extend(from_)
params.extend(f_params)

# 9. 添加 WHERE 子句
if where:
    result.append('WHERE %s' % where)
    params.extend(w_params)

# 10. 添加 GROUP BY 子句
grouping = []
for g_sql, g_params in group_by:
    grouping.append(g_sql)
    params.extend(g_params)
if grouping:
    result.append('GROUP BY %s' % ', '.join(grouping))

# 11. 添加 HAVING 子句
if having:
    result.append('HAVING %s' % having)
    params.extend(h_params)

# 12. 添加 ORDER BY 子句
if order_by:
    ordering = []
    for _, (o_sql, o_params, _) in order_by:
        ordering.append(o_sql)
        params.extend(o_params)
    result.append('ORDER BY %s' % ', '.join(ordering))
flowchart TD
A["QuerySet 方法调用"] --> B["修改 Query 对象"]
B --> C["触发求值操作"]
C --> D["get_compiler()"]
D --> E["pre_sql_setup()"]
E --> F["setup_query()"]
E --> G["get_order_by()"]
E --> H["split_having()"]
E --> I["get_extra_select()"]
E --> J["get_group_by()"]
F --> K["get_select()"]
K --> L["get_default_columns()"]
K --> M["get_related_selections()"]
K --> N["处理注解字段"]
G --> O["解析排序字段"]
O --> P["处理表达式排序"]
subgraph "as_sql() 核心方法"
Q["get_from_clause()"]
R["编译 WHERE 条件"]
S["构建 SELECT 子句"]
T["构建 FROM 子句"]
U["构建 WHERE 子句"]
V["构建 GROUP BY 子句"]
W["构建 HAVING 子句"]
X["构建 ORDER BY 子句"]
Y["构建 LIMIT/OFFSET 子句"]
end
F --> Q
R --> U
S --> T
T --> U
U --> V
V --> W
W --> X
X --> Y
Y --> Z["最终 SQL 字符串"]
style A fill:#e1f5fe
style Z fill:#e8f5e8
style Q fill:#fff3e0
style R fill:#fff3e0
style S fill:#fff3e0
5.2 关键步骤详解
步骤1:setup_query()
1 2 3 4 5 6 7 8 9
def setup_query(self):
    """Set up the basic information for the query.

    Makes sure at least one table alias is in use: when every alias in
    ``self.query.tables`` has a reference count of zero (which is also
    the case when there are no tables at all), the initial alias is
    requested from the query.
    """
    if all(self.query.alias_refcount[a] == 0 for a in self.query.tables):
        self.query.get_initial_alias()
SELECT auth_user.department,
       COUNT(auth_user.id) AS count,
       AVG(auth_user.age) AS avg_age
FROM auth_user
GROUP BY auth_user.department
HAVING COUNT(auth_user.id) > %s
# ❌ 避免:N+1 查询问题
posts = Post.objects.all()
for post in posts:
    print(post.author.name)  # 每次都查询数据库

# ✅ 推荐:使用 select_related (ForeignKey, OneToOne)
posts = Post.objects.select_related('author', 'category')
for post in posts:
    print(post.author.name)  # 只查询一次数据库

# ✅ 推荐:使用 prefetch_related (ManyToMany, 反向ForeignKey)
authors = Author.objects.prefetch_related('posts')
for author in authors:
    for post in author.posts.all():
        print(post.title)
# ✅ 推荐:使用 iterator() 处理大数据集
for user in User.objects.all().iterator(chunk_size=1000):
    process_user(user)

# ✅ 推荐:使用 values() 减少内存占用
for user_data in User.objects.values('id', 'name').iterator():
    process_user_data(user_data)
# ✅ Recommended: process large data sets in batches
def process_users_in_batches(batch_size=1000):
    """Process every user, fetching them in slices of *batch_size* rows.

    Each slice of the queryset is iterated and every user in it is passed
    to ``process_user``.
    """
    # Slicing pages through the table with separate queries; without an
    # explicit ordering the database may return rows in a different order
    # for each query, so batches could overlap or skip rows.
    users = User.objects.order_by('pk')
    total = users.count()
    for start in range(0, total, batch_size):
        for user in users[start:start + batch_size]:
            process_user(user)
# ✅ 推荐:使用 select_for_update 避免并发问题
with transaction.atomic():
    user = User.objects.select_for_update().get(pk=user_id)
    user.balance -= amount
    user.save()
# ✅ 推荐:使用 raw SQL 处理复杂查询
users = User.objects.raw("""
    SELECT u.*, COUNT(p.id) as post_count
    FROM auth_user u
    LEFT JOIN blog_post p ON u.id = p.author_id
    GROUP BY u.id
    HAVING COUNT(p.id) > %s
""", [5])
# ✅ 推荐:使用数据库函数
from django.db.models import F, Value
from django.db.models.functions import Concat
for i, query in enumerate(self.queries, 1):
    print(f"查询 {i}: {query['time']}s")
    print(f"SQL: {query['sql']}")
    print("-" * 50)
# 使用示例
with QueryAnalyzer() as analyzer:
    users = list(User.objects.select_related('profile').all())
analyzer.report()
9.4 常见反模式及解决方案
1. 避免在循环中查询
1 2 3 4 5 6 7 8 9 10
# ❌ 反模式:在循环中查询
posts = Post.objects.all()
for post in posts:
    author = User.objects.get(id=post.author_id)  # N+1 查询
    print(f"{post.title} by {author.name}")

# ✅ 解决方案:使用 select_related
posts = Post.objects.select_related('author').all()
for post in posts:
    print(f"{post.title} by {post.author.name}")
flowchart LR
A["QuerySet API"] --> B["Query 对象<br/>抽象语法树"]
B --> C["SQLCompiler<br/>代码生成器"]
C --> D["SQL 字符串<br/>目标代码"]
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#e8f5e8
style D fill:#fff3e0