LLM-Agent场景题:贷款审批

摘要

本方案提出一个基于大型语言模型(LLM)和检索增强生成(RAG)的小企业贷款风险评估智能系统,通过多智能体(Multi-Agent)协作架构,实现贷款申请的自动化分析、风险评估和决策建议生成。系统整合传统信用评分模型与先进AI技术,显著提升贷款决策的效率、准确性和透明度,同时确保合规性和公平性。

一、系统架构概览

核心架构组件

多Agent协作框架:基于LangGraph构建的协作网络,实现不同功能智能体间的有序交互
RAG知识库系统:多层级、多向量的知识库架构,支持领域专业知识检索与应用
数据处理管道:处理结构化与非结构化金融数据的流水线系统
风险评估引擎:融合传统信用模型与LLM分析能力的混合风险评估机制
决策生成与解释系统:生成可解释的贷款决策建议,提供透明的评估依据

技术栈选择

±------------------±------------------±-----------------+
| 功能层 | 技术选择 | 备注 |
±------------------±------------------±-----------------+
| 框架基础 | LangChain | 灵活组合AI组件 |
| | LangGraph | 多Agent协作流编排 |
±------------------±------------------±-----------------+
| 基础模型 | Azure OpenAI | 企业级合规部署 |
| | Qwen-Max | 中文处理能力优秀 |
±------------------±------------------±-----------------+
| 向量数据库 | Pinecone/Qdrant | 支持混合检索策略 |
±------------------±------------------±-----------------+
| 嵌入模型 | BGE-Large-zh | 优秀的中文嵌入效果 |
±------------------±------------------±-----------------+
| 数据处理 | LangChain Doc Proc| 文档处理工具包 |
| | Unstructured | 非结构化数据解析 |
±------------------±------------------±-----------------+
| 监控与追踪 | LangSmith | 全流程监控追踪 |
±------------------±------------------±-----------------+

二、多Agent协作系统详解

Agent网络架构

系统采用LangGraph构建有向无环图(DAG)形式的Agent协作网络,实现信息流动和决策流程的可控性:

各Agent功能定义

协调Agent (Coordinator)
统筹整个评估流程,分发任务,整合各Agent输出
处理异常情况和决策冲突
维护评估流程的状态和进度

数据获取Agent (Data Acquisition)
从贷款申请表和相关文件中提取关键信息
识别缺失数据并提出补充需求
将提取的数据标准化为系统内部格式

信用评估Agent (Credit Analysis)
分析信用报告和历史记录
评估借款人的信用评分和趋势
识别信用记录中的异常模式和风险信号

财务分析Agent (Financial Analysis)
分析企业财务报表和现金流状况
计算关键财务比率和健康指标
评估企业的盈利能力和财务稳定性

市场分析Agent (Market Analysis)
评估企业所在行业的发展趋势和市场环境
分析宏观经济因素对企业的影响
考量行业竞争格局和企业市场定位

反欺诈Agent (Fraud Detection)
识别可疑的交易模式和异常数据
验证提供信息的一致性和真实性
生成欺诈风险评分和警报

合规检查Agent (Compliance)
确保评估流程符合相关法规和政策
检查决策过程中的偏见和歧视因素
生成合规性报告

决策合成Agent (Decision Synthesis)
整合各Agent的分析结果
生成最终风险评估报告和决策建议
提供决策依据和解释

沟通Agent (Communication)
生成客户友好的报告和反馈
准备内部评审文档
处理后续交互和问题回答

Agent协作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
graph TD
A[申请接收] --> B[协调Agent初始化]
B --> C[数据获取Agent]
C --> D[数据验证与预处理]
D --> E{数据完整性检查}
E -->|不完整| F[请求补充数据]
F --> C
E -->|完整| G[并行分析启动]
G --> H1[信用评估Agent]
G --> H2[财务分析Agent]
G --> H3[市场分析Agent]
G --> H4[反欺诈Agent]
H1 --> I[分析结果汇总]
H2 --> I
H3 --> I
H4 --> I
I --> J[合规检查Agent]
J --> K[决策合成Agent]
K --> L[风险评估报告]
L --> M[沟通Agent]
M --> N[生成反馈与解释]

三、RAG系统设计

知识库架构

系统采用多层级知识库架构,满足不同类型知识检索需求:
一级知识库:行业与政策知识
贷款政策法规条例
行业分析报告与指标标准
经济环境与市场趋势分析

二级知识库:风险评估指南
信用风险评估方法论
财务风险评估标准与指标
风险评分模型与解释文档
案例库与决策先例

三级知识库:操作指南
Agent操作流程和标准
异常处理与上报规则
数据解释与处理指南

索引与检索策略

混合检索技术
结合关键词检索(BM25)与向量检索的混合方法
对索引数据采用动态alpha值调整策略(0.2-0.8范围)
基于查询内容智能选择检索策略

多向量检索
为每个文档创建多种表示向量:
原始文档向量
摘要向量
假设问题向量(HyDE)
支持查询扩展与多路检索

重排序机制
采用BGE-reranker-large对初始检索结果重排
结合内容相关度与权威性的评分机制
支持基于上下文的智能过滤

RAG优化技术

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+------------------+----------------------+---------------------+
| 优化方向 | 采用技术 | 预期效果 |
+------------------+----------------------+---------------------+
| 查询理解 | 步退提示(Step Back) | 提高抽象理解能力 |
| | 多查询扩展 | 提高覆盖范围 |
+------------------+----------------------+---------------------+
| 检索质量 | 混合检索 | 平衡精确与相关性 |
| | 上下文感知重排序 | 提高指令的相关度 |
+------------------+----------------------+---------------------+
| 文档处理 | 递归分块策略 | 保持语义完整性 |
| | 句子窗口检索 | 丰富上下文环境 |
+------------------+----------------------+---------------------+
| 知识整合 | LLM引导的知识合成 | 减少知识冗余 |
| | 摘要压缩 | 降低内容复杂度 |
+------------------+----------------------+---------------------+
| 偏见控制 | 多模型交叉验证 | 减少单一模型偏见 |
| | 引导Prompt设计 | 控制偏见传播 |
+------------------+----------------------+---------------------+

四、数据处理流程

总体架构

基于RAG技术和多Agent协作的数据预处理流程,主要分为四个主要阶段:数据接入、数据解析与标准化、特征提取与增强、数据存储与索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
digraph PreprocessingFlow {
rankdir=TB;
node [shape=box, style=filled, color=skyblue];

subgraph cluster_0 {
label="数据接入层";
style=filled;
color=lightgrey;
node [style=filled, color=white];

DataIngestion [label="多源异构数据接入"];
ValidityCheck [label="数据有效性检查"];
DataParallelize [label="数据并行化处理"];
}

subgraph cluster_1 {
label="数据解析与标准化层";
style=filled;
color=lightcyan;
node [style=filled, color=white];

StructuredDataParsing [label="结构化数据解析"];
UnstructuredTextParsing [label="非结构化文本解析"];
DataCleansing [label="数据清洗"];
DataNormalization [label="数据标准化"];
}

subgraph cluster_2 {
label="特征提取与增强层";
style=filled;
color=lightsalmon;
node [style=filled, color=white];

FeatureExtraction [label="金融特征提取"];
TimeSeriesAnalysis [label="时间序列分析"];
TextualFeatures [label="文本特征提取"];
DataEnrichment [label="数据增强"];
}

subgraph cluster_3 {
label="数据存储与索引层";
style=filled;
color=lightgreen;
node [style=filled, color=white];

DataChunking [label="自适应分块"];
VectorEmbedding [label="向量嵌入"];
MetadataGeneration [label="元数据生成"];
VectorStorage [label="向量数据存储"];
}

// 连接
DataIngestion -> ValidityCheck -> DataParallelize;

DataParallelize -> StructuredDataParsing;
DataParallelize -> UnstructuredTextParsing;

StructuredDataParsing -> DataCleansing;
UnstructuredTextParsing -> DataCleansing;
DataCleansing -> DataNormalization;

DataNormalization -> FeatureExtraction;
DataNormalization -> TimeSeriesAnalysis;
DataNormalization -> TextualFeatures;
FeatureExtraction -> DataEnrichment;
TimeSeriesAnalysis -> DataEnrichment;
TextualFeatures -> DataEnrichment;

DataEnrichment -> DataChunking;
DataChunking -> VectorEmbedding;
VectorEmbedding -> MetadataGeneration;
MetadataGeneration -> VectorStorage;
}

1.1 多源异构数据接入

个人消费记录接入
• 支持多种格式接入:CSV、Excel、JSON、银行API导出文件
• 自动检测分隔符和编码格式
• 数据源元信息记录:来源、更新时间、数据版本
商家经营数据接入
• 支持标准财务报表格式:资产负债表、利润表、现金流量表
• 行业分类自动标记
• 支持结构化数据库接口和API集成
亲友信息表格接入
• 严格权限控制与访问记录
• 数据来源合法性验证
• 敏感数据自动识别和初步脱敏标记
专家案例文本接入
• 支持PDF、Word、HTML等格式
• 基于文档结构的自动分段
• OCR处理图片中嵌入的文本信息

1.2 数据有效性检查

数据完整性验证
• 必填字段检查:确保核心数据字段存在
• 数据量验证:确保数据量满足最低分析要求
• 日期范围验证:确保数据的时效性
格式有效性检查
• 数值型字段格式验证
• 日期格式标准化检查
• 文本编码一致性检查
业务规则验证
• 金额范围合理性检查
• 交易频率合理性检查
• 基本业务逻辑验证

1.3 数据并行化处理

数据分片策略
• 基于数据量自适应分片
• 时间序列数据的时间窗口分片
• 文本数据的自然段落分片
分布式处理框架
• 支持Spark/Dask等分布式计算框架
• 任务优先级调度
• 处理进度监控与失败重试机制

2.1 结构化数据解析

2.1.1 个人消费记录解析

数据结构化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def parse_consumption_record(data_source):
# 自动检测文件格式和分隔符
file_format = detect_file_format(data_source)

# 根据格式选择解析器
if file_format == 'csv':
data = pd.read_csv(data_source, parse_dates=['transaction_date'])
elif file_format == 'excel':
data = pd.read_excel(data_source, parse_dates=['transaction_date'])
elif file_format == 'json':
data = pd.read_json(data_source)
data['transaction_date'] = pd.to_datetime(data['transaction_date'])

# 标准化列名
data = standardize_column_names(data, consumption_schema)

# 结构验证
validate_consumption_structure(data)

return data

字段标准化映射

1
2
3
4
5
6
7
8
9
10
标准字段映射 = {
'交易日期/时间': 'transaction_date',
'消费金额': 'amount',
'交易类型': 'transaction_type',
'商户名称': 'merchant_name',
'消费类别': 'category',
'支付方式': 'payment_method',
'交易状态': 'status',
'交易位置': 'location'
}

2.1.2 商家经营数据解析

财务数据标准化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def parse_business_data(data_source):
# 识别财务报表类型
report_type = identify_financial_report_type(data_source)

# 根据报表类型应用相应解析逻辑
if report_type == 'balance_sheet':
data = parse_balance_sheet(data_source)
elif report_type == 'income_statement':
data = parse_income_statement(data_source)
elif report_type == 'cash_flow':
data = parse_cash_flow_statement(data_source)
else:
data = parse_general_business_data(data_source)

# 财务指标计算与添加
data = calculate_financial_ratios(data)

# 行业标准对标
data = add_industry_benchmarks(data)

return data

行业代码映射
• 建立标准行业分类代码映射表
• 对接国家统计局行业分类标准
• 支持多级行业代码自动识别与分配

2.1.3 亲友信息解析

关系数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def parse_relatives_data(data_source):
# 加载数据并进行初步脱敏
data = load_sensitive_data(data_source, encryption_keys)

# 合规性检查
compliance_status = check_compliance_requirements(data)
if not compliance_status.is_compliant:
log_compliance_issue(compliance_status)
data = apply_compliance_restrictions(data, compliance_status)

# 构建关系图谱结构
relationship_graph = build_relationship_graph(data)

# 关系分类与标记
tagged_relationships = tag_relationships(relationship_graph)

return {
'data': data,
'compliance_status': compliance_status,
'relationship_graph': relationship_graph,
'tagged_relationships': tagged_relationships
}

隐私保护措施
• 个人身份信息(PII)识别与脱敏
• 基于角色的访问控制标记
• 数据使用目的限制标签

2.2 非结构化文本解析

2.2.1 专家案例文本解析

文档结构化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def parse_expert_case(document_path):
# 文档格式检测与转换
doc_format = detect_document_format(document_path)
text_content = convert_to_text(document_path, doc_format)

# 文档结构识别
doc_structure = identify_document_structure(text_content)

# 分段提取
sections = extract_document_sections(text_content, doc_structure)

# 关键信息提取
key_information = extract_key_information(sections)

# 实体识别
entities = extract_named_entities(text_content)

# 文档摘要生成
summary = generate_document_summary(text_content)

return {
'full_text': text_content,
'sections': sections,
'key_information': key_information,
'entities': entities,
'summary': summary
}

文本结构识别规则
• 标题层级识别规则
• 案例分隔标记识别
• 关键分析部分识别模式

2.3 数据清洗

通用数据清洗策略
• 缺失值处理:基于业务规则和统计推断
• 异常值检测与处理:基于Z-分数、IQR或领域规则
• 重复数据检测与处理
• 数据一致性检查与修正
领域特定清洗规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def clean_financial_data(data):
# 处理金融数据中的常见问题

# 1. 负值检查与处理(例如:某些字段不应有负值)
data = process_negative_values(data)

# 2. 金额单位一致性检查
data = standardize_monetary_units(data)

# 3. 处理极端值/异常值
data = handle_financial_outliers(data)

# 4. 处理缺失的财务指标
data = impute_missing_financial_indicators(data)

return data

文本数据清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def clean_text_data(text_data):
# 1. 删除HTML/XML标签
cleaned_text = remove_html_tags(text_data)

# 2. 标准化空白字符
cleaned_text = normalize_whitespace(cleaned_text)

# 3. 处理特殊字符
cleaned_text = handle_special_characters(cleaned_text)

# 4. 修正常见错误拼写
cleaned_text = correct_common_misspellings(cleaned_text)

# 5. 删除非信息性内容(如页眉页脚)
cleaned_text = remove_non_informative_content(cleaned_text)

return cleaned_text

2.4 数据标准化

数值型数据标准化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def normalize_numerical_data(dataframe, normalization_type='z-score'):
normalized_df = dataframe.copy()

numerical_columns = dataframe.select_dtypes(include=['int64', 'float64']).columns

if normalization_type == 'z-score':
# Z-score标准化
for col in numerical_columns:
mean = dataframe[col].mean()
std = dataframe[col].std()
normalized_df[col] = (dataframe[col] - mean) / std

elif normalization_type == 'min-max':
# Min-Max标准化
for col in numerical_columns:
min_val = dataframe[col].min()
max_val = dataframe[col].max()
normalized_df[col] = (dataframe[col] - min_val) / (max_val - min_val)

elif normalization_type == 'robust':
# 鲁棒性标准化,使用中位数和四分位距
for col in numerical_columns:
median = dataframe[col].median()
q1 = dataframe[col].quantile(0.25)
q3 = dataframe[col].quantile(0.75)
iqr = q3 - q1
normalized_df[col] = (dataframe[col] - median) / iqr

return normalized_df

类别型数据标准化
• 统一编码方案设计(如消费类别、交易类型)
• 层级类别标准化(如行业分类)
• 自定义编码映射表维护与更新机制
时间数据标准化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def normalize_datetime_data(df, datetime_cols):
df_normalized = df.copy()

for col in datetime_cols:
# 转换为标准datetime格式
df_normalized[col] = pd.to_datetime(df[col], errors='coerce')

# 创建时间特征
df_normalized[f'{col}_year'] = df_normalized[col].dt.year
df_normalized[f'{col}_month'] = df_normalized[col].dt.month
df_normalized[f'{col}_day'] = df_normalized[col].dt.day
df_normalized[f'{col}_dayofweek'] = df_normalized[col].dt.dayofweek
df_normalized[f'{col}_quarter'] = df_normalized[col].dt.quarter

# 处理季节性
df_normalized[f'{col}_is_weekend'] = df_normalized[col].dt.dayofweek >= 5
df_normalized[f'{col}_is_month_start'] = df_normalized[col].dt.is_month_start
df_normalized[f'{col}_is_month_end'] = df_normalized[col].dt.is_month_end

return df_normalized

3. 特征提取与增强层

3.1 金融特征提取

个人消费特征提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def extract_consumption_features(consumption_data):
features = {}

# 基本消费统计
features['total_spending'] = consumption_data['amount'].sum()
features['avg_transaction_amount'] = consumption_data['amount'].mean()
features['max_transaction_amount'] = consumption_data['amount'].max()
features['transaction_count'] = len(consumption_data)

# 消费类别分布
category_distribution = consumption_data.groupby('category')['amount'].sum()
features['category_distribution'] = category_distribution.to_dict()

# 消费频率特征
date_range = (consumption_data['transaction_date'].max() -
consumption_data['transaction_date'].min()).days
features['transaction_frequency'] = len(consumption_data) / max(date_range, 1)

# 必要消费比例 (假设已定义必要消费类别)
essential_categories = ['groceries', 'utilities', 'healthcare', 'housing']
essential_spending = consumption_data[consumption_data['category'].isin(
essential_categories)]['amount'].sum()
features['essential_spending_ratio'] = essential_spending / features['total_spending']

# 消费波动性
features['spending_volatility'] = consumption_data.groupby(
consumption_data['transaction_date'].dt.month)['amount'].sum().std()

# 周期性消费模式
monthly_spending = consumption_data.groupby(
consumption_data['transaction_date'].dt.month)['amount'].sum()
features['monthly_spending_pattern'] = monthly_spending.to_dict()

return features

商家经营特征提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def extract_business_financial_features(business_data):
features = {}

# 盈利能力指标
features['profit_margin'] = business_data['net_income'] / business_data['revenue']
features['return_on_assets'] = business_data['net_income'] / business_data['total_assets']
features['gross_margin'] = (business_data['revenue'] - business_data['cost_of_goods_sold']) / business_data['revenue']

# 流动性指标
features['current_ratio'] = business_data['current_assets'] / business_data['current_liabilities']
features['quick_ratio'] = (business_data['current_assets'] - business_data['inventory']) / business_data['current_liabilities']

# 运营效率指标
features['inventory_turnover'] = business_data['cost_of_goods_sold'] / business_data['average_inventory']
features['asset_turnover'] = business_data['revenue'] / business_data['total_assets']

# 偿债能力指标
features['debt_to_equity'] = business_data['total_liabilities'] / business_data['total_equity']
features['interest_coverage'] = business_data['ebit'] / business_data['interest_expense']

# 增长指标
features['revenue_growth'] = calculate_growth_rate(business_data['revenue'])
features['profit_growth'] = calculate_growth_rate(business_data['net_income'])

# 现金流指标
features['operating_cash_flow_ratio'] = business_data['operating_cash_flow'] / business_data['current_liabilities']
features['cash_flow_to_debt'] = business_data['operating_cash_flow'] / business_data['total_debt']

return features

3.2 时间序列分析

时间序列分解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def decompose_time_series(time_series_data, date_column, value_column):
# 确保日期索引
ts_data = time_series_data.set_index(date_column)[value_column]

# 处理季节性
from statsmodels.tsa.seasonal import seasonal_decompose

# 尝试进行季节性分解
try:
# 周期设置需要根据数据频率调整
result = seasonal_decompose(ts_data, model='additive')

decomposition = {
'trend': result.trend,
'seasonal': result.seasonal,
'residual': result.resid,
'observed': result.observed
}
except:
# 如果数据不足或不适合季节性分解,退回到基本统计
decomposition = {
'trend': calculate_rolling_average(ts_data),
'volatility': calculate_volatility(ts_data),
'observed': ts_data
}

return decomposition

消费模式检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def detect_consumption_patterns(consumption_data):
patterns = {}

# 按日期聚合消费
daily_spending = consumption_data.groupby(consumption_data['transaction_date'].dt.date)['amount'].sum()

# 检测周期性模式
from statsmodels.tsa.stattools import acf

# 自相关分析
acf_values = acf(daily_spending, nlags=30)
significant_lags = [i for i, value in enumerate(acf_values) if abs(value) > 0.2]

if 7 in significant_lags:
patterns['weekly_pattern'] = True

if any(lag in significant_lags for lag in [28, 29, 30, 31]):
patterns['monthly_pattern'] = True

# 异常消费检测
mean_spending = daily_spending.mean()
std_spending = daily_spending.std()

anomaly_days = daily_spending[abs(daily_spending - mean_spending) > 2 * std_spending]
patterns['spending_anomalies'] = anomaly_days.to_dict()

return patterns

趋势分析与预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def analyze_financial_trends(financial_data, time_column, target_columns):
trend_analysis = {}

for column in target_columns:
# 提取目标列的时间序列数据
time_series = financial_data[[time_column, column]].dropna()

# 根据数据点数量选择合适的模型
if len(time_series) >= 30:
# 对于较长的时间序列,尝试ARIMA模型
from statsmodels.tsa.arima.model import ARIMA

try:
model = ARIMA(time_series[column], order=(1, 1, 1))
model_fit = model.fit()
forecast = model_fit.forecast(steps=3)

trend_analysis[column] = {
'historical_data': time_series[column].to_dict(),
'forecast_next_3_periods': forecast.to_dict(),
'model_summary': model_fit.summary()
}
except:
# 如果ARIMA失败,退回到简单线性回归
trend_analysis[column] = calculate_linear_trend(time_series, column)
else:
# 对于较短的时间序列,使用简单线性回归
trend_analysis[column] = calculate_linear_trend(time_series, column)

return trend_analysis

3.3 文本特征提取

关键信息提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def extract_key_information_from_text(text):
# 初始化NLP处理器
import spacy
nlp = spacy.load("zh_core_web_trf") # 使用中文预训练模型

# 处理文本
doc = nlp(text)

# 提取实体
entities = {
'PERSON': [],
'ORG': [],
'DATE': [],
'MONEY': [],
'PERCENT': [],
'GPE': [] # 地理政治实体
}

for ent in doc.ents:
if ent.label_ in entities:
entities[ent.label_].append({
'text': ent.text,
'start': ent.start_char,
'end': ent.end_char
})

# 提取关键短语
from keybert import KeyBERT
kw_model = KeyBERT()
keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 3), top_n=10)

# 提取主题句
from summa import summarizer
summary = summarizer.summarize(text, ratio=0.2)

return {
'entities': entities,
'keywords': keywords,
'summary': summary
}

情感分析与风险评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def analyze_text_sentiment_and_risk(text):
from transformers import pipeline

# 情感分析
sentiment_analyzer = pipeline("sentiment-analysis", model="uer/roberta-base-finetuned-jd-binary-chinese")
sentiment_result = sentiment_analyzer(text)

# 风险词汇检测
risk_terms = ['违约', '拖欠', '破产', '诉讼', '失信', '高负债', '资金链断裂', '监管处罚']
risk_mentions = {}

for term in risk_terms:
if term in text:
import re
# 提取包含风险词的上下文
context_matches = re.finditer(f".{{0,50}}{term}.{{0,50}}", text)
risk_mentions[term] = [match.group(0) for match in context_matches]

# 综合风险评分
risk_score = len(risk_mentions) * 10
if sentiment_result[0]['label'] == 'negative':
risk_score += sentiment_result[0]['score'] * 50

return {
'sentiment': sentiment_result,
'risk_mentions': risk_mentions,
'risk_score': risk_score
}

3.4 数据增强

交叉特征生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def generate_cross_features(personal_data, business_data=None, relatives_data=None):
cross_features = {}

# 个人消费与信贷历史交叉特征
if 'credit_history' in personal_data and 'consumption_patterns' in personal_data:
# 消费行为与信用评分的关系
cross_features['consumption_credit_correlation'] = calculate_correlation(
personal_data['consumption_patterns']['monthly_spending'],
personal_data['credit_history']['monthly_payment_history']
)

# 个人与商家数据交叉特征
if business_data is not None:
# 个人消费与商业收入的关系
if 'consumption_amount' in personal_data and 'revenue' in business_data:
cross_features['consumption_to_revenue_ratio'] = (
personal_data['consumption_amount'] / business_data['revenue']
)

# 个人与亲友数据的安全交叉特征
if relatives_data is not None and is_compliant_to_use(relatives_data):
# 紧急联系人稳定性指标 (合规使用)
if 'emergency_contacts' in relatives_data:
cross_features['emergency_contact_stability'] = calculate_emergency_contact_stability(
relatives_data['emergency_contacts']
)

return cross_features

领域知识增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def enhance_with_domain_knowledge(features, domain='consumer_finance'):
enhanced_features = features.copy()

if domain == 'consumer_finance':
# 应用消费金融领域知识规则
if 'debt_to_income_ratio' in features:
# 根据行业标准评估债务收入比
if features['debt_to_income_ratio'] < 0.36:
enhanced_features['dti_risk_level'] = 'low'
elif features['debt_to_income_ratio'] < 0.43:
enhanced_features['dti_risk_level'] = 'medium'
else:
enhanced_features['dti_risk_level'] = 'high'

# 计算自定义财务健康指数
if all(k in features for k in ['savings_rate', 'essential_spending_ratio', 'credit_utilization']):
enhanced_features['financial_health_index'] = calculate_financial_health_index(
features['savings_rate'],
features['essential_spending_ratio'],
features['credit_utilization']
)

elif domain == 'small_business':
# 应用小企业融资领域知识
if all(k in features for k in ['current_ratio', 'debt_to_equity', 'profit_margin']):
enhanced_features['business_viability_score'] = calculate_business_viability(
features['current_ratio'],
features['debt_to_equity'],
features['profit_margin']
)

return enhanced_features

4. 数据存储与索引层

4.1 自适应分块

智能分块策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def chunk_data_intelligently(data, data_type):
chunks = []

if data_type == 'financial_record':
# 财务数据按周期分块
chunks = chunk_by_financial_period(data)

elif data_type == 'consumption_record':
# 消费记录按消费模式分块
chunks = chunk_by_consumption_pattern(data)

elif data_type == 'expert_text':
# 专家文本按语义单元分块
chunks = chunk_text_by_semantic_units(data)

# 应用递归分块提高复杂文档的划分质量
chunks = apply_recursive_chunking(chunks)

# 确保每个块内的语义完整性
chunks = ensure_semantic_integrity(chunks)

return chunks

多粒度分块机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def create_multi_granularity_chunks(text_data):
# 大型块 (章节级) - 约2000字符
large_chunks = split_text_into_chunks(text_data, chunk_size=2000, overlap=200)

# 中型块 (段落级) - 约500字符
medium_chunks = split_text_into_chunks(text_data, chunk_size=500, overlap=100)

# 小型块 (句子级) - 约150字符
small_chunks = split_text_into_chunks(text_data, chunk_size=150, overlap=30)

# 建立块之间的层级关系
chunk_hierarchy = build_chunk_hierarchy(large_chunks, medium_chunks, small_chunks)

return {
'large_chunks': large_chunks,
'medium_chunks': medium_chunks,
'small_chunks': small_chunks,
'hierarchy': chunk_hierarchy
}

4.2 向量嵌入

多模型嵌入策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def create_embeddings(chunks, embedding_models=None):
if embedding_models is None:
# 默认使用BERT模型
embedding_models = {
'primary': 'BAAI/bge-large-zh',
'backup': 'shibing624/text2vec-base-chinese'
}

from sentence_transformers import SentenceTransformer

# 初始化模型
primary_model = SentenceTransformer(embedding_models['primary'])
backup_model = SentenceTransformer(embedding_models['backup'])

embeddings = {}

# 使用主模型生成嵌入
try:
embeddings['primary'] = primary_model.encode(
[chunk['text'] for chunk in chunks],
batch_size=32,
show_progress_bar=True
)
except Exception as e:
print(f"Primary model encoding failed: {e}")

# 使用备用模型生成嵌入
try:
embeddings['backup'] = backup_model.encode(
[chunk['text'] for chunk in chunks],
batch_size=32,
show_progress_bar=True
)
except Exception as e:
print(f"Backup model encoding failed: {e}")

# 确保至少有一个成功的嵌入
if 'primary' not in embeddings and 'backup' not in embeddings:
raise ValueError("All embedding models failed")

return embeddings

特殊内容处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def embed_special_content(chunks):
embeddings = {}

# 识别包含表格的块
table_chunks = [chunk for chunk in chunks if is_table_chunk(chunk)]

# 识别包含数值数据的块
numerical_chunks = [chunk for chunk in chunks if is_numerical_chunk(chunk)]

# 处理表格块
if table_chunks:
table_embeddings = generate_table_embeddings(table_chunks)
embeddings['table_embeddings'] = table_embeddings

# 处理数值块
if numerical_chunks:
numerical_embeddings = generate_numerical_embeddings(numerical_chunks)
embeddings['numerical_embeddings'] = numerical_embeddings

return embeddings

4.3 元数据生成

元数据提取与标准化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def generate_metadata(chunks, original_data_source):
metadata = []

for i, chunk in enumerate(chunks):
chunk_metadata = {
'chunk_id': f"{original_data_source['id']}_{i}",
'source_type': original_data_source['type'],
'creation_date': original_data_source.get('date'),
'last_modified': original_data_source.get('last_modified'),
'chunk_position': i,
'total_chunks': len(chunks)
}

# 针对不同类型数据添加特定元数据
if original_data_source['type'] == 'personal_consumption':
# 添加时间范围元数据
if 'date_range' in chunk:
chunk_metadata['start_date'] = chunk['date_range'][0]
chunk_metadata['end_date'] = chunk['date_range'][1]

# 添加消费类别元数据
if 'categories' in chunk:
chunk_metadata['categories'] = chunk['categories']

elif original_data_source['type'] == 'business_data':
# 添加财务周期元数据
if 'financial_period' in chunk:
chunk_metadata['period'] = chunk['financial_period']

# 添加报表类型元数据
if 'report_type' in chunk:
chunk_metadata['report_type'] = chunk['report_type']

elif original_data_source['type'] == 'expert_case':
# 添加文档部分元数据
if 'section_title' in chunk:
chunk_metadata['section_title'] = chunk['section_title']

# 添加实体元数据
if 'entities' in chunk:
chunk_metadata['entities'] = chunk['entities']

metadata.append(chunk_metadata)

return metadata

时间属性与过滤标签

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def enrich_metadata_with_filters(metadata, analysis_results):
enriched_metadata = []

for item in metadata:
# 复制基本元数据
enriched_item = item.copy()

# 添加时间维度标签
if 'creation_date' in item:
import datetime
now = datetime.datetime.now()
creation_date = item['creation_date']

# 计算数据年龄
if isinstance(creation_date, datetime.datetime):
age_days = (now - creation_date).days

if age_days < 30:
enriched_item['time_relevance'] = 'very_recent'
elif age_days < 90:
enriched_item['time_relevance'] = 'recent'
elif age_days < 365:
enriched_item['time_relevance'] = 'within_year'
else:
enriched_item['time_relevance'] = 'historical'

# 添加内容类型标签
if item['source_type'] == 'expert_case':
# 根据内容分析结果添加标签
chunk_id = item['chunk_id']
if chunk_id in analysis_results:
enriched_item['content_type'] = analysis_results[chunk_id]['content_type']
enriched_item['sentiment'] = analysis_results[chunk_id]['sentiment']
enriched_item['complexity'] = analysis_results[chunk_id]['complexity']

enriched_metadata.append(enriched_item)

return enriched_metadata

4.4 向量数据存储

混合存储策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def store_processed_data(chunks, embeddings, metadata, storage_config):
# 验证输入数据完整性
assert len(chunks) == len(embeddings['primary']) == len(metadata), "Data length mismatch"

# 初始化存储客户端
vector_db = initialize_vector_db(storage_config['vector_db'])
document_db = initialize_document_db(storage_config['document_db'])

# 准备批量插入数据
vector_records = []
document_records = []

for i, (chunk, embedding, meta) in enumerate(zip(chunks, embeddings['primary'], metadata)):
# 准备向量记录
vector_record = {
'id': meta['chunk_id'],
'vector': embedding,
'metadata': {k: v for k, v in meta.items() if k != 'chunk_id'}
}
vector_records.append(vector_record)

# 准备文档记录
document_record = {
'id': meta['chunk_id'],
'text': chunk['text'],
'metadata': meta,
'embedding_id': meta['chunk_id'] # 链接到向量记录
}
document_records.append(document_record)

# 批量插入向量数据
vector_db.insert_batch(vector_records)

# 批量插入文档数据
document_db.insert_batch(document_records)

# 创建必要索引
create_indices(vector_db, document_db)

return {
'vector_db_info': vector_db.get_collection_stats(),
'document_db_info': document_db.get_collection_stats()
}

检索优化配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def configure_retrieval_settings(vector_db, application_type='loan_approval'):
# 基于应用场景配置最佳检索参数

if application_type == 'loan_approval':
# 贷款审批优化配置
retrieval_config = {
'search_algorithm': 'hybrid', # 混合搜索(向量+关键词)
'sparse_weight': 0.3, # 稀疏搜索权重
'dense_weight': 0.7, # 稠密搜索权重
'top_k': 15, # 初始检索结果数量
'reranker_model': 'BAAI/bge-reranker-large', # 重排模型
'minimum_relevance_score': 0.75, # 最低相关性阈值
'default_filters': {
'time_relevance': ['very_recent', 'recent'] # 默认时间相关性过滤
}
}

elif application_type == 'risk_analysis':
# 风险分析优化配置
retrieval_config = {
'search_algorithm': 'dense', # 仅向量搜索
'top_k': 25, # 检索更多相关内容
'reranker_model': 'BAAI/bge-reranker-large',
'minimum_relevance_score': 0.65, # 更低的阈值以获取更广泛的内容
'default_filters': {} # 不应用默认过滤器
}

# 应用配置到向量数据库
vector_db.configure_retrieval(retrieval_config)

return retrieval_config

5. 数据安全与合规性保障

5.1 隐私保护机制

数据脱敏实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def apply_data_anonymization(data, sensitivity_level='high'):
anonymized_data = data.copy()

# 个人身份信息(PII)处理
pii_fields = {
'id_number': mask_id_number,
'phone_number': mask_phone_number,
'email': mask_email,
'address': mask_address,
'full_name': mask_name,
'bank_account': mask_bank_account
}

# 应用脱敏函数
for field, mask_func in pii_fields.items():
if field in anonymized_data:
anonymized_data[field] = mask_func(anonymized_data[field], sensitivity_level)

# 处理敏感文本字段
text_fields = [field for field in anonymized_data if isinstance(anonymized_data[field], str)]
for field in text_fields:
anonymized_data[field] = anonymize_text(anonymized_data[field], sensitivity_level)

return anonymized_data

差分隐私实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def apply_differential_privacy(aggregate_data, epsilon=0.1):
"""
对聚合数据应用差分隐私保护
epsilon: 隐私预算 (较小值提供更强的隐私保护)
"""
from diffprivlib.tools import mean, std, var

dp_results = {}

# 对数值字段应用差分隐私
for field, values in aggregate_data.items():
if isinstance(values, list) and all(isinstance(x, (int, float)) for x in values):
dp_results[f'{field}_mean'] = mean(values, epsilon=epsilon)
dp_results[f'{field}_std'] = std(values, epsilon=epsilon)
dp_results[f'{field}_var'] = var(values, epsilon=epsilon)

return dp_results

5.2 合规性审查流程

数据使用合规检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def check_data_compliance(data_source, intended_use):
compliance_status = {
'is_compliant': True,
'issues': [],
'recommendations': []
}

# 检查数据来源合法性
if not is_legal_data_source(data_source):
compliance_status['is_compliant'] = False
compliance_status['issues'].append('数据来源不合规')
compliance_status['recommendations'].append('仅使用合法授权的数据来源')

# 检查数据使用目的
if not is_purpose_compliant(intended_use, data_source):
compliance_status['is_compliant'] = False
compliance_status['issues'].append('数据使用目的与用户授权不符')
compliance_status['recommendations'].append('获取明确的数据使用授权')

# 特别注意亲友信息的使用合规性
if data_source.get('type') == 'relatives_info':
relatives_compliance = check_relatives_data_compliance(data_source, intended_use)
if not relatives_compliance['is_compliant']:
compliance_status['is_compliant'] = False
compliance_status['issues'].extend(relatives_compliance['issues'])
compliance_status['recommendations'].extend(relatives_compliance['recommendations'])

# 对于亲友数据,建议严格限制使用范围
compliance_status['recommendations'].append('仅使用作为紧急联系人的基本联系信息,禁止用于信用评估')

return compliance_status

数据访问控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def apply_access_controls(processed_data, compliance_status):
# 基于合规状态应用访问控制
if not compliance_status['is_compliant']:
# 对不合规数据应用严格限制
restricted_data = apply_strict_restrictions(processed_data, compliance_status['issues'])

# 记录访问限制日志
log_access_restrictions(restricted_data, compliance_status)

return restricted_data
else:
# 添加最小必要访问控制
protected_data = add_minimal_access_controls(processed_data)

# 添加数据使用追踪标记
protected_data = add_usage_tracking(protected_data)

return protected_data

五、风险评估引擎

多维度风险指标体系

信用风险指标
信用评分与历史
还款记录分析
违约概率模型
财务风险指标
现金流健康度
偿债能力比率
盈利能力指标
经营效率指标
市场与行业风险指标
行业景气指数
市场竞争格局
宏观经济影响因素
运营风险指标
经营稳定性评估
管理团队分析
业务模式风险评估
综合风险评分计算
多指标加权模型
机器学习综合评分
LLM辅助风险综合评价

风险模型集成

传统风险模型
逻辑回归信用模型
决策树风险分类
随机森林风险预测
基于LLM的风险分析
多特征关联性分析
非结构化数据风险识别
定性因素风险判断
混合模型决策机制
模型权重动态调整
风险评级与置信度估计
异常检测与人工介入触发

风险评估流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sequenceDiagram
参与者 信贷员
参与者 系统
参与者 风控审核

信贷员->>系统: 提交贷款申请
系统->>系统: 数据收集与预处理
系统->>系统: 基础风险指标计算

par 并行风险评估
系统->>系统: 信用风险评估
系统->>系统: 财务风险评估
系统->>系统: 市场风险评估
系统->>系统: 运营风险评估
end

系统->>系统: 综合风险评分生成
系统->>系统: 风险解释生成

alt 常规风险案例
系统->>信贷员: 返回风险评估报告
else 异常或高风险案例
系统->>风控审核: 转交人工审核
风控审核->>系统: 审核结果输入
系统->>信贷员: 返回风险评估报告
end

六、LangGraph协作机制

工作流定义

系统基于LangGraph构建的协作工作流包含以下核心元素:
节点定义
每个Agent作为一个功能节点
定义节点输入输出接口
设置节点处理逻辑与条件
边与流转控制
定义节点间信息传递规则
设置条件分支与循环机制
错误处理与回退路径
状态管理
全局评估流程状态追踪
中间结果缓存与恢复
多轮交互状态保持

代码示例: 核心工作流定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from langchain.graphs import StateGraph
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI

# 定义模型和工具
llm = ChatOpenAI(model="gpt-4")

# 定义状态图
workflow = StateGraph(name="贷款风险评估")

# 定义协调Agent节点
coordinator_node = workflow.add_node("coordinator", coordinator_agent)
data_acquisition_node = workflow.add_node("data_acquisition", data_acquisition_agent)
credit_analysis_node = workflow.add_node("credit_analysis", credit_analysis_agent)
financial_analysis_node = workflow.add_node("financial_analysis", financial_analysis_agent)
market_analysis_node = workflow.add_node("market_analysis", market_analysis_agent)
fraud_detection_node = workflow.add_node("fraud_detection", fraud_detection_agent)
compliance_node = workflow.add_node("compliance", compliance_agent)
decision_node = workflow.add_node("decision_synthesis", decision_synthesis_agent)
communication_node = workflow.add_node("communication", communication_agent)

# 定义工作流转路径
workflow.add_edge(coordinator_node, data_acquisition_node)
workflow.add_conditional_edges(
data_acquisition_node,
lambda x: "data_complete" if x["data_status"] == "complete" else "data_incomplete",
{
"data_complete": [credit_analysis_node, financial_analysis_node,
market_analysis_node, fraud_detection_node],
"data_incomplete": coordinator_node
}
)

# 从分析节点到结果汇总
for node in [credit_analysis_node, financial_analysis_node,
market_analysis_node, fraud_detection_node]:
workflow.add_edge(node, compliance_node)

workflow.add_edge(compliance_node, decision_node)
workflow.add_edge(decision_node, communication_node)

# 设置工作流入口和出口
workflow.set_entry_point(coordinator_node)
workflow.set_finish_point(communication_node)

# 编译工作流
app = workflow.compile()

并行处理与同步机制

并行分析策略
信用、财务、市场等分析并行执行
采用"fork-join"模式进行结果汇总
支持分析依赖性管理与优先级排序
同步点设计
关键决策点强制同步
中间结果共享机制
冲突检测与解决策略
异常处理机制
Agent失败重试策略
降级处理路径
人工介入触发条件

七、LLM应用策略

模型选择策略

多层级模型架构
基础任务:Qwen-Max或国产替代模型
复杂推理:GPT-4或等效高性能模型
特定功能:专业微调模型
动态模型选择
基于任务复杂度选择模型
考虑成本与性能平衡
支持模型回退机制

Prompt工程最佳实践

角色与指令定义
明确Agent角色和任务边界
提供详细操作指南和评估标准
设置输出格式和质量要求
情境化提示设计
包含相关背景信息和前置知识
预设任务流程和决策框架
提供示例输出引导模型行为
去偏策略
强调公平评估和避免歧视
设置多角度考量要求
引入交叉验证机制

示例: 财务分析Agent提示模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
你是一位专业的小企业财务分析专家,负责评估贷款申请企业的财务健康状况。
现在你需要分析以下企业财务数据,并生成财务风险评估报告。

企业基本信息:
{business_info}

财务数据:
{financial_data}

行业标准参考:
{industry_benchmarks}

评估要求:
1. 分析关键财务比率(包括但不限于流动比率、速动比率、资产负债率、利息保障倍数)
2. 评估企业现金流状况和趋势
3. 分析盈利能力和可持续性
4. 与行业标准进行对比
5. 识别财务风险点和异常指标
6. 评估企业偿债能力和财务弹性
7. 给出财务风险等级(低、中、高)和详细理由

输出格式:
- 结构化JSON格式,包含各项指标评估和总体评价
- 每项评估必须包含数值分析和定性解释
- 必须包含风险等级和关键风险因素列表

避免:
- 不要仅根据单一指标做出结论
- 不要忽略行业特性和企业发展阶段
- 不要使用未提供的信息进行假设
- 必须客观公正,避免任何形式的偏见

八、RAG应用方案

应用场景映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+-------------------------+------------------------+---------------------+
| 应用场景 | RAG功能 | 关键优化点 |
+-------------------------+------------------------+---------------------+
| 政策法规查询 | 准确定位相关法规条例 | 关键词与向量混合检索 |
| | 解释应用于具体案例 | 内容重排与合规验证 |
+-------------------------+------------------------+---------------------+
| 行业风险分析 | 提取行业特定风险因素 | 多类型文档整合能力 |
| | 关联市场趋势数据 | 时效性内容优先展示 |
+-------------------------+------------------------+---------------------+
| 案例参考与决策支持 | 查找相似案例的处理方式 | 相似度算法优化 |
| | 提供决策依据与解释 | 公平性和合规性保障 |
+-------------------------+------------------------+---------------------+
| 异常情况处理指南 | 识别异常模式并提供处理 | 多路径检索策略 |
| | 方案 | 异常情况分类精度 |
+-------------------------+------------------------+---------------------+
| 风险指标解释 | 生成易理解的指标解释 | 内容简化与通俗化 |
| | 提供改善建议 | 术语释义自动增强 |
+-------------------------+------------------------+---------------------+

RAG实现流程

数据准备
收集相关政策、法规、行业分析报告
整理历史案例库和决策记录
制作专业术语解释文档和评估标准说明
知识库构建
数据清洗与格式标准化
文档分块与多向量索引创建
元数据标注与分类系统建立
检索引擎实现
查询理解与改写模块开发
混合检索策略实现
结果重排与过滤机制构建
结果合成与应用
检索内容与当前任务整合
基于检索增强提示工程
结果验证与引用追踪

代码示例: RAG检索增强处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chat_models import ChatOpenAI

# 定义LLM
llm = ChatOpenAI(model="gpt-4")

# 创建向量检索器
vector_retriever = vectordb.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": 6, "score_threshold": 0.5}
)

# 创建BM25检索器
bm25_retriever = BM25Retriever.from_documents(
documents,
preprocess_func=lambda x: x.page_content
)

# 创建混合检索器
ensemble_retriever = EnsembleRetriever(
retrievers=[vector_retriever, bm25_retriever],
weights=[0.7, 0.3]
)

# 创建多查询检索器
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=ensemble_retriever,
llm=llm
)

# 创建上下文压缩器
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=multi_query_retriever
)

# 查询优化与检索
def retrieve_enhanced_context(query, business_context):
# 查询增强
enhanced_query = f"""
以下是关于一家小企业贷款申请的查询,企业背景:
{business_context}

原始查询: {query}
"""

# 执行检索
docs = compression_retriever.get_relevant_documents(enhanced_query)

# 后处理与排序
processed_docs = []
for doc in docs:
# 添加元数据与相关性分数
processed_docs.append({
"content": doc.page_content,
"source": doc.metadata.get("source", "未知"),
"relevance": doc.metadata.get("score", 0.0),
"date": doc.metadata.get("date", "未知")
})

# 按相关性排序
processed_docs = sorted(processed_docs, key=lambda x: x["relevance"], reverse=True)

return processed_docs