LLM-Agent场景题:贷款审批 
摘要 
本方案提出一个基于大型语言模型(LLM)和检索增强生成(RAG)的小企业贷款风险评估智能系统,通过多智能体(Multi-Agent)协作架构,实现贷款申请的自动化分析、风险评估和决策建议生成。系统整合传统信用评分模型与先进AI技术,显著提升贷款决策的效率、准确性和透明度,同时确保合规性和公平性。
一、系统架构概览 
核心架构组件 
多Agent协作框架:基于LangGraph构建的协作网络,实现不同功能智能体间的有序交互 
RAG知识库系统:多层级、多向量的知识库架构,支持领域专业知识检索与应用 
数据处理管道:处理结构化与非结构化金融数据的流水线系统 
风险评估引擎:融合传统信用模型与LLM分析能力的混合风险评估机制 
决策生成与解释系统:生成可解释的贷款决策建议,提供透明的评估依据
技术栈选择 
+-------------------+-------------------+--------------------+
|      功能层       |     技术选择      |        备注        |
+-------------------+-------------------+--------------------+
| 框架基础          | LangChain         | 灵活组合AI组件     |
|                   | LangGraph         | 多Agent协作流编排  |
+-------------------+-------------------+--------------------+
| 基础模型          | Azure OpenAI      | 企业级合规部署     |
|                   | Qwen-Max          | 中文处理能力优秀   |
+-------------------+-------------------+--------------------+
| 向量数据库        | Pinecone/Qdrant   | 支持混合检索策略   |
+-------------------+-------------------+--------------------+
| 嵌入模型          | BGE-Large-zh      | 优秀的中文嵌入效果 |
+-------------------+-------------------+--------------------+
| 数据处理          | LangChain Doc Proc| 文档处理工具包     |
|                   | Unstructured      | 非结构化数据解析   |
+-------------------+-------------------+--------------------+
| 监控与追踪        | LangSmith         | 全流程监控追踪     |
+-------------------+-------------------+--------------------+
二、多Agent协作系统详解 
Agent网络架构 
系统采用LangGraph构建有向图形式的Agent协作网络(主干为DAG,仅数据补充环节存在回流),实现信息流动和决策流程的可控性。
各Agent功能定义 
协调Agent (Coordinator) 
统筹整个评估流程,分发任务,整合各Agent输出 
处理异常情况和决策冲突 
维护评估流程的状态和进度
数据获取Agent (Data Acquisition) 
从贷款申请表和相关文件中提取关键信息 
识别缺失数据并提出补充需求 
将提取的数据标准化为系统内部格式
信用评估Agent (Credit Analysis) 
分析信用报告和历史记录 
评估借款人的信用评分和趋势 
识别信用记录中的异常模式和风险信号
财务分析Agent (Financial Analysis) 
分析企业财务报表和现金流状况 
计算关键财务比率和健康指标 
评估企业的盈利能力和财务稳定性
市场分析Agent (Market Analysis) 
评估企业所在行业的发展趋势和市场环境 
分析宏观经济因素对企业的影响 
考量行业竞争格局和企业市场定位
反欺诈Agent (Fraud Detection) 
识别可疑的交易模式和异常数据 
验证提供信息的一致性和真实性 
生成欺诈风险评分和警报
合规检查Agent (Compliance) 
确保评估流程符合相关法规和政策 
检查决策过程中的偏见和歧视因素 
生成合规性报告
决策合成Agent (Decision Synthesis) 
整合各Agent的分析结果 
生成最终风险评估报告和决策建议 
提供决策依据和解释
沟通Agent (Communication) 
生成客户友好的报告和反馈 
准备内部评审文档 
处理后续交互和问题回答
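各Agent之间通过共享的评估状态传递信息。下面是一个Agent节点函数的最小示意,假设采用LangGraph风格的"读状态、返回增量更新"约定;其中 run_credit_analysis 以及状态字段结构均为示例假设,并非确定实现:

def credit_analysis_agent(state: dict) -> dict:
    """信用评估Agent节点:读取共享状态,返回需要合并回全局状态的增量结果。"""
    credit_report = state["application"]["credit_report"]

    # 调用LLM与规则引擎完成信用分析(run_credit_analysis为假设的内部函数)
    analysis = run_credit_analysis(credit_report)

    # 只返回本Agent负责的字段,由框架负责合并进全局评估状态
    return {"analysis_results": {"credit": analysis}}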
Agent协作流程 
graph TD
    A[申请接收] --> B[协调Agent初始化]
    B --> C[数据获取Agent]
    C --> D[数据验证与预处理]
    D --> E{数据完整性检查}
    E -->|不完整| F[请求补充数据]
    F --> C
    E -->|完整| G[并行分析启动]
    G --> H1[信用评估Agent]
    G --> H2[财务分析Agent]
    G --> H3[市场分析Agent]
    G --> H4[反欺诈Agent]
    H1 --> I[分析结果汇总]
    H2 --> I
    H3 --> I
    H4 --> I
    I --> J[合规检查Agent]
    J --> K[决策合成Agent]
    K --> L[风险评估报告]
    L --> M[沟通Agent]
    M --> N[生成反馈与解释]
 
三、RAG系统设计 
知识库架构 
系统采用多层级知识库架构,满足不同类型知识检索需求: 
一级知识库:行业与政策知识 
贷款政策法规条例 
行业分析报告与指标标准 
经济环境与市场趋势分析
二级知识库:风险评估指南 
信用风险评估方法论 
财务风险评估标准与指标 
风险评分模型与解释文档 
案例库与决策先例
三级知识库:操作指南 
Agent操作流程和标准 
异常处理与上报规则 
数据解释与处理指南
索引与检索策略 
混合检索技术 
结合关键词检索(BM25)与向量检索的混合方法 
对索引数据采用动态alpha值调整策略(0.2-0.8范围) 
基于查询内容智能选择检索策略
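下面是混合检索打分的一个最小示意:假设已分别得到归一化后的BM25得分与向量相似度,alpha在0.2~0.8之间随查询特征动态调整;其中的启发式规则与阈值仅为示例假设:

def dynamic_alpha(query: str) -> float:
    """根据查询特征粗略调整向量检索权重alpha(0.2~0.8,示例启发式)。"""
    # 查询越长、越偏自然语言描述,越依赖向量语义检索
    alpha = 0.2 + min(len(query) / 50, 1.0) * 0.6
    return round(alpha, 2)

def hybrid_score(bm25_score: float, vector_score: float, query: str) -> float:
    """对归一化后的BM25得分与向量相似度做加权融合。"""
    alpha = dynamic_alpha(query)
    return alpha * vector_score + (1 - alpha) * bm25_score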
多向量检索 
为每个文档创建多种表示向量: 
原始文档向量 
摘要向量 
假设问题向量(HyDE) 
支持查询扩展与多路检索
重排序机制 
采用BGE-reranker-large对初始检索结果重排 
结合内容相关度与权威性的评分机制 
支持基于上下文的智能过滤
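重排序环节的一个实现示意:用 sentence-transformers 的 CrossEncoder 加载 BAAI/bge-reranker-large,对初检候选重新打分;候选文档的组织方式为示例假设:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("BAAI/bge-reranker-large")

def rerank(query: str, candidates: list[str], top_k: int = 5) -> list[tuple[str, float]]:
    """对初检候选文本按与查询的相关度重新排序,返回top_k结果。"""
    pairs = [(query, doc) for doc in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return ranked[:top_k]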
RAG优化技术 
+------------------+----------------------+---------------------+
|     优化方向     |       采用技术       |       预期效果      |
+------------------+----------------------+---------------------+
| 查询理解         | 步退提示(Step Back)  | 提高抽象理解能力    |
|                  | 多查询扩展           | 提高覆盖范围        |
+------------------+----------------------+---------------------+
| 检索质量         | 混合检索             | 平衡精确与相关性    |
|                  | 上下文感知重排序     | 提高结果的相关度    |
+------------------+----------------------+---------------------+
| 文档处理         | 递归分块策略         | 保持语义完整性      |
|                  | 句子窗口检索         | 丰富上下文环境      |
+------------------+----------------------+---------------------+
| 知识整合         | LLM引导的知识合成    | 减少知识冗余        |
|                  | 摘要压缩             | 降低内容复杂度      |
+------------------+----------------------+---------------------+
| 偏见控制         | 多模型交叉验证       | 减少单一模型偏见    |
|                  | 引导Prompt设计       | 控制偏见传播        |
+------------------+----------------------+---------------------+
 
四、数据处理流程 
总体架构 
基于RAG技术和多Agent协作的数据预处理流程分为四个阶段:数据接入、数据解析与标准化、特征提取与增强、数据存储与索引。
digraph PreprocessingFlow {
  rankdir=TB;
  node [shape=box, style=filled, color=skyblue];

  subgraph cluster_0 {
    label="数据接入层";
    style=filled;
    color=lightgrey;
    node [style=filled, color=white];

    DataIngestion [label="多源异构数据接入"];
    ValidityCheck [label="数据有效性检查"];
    DataParallelize [label="数据并行化处理"];
  }

  subgraph cluster_1 {
    label="数据解析与标准化层";
    style=filled;
    color=lightcyan;
    node [style=filled, color=white];

    StructuredDataParsing [label="结构化数据解析"];
    UnstructuredTextParsing [label="非结构化文本解析"];
    DataCleansing [label="数据清洗"];
    DataNormalization [label="数据标准化"];
  }

  subgraph cluster_2 {
    label="特征提取与增强层";
    style=filled;
    color=lightsalmon;
    node [style=filled, color=white];

    FeatureExtraction [label="金融特征提取"];
    TimeSeriesAnalysis [label="时间序列分析"];
    TextualFeatures [label="文本特征提取"];
    DataEnrichment [label="数据增强"];
  }

  subgraph cluster_3 {
    label="数据存储与索引层";
    style=filled;
    color=lightgreen;
    node [style=filled, color=white];

    DataChunking [label="自适应分块"];
    VectorEmbedding [label="向量嵌入"];
    MetadataGeneration [label="元数据生成"];
    VectorStorage [label="向量数据存储"];
  }

  // 数据流向连接
  DataIngestion -> ValidityCheck -> DataParallelize;

  DataParallelize -> StructuredDataParsing;
  DataParallelize -> UnstructuredTextParsing;

  StructuredDataParsing -> DataCleansing;
  UnstructuredTextParsing -> DataCleansing;
  DataCleansing -> DataNormalization;

  DataNormalization -> FeatureExtraction;
  DataNormalization -> TimeSeriesAnalysis;
  DataNormalization -> TextualFeatures;
  FeatureExtraction -> DataEnrichment;
  TimeSeriesAnalysis -> DataEnrichment;
  TextualFeatures -> DataEnrichment;

  DataEnrichment -> DataChunking;
  DataChunking -> VectorEmbedding;
  VectorEmbedding -> MetadataGeneration;
  MetadataGeneration -> VectorStorage;
}
 
1. 数据接入层
1.1 多源异构数据接入
个人消费记录接入 
• 支持多种格式接入:CSV、Excel、JSON、银行API导出文件 
• 自动检测分隔符和编码格式 
• 数据源元信息记录:来源、更新时间、数据版本 
商家经营数据接入 
• 支持标准财务报表格式:资产负债表、利润表、现金流量表 
• 行业分类自动标记 
• 支持结构化数据库接口和API集成 
亲友信息表格接入 
• 严格权限控制与访问记录 
• 数据来源合法性验证 
• 敏感数据自动识别和初步脱敏标记 
专家案例文本接入 
• 支持PDF、Word、HTML等格式 
• 基于文档结构的自动分段 
• OCR处理图片中嵌入的文本信息
1.2 数据有效性检查 
数据完整性验证 
• 必填字段检查:确保核心数据字段存在 
• 数据量验证:确保数据量满足最低分析要求 
• 日期范围验证:确保数据的时效性 
格式有效性检查 
• 数值型字段格式验证 
• 日期格式标准化检查 
• 文本编码一致性检查 
业务规则验证 
• 金额范围合理性检查 
• 交易频率合理性检查 
• 基本业务逻辑验证
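以下是业务规则验证的一个简化示意,阈值与字段名均为示例假设,实际规则应由风控策略配置驱动:

def validate_business_rules(record: dict) -> list[str]:
    """返回该条记录违反的业务规则列表,空列表表示通过校验。"""
    issues = []

    # 金额范围合理性(上限为示例值)
    if not (0 < record.get("amount", 0) <= 1_000_000):
        issues.append("金额超出合理范围")

    # 交易频率合理性(示例:单日交易不超过200笔)
    if record.get("daily_transaction_count", 0) > 200:
        issues.append("交易频率异常")

    # 基本业务逻辑:退款金额不应超过原交易金额
    if record.get("refund_amount", 0) > record.get("amount", 0):
        issues.append("退款金额大于交易金额")

    return issues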
1.3 数据并行化处理 
数据分片策略 
• 基于数据量自适应分片 
• 时间序列数据的时间窗口分片 
• 文本数据的自然段落分片 
分布式处理框架 
• 支持Spark/Dask等分布式计算框架 
• 任务优先级调度 
• 处理进度监控与失败重试机制
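一个基于Dask的并行处理示意:失败重试在任务包装函数中实现;其中 process_chunk 为假设的分片处理函数,重试次数为示例参数:

import dask

def process_with_retry(chunk, max_retries=3):
    """处理单个数据分片,失败时最多重试max_retries次。"""
    for attempt in range(max_retries):
        try:
            return process_chunk(chunk)  # process_chunk为假设的分片处理函数
        except Exception:
            if attempt == max_retries - 1:
                raise

def parallel_process(chunks):
    """将所有分片封装为延迟任务后并行计算。"""
    tasks = [dask.delayed(process_with_retry)(chunk) for chunk in chunks]
    return dask.compute(*tasks)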
2. 数据解析与标准化层
2.1 结构化数据解析
2.1.1 个人消费记录解析 
数据结构化处理
import pandas as pd

def parse_consumption_record(data_source):
    # 识别数据源文件格式
    file_format = detect_file_format(data_source)

    if file_format == 'csv':
        data = pd.read_csv(data_source, parse_dates=['transaction_date'])
    elif file_format == 'excel':
        data = pd.read_excel(data_source, parse_dates=['transaction_date'])
    elif file_format == 'json':
        data = pd.read_json(data_source)
        data['transaction_date'] = pd.to_datetime(data['transaction_date'])

    # 按统一schema映射列名
    data = standardize_column_names(data, consumption_schema)

    # 校验必需字段与数据结构
    validate_consumption_structure(data)

    return data
 
字段标准化映射
标准字段映射 = {
    '交易日期/时间': 'transaction_date',
    '消费金额': 'amount',
    '交易类型': 'transaction_type',
    '商户名称': 'merchant_name',
    '消费类别': 'category',
    '支付方式': 'payment_method',
    '交易状态': 'status',
    '交易位置': 'location'
}
 
2.1.2 商家经营数据解析 
财务数据标准化
def parse_business_data(data_source):
    # 识别财务报表类型
    report_type = identify_financial_report_type(data_source)

    if report_type == 'balance_sheet':
        data = parse_balance_sheet(data_source)
    elif report_type == 'income_statement':
        data = parse_income_statement(data_source)
    elif report_type == 'cash_flow':
        data = parse_cash_flow_statement(data_source)
    else:
        data = parse_general_business_data(data_source)

    # 计算关键财务比率
    data = calculate_financial_ratios(data)

    # 补充行业基准指标
    data = add_industry_benchmarks(data)

    return data
 
行业代码映射 
• 建立标准行业分类代码映射表 
• 对接国家统计局行业分类标准 
• 支持多级行业代码自动识别与分配
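行业代码映射可以用层级映射表实现,下面是一个极简示意;映射条目与代码仅作示例,实际应对接国家统计局《国民经济行业分类》标准:

INDUSTRY_CODE_MAP = {
    "餐饮": {"code": "H62", "level1": "住宿和餐饮业"},
    "零售": {"code": "F52", "level1": "批发和零售业"},
    "软件开发": {"code": "I65", "level1": "信息传输、软件和信息技术服务业"},
}

def map_industry(business_description: str) -> dict:
    """按关键词匹配返回行业代码,未命中时返回待人工确认标记。"""
    for keyword, info in INDUSTRY_CODE_MAP.items():
        if keyword in business_description:
            return info
    return {"code": "UNKNOWN", "level1": "待人工确认"}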
2.1.3 亲友信息解析 
关系数据处理
def parse_relatives_data(data_source):
    # 解密加载敏感数据
    data = load_sensitive_data(data_source, encryption_keys)

    # 合规性检查,必要时限制数据使用范围
    compliance_status = check_compliance_requirements(data)
    if not compliance_status.is_compliant:
        log_compliance_issue(compliance_status)
        data = apply_compliance_restrictions(data, compliance_status)

    # 构建并标注关系图谱
    relationship_graph = build_relationship_graph(data)
    tagged_relationships = tag_relationships(relationship_graph)

    return {
        'data': data,
        'compliance_status': compliance_status,
        'relationship_graph': relationship_graph,
        'tagged_relationships': tagged_relationships
    }
 
隐私保护措施 
• 个人身份信息(PII)识别与脱敏 
• 基于角色的访问控制标记 
• 数据使用目的限制标签
2.2 非结构化文本解析 
2.2.1 专家案例文本解析 
文档结构化处理
def parse_expert_case(document_path):
    # 格式识别与文本转换
    doc_format = detect_document_format(document_path)
    text_content = convert_to_text(document_path, doc_format)

    # 识别文档结构并分段
    doc_structure = identify_document_structure(text_content)
    sections = extract_document_sections(text_content, doc_structure)

    # 关键信息、命名实体与摘要提取
    key_information = extract_key_information(sections)
    entities = extract_named_entities(text_content)
    summary = generate_document_summary(text_content)

    return {
        'full_text': text_content,
        'sections': sections,
        'key_information': key_information,
        'entities': entities,
        'summary': summary
    }
 
文本结构识别规则 
• 标题层级识别规则 
• 案例分隔标记识别 
• 关键分析部分识别模式
2.3 数据清洗 
通用数据清洗策略 
• 缺失值处理:基于业务规则和统计推断 
• 异常值检测与处理:基于Z-分数、IQR或领域规则 
• 重复数据检测与处理 
• 数据一致性检查与修正 
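异常值检测部分的一个简化示意,分别给出Z-分数与IQR两种判定方式,阈值为常用经验值:

import pandas as pd

def detect_outliers(series: pd.Series, method: str = "iqr") -> pd.Series:
    """返回布尔序列,True表示该值被判定为异常值。"""
    if method == "z-score":
        z = (series - series.mean()) / series.std()
        return z.abs() > 3
    # IQR法:落在[Q1 - 1.5*IQR, Q3 + 1.5*IQR]之外视为异常
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return (series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)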
领域特定清洗规则
def clean_financial_data(data):
    # 处理负值(如红字冲销、退款)
    data = process_negative_values(data)

    # 统一货币单位
    data = standardize_monetary_units(data)

    # 处理财务异常值
    data = handle_financial_outliers(data)

    # 填补缺失的财务指标
    data = impute_missing_financial_indicators(data)

    return data
 
文本数据清洗
def clean_text_data(text_data):
    # 去除HTML标签
    cleaned_text = remove_html_tags(text_data)
    # 规范化空白字符
    cleaned_text = normalize_whitespace(cleaned_text)
    # 处理特殊字符
    cleaned_text = handle_special_characters(cleaned_text)
    # 纠正常见拼写错误
    cleaned_text = correct_common_misspellings(cleaned_text)
    # 去除无信息量内容
    cleaned_text = remove_non_informative_content(cleaned_text)

    return cleaned_text
 
2.4 数据标准化 
数值型数据标准化
def normalize_numerical_data(dataframe, normalization_type='z-score'):
    normalized_df = dataframe.copy()
    numerical_columns = dataframe.select_dtypes(include=['int64', 'float64']).columns

    if normalization_type == 'z-score':
        # 标准分数:以均值为中心、标准差为尺度
        for col in numerical_columns:
            mean = dataframe[col].mean()
            std = dataframe[col].std()
            normalized_df[col] = (dataframe[col] - mean) / std

    elif normalization_type == 'min-max':
        # 归一化到[0, 1]区间
        for col in numerical_columns:
            min_val = dataframe[col].min()
            max_val = dataframe[col].max()
            normalized_df[col] = (dataframe[col] - min_val) / (max_val - min_val)

    elif normalization_type == 'robust':
        # 鲁棒标准化:以中位数和四分位距为尺度,降低异常值影响
        for col in numerical_columns:
            median = dataframe[col].median()
            q1 = dataframe[col].quantile(0.25)
            q3 = dataframe[col].quantile(0.75)
            iqr = q3 - q1
            normalized_df[col] = (dataframe[col] - median) / iqr

    return normalized_df
 
类别型数据标准化 
• 统一编码方案设计(如消费类别、交易类型) 
• 层级类别标准化(如行业分类) 
• 自定义编码映射表维护与更新机制 
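类别标准化的一个最小示意:通过别名映射表把不同来源的类别名称归一到统一编码;映射内容仅作示例:

CATEGORY_ALIAS = {
    "餐饮": "dining", "吃饭": "dining", "外卖": "dining",
    "水电煤": "utilities", "公共事业缴费": "utilities",
    "房租": "housing", "房贷": "housing",
}

def standardize_category(raw_category: str) -> str:
    """将原始类别名称映射为统一编码,未知类别归入other。"""
    return CATEGORY_ALIAS.get(raw_category.strip(), "other")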
时间数据标准化
import pandas as pd

def normalize_datetime_data(df, datetime_cols):
    df_normalized = df.copy()

    for col in datetime_cols:
        # 统一转换为datetime类型,无法解析的置为NaT
        df_normalized[col] = pd.to_datetime(df[col], errors='coerce')

        # 派生常用时间特征
        df_normalized[f'{col}_year'] = df_normalized[col].dt.year
        df_normalized[f'{col}_month'] = df_normalized[col].dt.month
        df_normalized[f'{col}_day'] = df_normalized[col].dt.day
        df_normalized[f'{col}_dayofweek'] = df_normalized[col].dt.dayofweek
        df_normalized[f'{col}_quarter'] = df_normalized[col].dt.quarter

        # 派生布尔型标记
        df_normalized[f'{col}_is_weekend'] = df_normalized[col].dt.dayofweek >= 5
        df_normalized[f'{col}_is_month_start'] = df_normalized[col].dt.is_month_start
        df_normalized[f'{col}_is_month_end'] = df_normalized[col].dt.is_month_end

    return df_normalized
 
3. 特征提取与增强层 
3.1 金融特征提取 
个人消费特征提取
def extract_consumption_features(consumption_data):
    features = {}

    # 基础消费统计
    features['total_spending'] = consumption_data['amount'].sum()
    features['avg_transaction_amount'] = consumption_data['amount'].mean()
    features['max_transaction_amount'] = consumption_data['amount'].max()
    features['transaction_count'] = len(consumption_data)

    # 消费类别分布
    category_distribution = consumption_data.groupby('category')['amount'].sum()
    features['category_distribution'] = category_distribution.to_dict()

    # 交易频率(笔/天)
    date_range = (consumption_data['transaction_date'].max() -
                  consumption_data['transaction_date'].min()).days
    features['transaction_frequency'] = len(consumption_data) / max(date_range, 1)

    # 必需性消费占比
    essential_categories = ['groceries', 'utilities', 'healthcare', 'housing']
    essential_spending = consumption_data[consumption_data['category'].isin(
        essential_categories)]['amount'].sum()
    features['essential_spending_ratio'] = essential_spending / features['total_spending']

    # 月度消费波动性与消费模式
    features['spending_volatility'] = consumption_data.groupby(
        consumption_data['transaction_date'].dt.month)['amount'].sum().std()
    monthly_spending = consumption_data.groupby(
        consumption_data['transaction_date'].dt.month)['amount'].sum()
    features['monthly_spending_pattern'] = monthly_spending.to_dict()

    return features
 
商家经营特征提取
def extract_business_financial_features(business_data):
    features = {}

    # 盈利能力
    features['profit_margin'] = business_data['net_income'] / business_data['revenue']
    features['return_on_assets'] = business_data['net_income'] / business_data['total_assets']
    features['gross_margin'] = (business_data['revenue'] - business_data['cost_of_goods_sold']) / business_data['revenue']

    # 流动性
    features['current_ratio'] = business_data['current_assets'] / business_data['current_liabilities']
    features['quick_ratio'] = (business_data['current_assets'] - business_data['inventory']) / business_data['current_liabilities']

    # 营运效率
    features['inventory_turnover'] = business_data['cost_of_goods_sold'] / business_data['average_inventory']
    features['asset_turnover'] = business_data['revenue'] / business_data['total_assets']

    # 偿债能力
    features['debt_to_equity'] = business_data['total_liabilities'] / business_data['total_equity']
    features['interest_coverage'] = business_data['ebit'] / business_data['interest_expense']

    # 成长性
    features['revenue_growth'] = calculate_growth_rate(business_data['revenue'])
    features['profit_growth'] = calculate_growth_rate(business_data['net_income'])

    # 现金流
    features['operating_cash_flow_ratio'] = business_data['operating_cash_flow'] / business_data['current_liabilities']
    features['cash_flow_to_debt'] = business_data['operating_cash_flow'] / business_data['total_debt']

    return features
 
3.2 时间序列分析 
时间序列分解
def decompose_time_series(time_series_data, date_column, value_column):
    # 以日期为索引构造时间序列
    ts_data = time_series_data.set_index(date_column)[value_column]

    from statsmodels.tsa.seasonal import seasonal_decompose

    try:
        # 加法模型分解:趋势 + 季节性 + 残差
        result = seasonal_decompose(ts_data, model='additive')
        decomposition = {
            'trend': result.trend,
            'seasonal': result.seasonal,
            'residual': result.resid,
            'observed': result.observed
        }
    except Exception:
        # 数据不足或频率无法推断时,退化为滚动均值与波动率
        decomposition = {
            'trend': calculate_rolling_average(ts_data),
            'volatility': calculate_volatility(ts_data),
            'observed': ts_data
        }

    return decomposition
 
消费模式检测
def detect_consumption_patterns(consumption_data):
    patterns = {}

    # 按日汇总消费金额
    daily_spending = consumption_data.groupby(
        consumption_data['transaction_date'].dt.date)['amount'].sum()

    # 自相关分析,识别周期性消费模式
    from statsmodels.tsa.stattools import acf

    acf_values = acf(daily_spending, nlags=30)
    significant_lags = [i for i, value in enumerate(acf_values) if abs(value) > 0.2]

    if 7 in significant_lags:
        patterns['weekly_pattern'] = True
    if any(lag in significant_lags for lag in [28, 29, 30, 31]):
        patterns['monthly_pattern'] = True

    # 基于2倍标准差的异常消费日检测
    mean_spending = daily_spending.mean()
    std_spending = daily_spending.std()
    anomaly_days = daily_spending[abs(daily_spending - mean_spending) > 2 * std_spending]
    patterns['spending_anomalies'] = anomaly_days.to_dict()

    return patterns
 
趋势分析与预测
def analyze_financial_trends(financial_data, time_column, target_columns):
    trend_analysis = {}

    for column in target_columns:
        time_series = financial_data[[time_column, column]].dropna()

        if len(time_series) >= 30:
            # 数据量充足时使用ARIMA建模并预测未来3期
            from statsmodels.tsa.arima.model import ARIMA

            try:
                model = ARIMA(time_series[column], order=(1, 1, 1))
                model_fit = model.fit()
                forecast = model_fit.forecast(steps=3)

                trend_analysis[column] = {
                    'historical_data': time_series[column].to_dict(),
                    'forecast_next_3_periods': forecast.to_dict(),
                    'model_summary': model_fit.summary()
                }
            except Exception:
                # 建模失败时退化为线性趋势拟合
                trend_analysis[column] = calculate_linear_trend(time_series, column)
        else:
            # 数据量不足时仅做线性趋势分析
            trend_analysis[column] = calculate_linear_trend(time_series, column)

    return trend_analysis
 
3.3 文本特征提取 
关键信息提取
def extract_key_information_from_text(text):
    # 中文命名实体识别
    import spacy
    nlp = spacy.load("zh_core_web_trf")
    doc = nlp(text)

    entities = {
        'PERSON': [],
        'ORG': [],
        'DATE': [],
        'MONEY': [],
        'PERCENT': [],
        'GPE': []
    }

    for ent in doc.ents:
        if ent.label_ in entities:
            entities[ent.label_].append({
                'text': ent.text,
                'start': ent.start_char,
                'end': ent.end_char
            })

    # 关键词提取
    from keybert import KeyBERT
    kw_model = KeyBERT()
    keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 3), top_n=10)

    # 抽取式摘要
    from summa import summarizer
    summary = summarizer.summarize(text, ratio=0.2)

    return {
        'entities': entities,
        'keywords': keywords,
        'summary': summary
    }
 
情感分析与风险评估
def analyze_text_sentiment_and_risk(text):
    import re
    from transformers import pipeline

    # 中文情感分析
    sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="uer/roberta-base-finetuned-jd-binary-chinese"
    )
    sentiment_result = sentiment_analyzer(text)

    # 风险关键词及其上下文提取
    risk_terms = ['违约', '拖欠', '破产', '诉讼', '失信', '高负债', '资金链断裂', '监管处罚']
    risk_mentions = {}

    for term in risk_terms:
        if term in text:
            context_matches = re.finditer(f".{{0,50}}{term}.{{0,50}}", text)
            risk_mentions[term] = [match.group(0) for match in context_matches]

    # 简单加权风险分:命中关键词数量 + 负面情感强度
    risk_score = len(risk_mentions) * 10
    if sentiment_result[0]['label'] == 'negative':
        risk_score += sentiment_result[0]['score'] * 50

    return {
        'sentiment': sentiment_result,
        'risk_mentions': risk_mentions,
        'risk_score': risk_score
    }
 
3.4 数据增强 
交叉特征生成
def generate_cross_features(personal_data, business_data=None, relatives_data=None):
    cross_features = {}

    # 消费行为与信用历史的关联特征
    if 'credit_history' in personal_data and 'consumption_patterns' in personal_data:
        cross_features['consumption_credit_correlation'] = calculate_correlation(
            personal_data['consumption_patterns']['monthly_spending'],
            personal_data['credit_history']['monthly_payment_history']
        )

    # 个人消费与企业营收的交叉特征
    if business_data is not None:
        if 'consumption_amount' in personal_data and 'revenue' in business_data:
            cross_features['consumption_to_revenue_ratio'] = (
                personal_data['consumption_amount'] / business_data['revenue']
            )

    # 仅在合规前提下使用亲友数据(紧急联系人稳定性)
    if relatives_data is not None and is_compliant_to_use(relatives_data):
        if 'emergency_contacts' in relatives_data:
            cross_features['emergency_contact_stability'] = calculate_emergency_contact_stability(
                relatives_data['emergency_contacts']
            )

    return cross_features
 
领域知识增强
def enhance_with_domain_knowledge(features, domain='consumer_finance'):
    enhanced_features = features.copy()

    if domain == 'consumer_finance':
        # 按常用DTI阈值划分风险等级
        if 'debt_to_income_ratio' in features:
            if features['debt_to_income_ratio'] < 0.36:
                enhanced_features['dti_risk_level'] = 'low'
            elif features['debt_to_income_ratio'] < 0.43:
                enhanced_features['dti_risk_level'] = 'medium'
            else:
                enhanced_features['dti_risk_level'] = 'high'

        # 综合财务健康指数
        if all(k in features for k in ['savings_rate', 'essential_spending_ratio', 'credit_utilization']):
            enhanced_features['financial_health_index'] = calculate_financial_health_index(
                features['savings_rate'],
                features['essential_spending_ratio'],
                features['credit_utilization']
            )

    elif domain == 'small_business':
        # 企业存续能力评分
        if all(k in features for k in ['current_ratio', 'debt_to_equity', 'profit_margin']):
            enhanced_features['business_viability_score'] = calculate_business_viability(
                features['current_ratio'],
                features['debt_to_equity'],
                features['profit_margin']
            )

    return enhanced_features
 
4. 数据存储与索引层 
4.1 自适应分块 
智能分块策略
def chunk_data_intelligently(data, data_type):
    chunks = []

    if data_type == 'financial_record':
        # 按财务期间分块
        chunks = chunk_by_financial_period(data)
    elif data_type == 'consumption_record':
        # 按消费模式分块
        chunks = chunk_by_consumption_pattern(data)
    elif data_type == 'expert_text':
        # 按语义单元分块,并做递归细分
        chunks = chunk_text_by_semantic_units(data)
        chunks = apply_recursive_chunking(chunks)

    # 校验并修复分块的语义完整性
    chunks = ensure_semantic_integrity(chunks)

    return chunks
 
多粒度分块机制
def create_multi_granularity_chunks(text_data):
    # 大粒度块:保留全局上下文
    large_chunks = split_text_into_chunks(text_data, chunk_size=2000, overlap=200)
    # 中粒度块:兼顾上下文与精度
    medium_chunks = split_text_into_chunks(text_data, chunk_size=500, overlap=100)
    # 小粒度块:用于精确匹配
    small_chunks = split_text_into_chunks(text_data, chunk_size=150, overlap=30)

    # 建立不同粒度块之间的层级关系
    chunk_hierarchy = build_chunk_hierarchy(large_chunks, medium_chunks, small_chunks)

    return {
        'large_chunks': large_chunks,
        'medium_chunks': medium_chunks,
        'small_chunks': small_chunks,
        'hierarchy': chunk_hierarchy
    }
 
4.2 向量嵌入 
多模型嵌入策略
def create_embeddings(chunks, embedding_models=None):
    if embedding_models is None:
        # 主备双模型配置
        embedding_models = {
            'primary': 'BAAI/bge-large-zh',
            'backup': 'shibing624/text2vec-base-chinese'
        }

    from sentence_transformers import SentenceTransformer

    primary_model = SentenceTransformer(embedding_models['primary'])
    backup_model = SentenceTransformer(embedding_models['backup'])

    embeddings = {}

    # 主模型编码
    try:
        embeddings['primary'] = primary_model.encode(
            [chunk['text'] for chunk in chunks],
            batch_size=32,
            show_progress_bar=True
        )
    except Exception as e:
        print(f"Primary model encoding failed: {e}")

    # 备用模型编码
    try:
        embeddings['backup'] = backup_model.encode(
            [chunk['text'] for chunk in chunks],
            batch_size=32,
            show_progress_bar=True
        )
    except Exception as e:
        print(f"Backup model encoding failed: {e}")

    if 'primary' not in embeddings and 'backup' not in embeddings:
        raise ValueError("All embedding models failed")

    return embeddings
 
特殊内容处理
def embed_special_content(chunks):
    embeddings = {}

    # 分离表格块与数值密集块
    table_chunks = [chunk for chunk in chunks if is_table_chunk(chunk)]
    numerical_chunks = [chunk for chunk in chunks if is_numerical_chunk(chunk)]

    # 针对特殊内容使用专用嵌入策略
    if table_chunks:
        embeddings['table_embeddings'] = generate_table_embeddings(table_chunks)
    if numerical_chunks:
        embeddings['numerical_embeddings'] = generate_numerical_embeddings(numerical_chunks)

    return embeddings
 
4.3 元数据生成 
元数据提取与标准化
def generate_metadata(chunks, original_data_source):
    metadata = []

    for i, chunk in enumerate(chunks):
        # 通用元数据
        chunk_metadata = {
            'chunk_id': f"{original_data_source['id']}_{i}",
            'source_type': original_data_source['type'],
            'creation_date': original_data_source.get('date'),
            'last_modified': original_data_source.get('last_modified'),
            'chunk_position': i,
            'total_chunks': len(chunks)
        }

        # 按数据源类型补充专属元数据
        if original_data_source['type'] == 'personal_consumption':
            if 'date_range' in chunk:
                chunk_metadata['start_date'] = chunk['date_range'][0]
                chunk_metadata['end_date'] = chunk['date_range'][1]
            if 'categories' in chunk:
                chunk_metadata['categories'] = chunk['categories']

        elif original_data_source['type'] == 'business_data':
            if 'financial_period' in chunk:
                chunk_metadata['period'] = chunk['financial_period']
            if 'report_type' in chunk:
                chunk_metadata['report_type'] = chunk['report_type']

        elif original_data_source['type'] == 'expert_case':
            if 'section_title' in chunk:
                chunk_metadata['section_title'] = chunk['section_title']
            if 'entities' in chunk:
                chunk_metadata['entities'] = chunk['entities']

        metadata.append(chunk_metadata)

    return metadata
 
时间属性与过滤标签
def enrich_metadata_with_filters(metadata, analysis_results):
    import datetime

    enriched_metadata = []

    for item in metadata:
        enriched_item = item.copy()

        # 根据数据新鲜度打时间相关性标签
        if 'creation_date' in item:
            now = datetime.datetime.now()
            creation_date = item['creation_date']

            if isinstance(creation_date, datetime.datetime):
                age_days = (now - creation_date).days

                if age_days < 30:
                    enriched_item['time_relevance'] = 'very_recent'
                elif age_days < 90:
                    enriched_item['time_relevance'] = 'recent'
                elif age_days < 365:
                    enriched_item['time_relevance'] = 'within_year'
                else:
                    enriched_item['time_relevance'] = 'historical'

        # 专家案例补充内容分析标签
        if item['source_type'] == 'expert_case':
            chunk_id = item['chunk_id']
            if chunk_id in analysis_results:
                enriched_item['content_type'] = analysis_results[chunk_id]['content_type']
                enriched_item['sentiment'] = analysis_results[chunk_id]['sentiment']
                enriched_item['complexity'] = analysis_results[chunk_id]['complexity']

        enriched_metadata.append(enriched_item)

    return enriched_metadata
 
4.4 向量数据存储 
混合存储策略
def store_processed_data(chunks, embeddings, metadata, storage_config):
    # 数量一致性校验
    assert len(chunks) == len(embeddings['primary']) == len(metadata), "Data length mismatch"

    # 初始化向量库与文档库
    vector_db = initialize_vector_db(storage_config['vector_db'])
    document_db = initialize_document_db(storage_config['document_db'])

    vector_records = []
    document_records = []

    for chunk, embedding, meta in zip(chunks, embeddings['primary'], metadata):
        # 向量记录:向量 + 元数据
        vector_records.append({
            'id': meta['chunk_id'],
            'vector': embedding,
            'metadata': {k: v for k, v in meta.items() if k != 'chunk_id'}
        })

        # 文档记录:原文 + 元数据 + 向量引用
        document_records.append({
            'id': meta['chunk_id'],
            'text': chunk['text'],
            'metadata': meta,
            'embedding_id': meta['chunk_id']
        })

    # 批量写入并建立索引
    vector_db.insert_batch(vector_records)
    document_db.insert_batch(document_records)
    create_indices(vector_db, document_db)

    return {
        'vector_db_info': vector_db.get_collection_stats(),
        'document_db_info': document_db.get_collection_stats()
    }
 
检索优化配置
def configure_retrieval_settings(vector_db, application_type='loan_approval'):
    if application_type == 'loan_approval':
        # 贷款审批:混合检索,偏向稠密向量,强调时效性过滤
        retrieval_config = {
            'search_algorithm': 'hybrid',
            'sparse_weight': 0.3,
            'dense_weight': 0.7,
            'top_k': 15,
            'reranker_model': 'BAAI/bge-reranker-large',
            'minimum_relevance_score': 0.75,
            'default_filters': {
                'time_relevance': ['very_recent', 'recent']
            }
        }
    elif application_type == 'risk_analysis':
        # 风险分析:纯稠密检索,召回范围更大
        retrieval_config = {
            'search_algorithm': 'dense',
            'top_k': 25,
            'reranker_model': 'BAAI/bge-reranker-large',
            'minimum_relevance_score': 0.65,
            'default_filters': {}
        }

    vector_db.configure_retrieval(retrieval_config)

    return retrieval_config
 
5. 数据安全与合规性保障 
5.1 隐私保护机制 
数据脱敏实现
def apply_data_anonymization(data, sensitivity_level='high'):
    anonymized_data = data.copy()

    # PII字段与对应脱敏函数
    pii_fields = {
        'id_number': mask_id_number,
        'phone_number': mask_phone_number,
        'email': mask_email,
        'address': mask_address,
        'full_name': mask_name,
        'bank_account': mask_bank_account
    }

    # 结构化字段脱敏
    for field, mask_func in pii_fields.items():
        if field in anonymized_data:
            anonymized_data[field] = mask_func(anonymized_data[field], sensitivity_level)

    # 自由文本字段脱敏
    text_fields = [field for field in anonymized_data if isinstance(anonymized_data[field], str)]
    for field in text_fields:
        anonymized_data[field] = anonymize_text(anonymized_data[field], sensitivity_level)

    return anonymized_data
 
差分隐私实现
def apply_differential_privacy(aggregate_data, epsilon=0.1):
    """
    对聚合数据应用差分隐私保护
    epsilon: 隐私预算(较小值提供更强的隐私保护)
    """
    from diffprivlib.tools import mean, std, var

    dp_results = {}

    # 仅对数值型聚合字段输出带噪声的统计量
    for field, values in aggregate_data.items():
        if isinstance(values, list) and all(isinstance(x, (int, float)) for x in values):
            dp_results[f'{field}_mean'] = mean(values, epsilon=epsilon)
            dp_results[f'{field}_std'] = std(values, epsilon=epsilon)
            dp_results[f'{field}_var'] = var(values, epsilon=epsilon)

    return dp_results
 
5.2 合规性审查流程 
数据使用合规检查
def check_data_compliance(data_source, intended_use):
    compliance_status = {
        'is_compliant': True,
        'issues': [],
        'recommendations': []
    }

    # 数据来源合法性
    if not is_legal_data_source(data_source):
        compliance_status['is_compliant'] = False
        compliance_status['issues'].append('数据来源不合规')
        compliance_status['recommendations'].append('仅使用合法授权的数据来源')

    # 使用目的与用户授权的一致性
    if not is_purpose_compliant(intended_use, data_source):
        compliance_status['is_compliant'] = False
        compliance_status['issues'].append('数据使用目的与用户授权不符')
        compliance_status['recommendations'].append('获取明确的数据使用授权')

    # 亲友信息的特殊合规要求
    if data_source.get('type') == 'relatives_info':
        relatives_compliance = check_relatives_data_compliance(data_source, intended_use)
        if not relatives_compliance['is_compliant']:
            compliance_status['is_compliant'] = False
            compliance_status['issues'].extend(relatives_compliance['issues'])
            compliance_status['recommendations'].extend(relatives_compliance['recommendations'])

        compliance_status['recommendations'].append('仅使用作为紧急联系人的基本联系信息,禁止用于信用评估')

    return compliance_status
 
数据访问控制
def apply_access_controls(processed_data, compliance_status):
    if not compliance_status['is_compliant']:
        # 不合规数据:施加严格访问限制并记录
        restricted_data = apply_strict_restrictions(processed_data, compliance_status['issues'])
        log_access_restrictions(restricted_data, compliance_status)
        return restricted_data
    else:
        # 合规数据:最小必要访问控制 + 使用追踪
        protected_data = add_minimal_access_controls(processed_data)
        protected_data = add_usage_tracking(protected_data)
        return protected_data
 
五、风险评估引擎 
多维度风险指标体系 
信用风险指标 
信用评分与历史 
还款记录分析 
违约概率模型 
财务风险指标 
现金流健康度 
偿债能力比率 
盈利能力指标 
经营效率指标 
市场与行业风险指标 
行业景气指数 
市场竞争格局 
宏观经济影响因素 
运营风险指标 
经营稳定性评估 
管理团队分析 
业务模式风险评估 
综合风险评分计算 
多指标加权模型 
机器学习综合评分 
LLM辅助风险综合评价
风险模型集成 
传统风险模型 
逻辑回归信用模型 
决策树风险分类 
随机森林风险预测 
基于LLM的风险分析 
多特征关联性分析 
非结构化数据风险识别 
定性因素风险判断 
混合模型决策机制 
模型权重动态调整 
风险评级与置信度估计 
异常检测与人工介入触发
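多指标加权与混合决策的一个简化示意:先对各维度风险分做加权求和,再用各维度分歧度粗略估计置信度,并据此决定是否触发人工介入;权重、阈值与字段名均为示例假设:

def compute_composite_risk(scores: dict, weights: dict = None) -> dict:
    """scores: 各维度0~100的风险分,如 {'credit': 40, 'financial': 55, 'market': 30, 'operational': 45}。"""
    if weights is None:
        weights = {"credit": 0.35, "financial": 0.30, "market": 0.20, "operational": 0.15}

    # 多指标加权综合分
    composite = sum(scores[k] * w for k, w in weights.items())

    # 简单置信度估计:各维度分歧越大,置信度越低
    mean_score = sum(scores.values()) / len(scores)
    dispersion = max(abs(v - mean_score) for v in scores.values())
    confidence = max(0.0, 1.0 - dispersion / 100)

    return {
        "composite_score": round(composite, 1),
        "risk_level": "high" if composite >= 70 else "medium" if composite >= 40 else "low",
        "confidence": round(confidence, 2),
        # 高风险或低置信度时触发人工审核
        "needs_human_review": composite >= 70 or confidence < 0.6,
    }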
风险评估流程 
sequenceDiagram
    participant 信贷员
    participant 系统
    participant 风控审核

    信贷员->>系统: 提交贷款申请
    系统->>系统: 数据收集与预处理
    系统->>系统: 基础风险指标计算

    par 并行风险评估
        系统->>系统: 信用风险评估
        系统->>系统: 财务风险评估
        系统->>系统: 市场风险评估
        系统->>系统: 运营风险评估
    end

    系统->>系统: 综合风险评分生成
    系统->>系统: 风险解释生成

    alt 常规风险案例
        系统->>信贷员: 返回风险评估报告
    else 异常或高风险案例
        系统->>风控审核: 转交人工审核
        风控审核->>系统: 审核结果输入
        系统->>信贷员: 返回风险评估报告
    end
 
六、LangGraph协作机制 
工作流定义 
系统基于LangGraph构建的协作工作流包含以下核心元素: 
节点定义 
每个Agent作为一个功能节点 
定义节点输入输出接口 
设置节点处理逻辑与条件 
边与流转控制 
定义节点间信息传递规则 
设置条件分支与循环机制 
错误处理与回退路径 
状态管理 
全局评估流程状态追踪 
中间结果缓存与恢复 
多轮交互状态保持
代码示例: 核心工作流定义 
from typing import TypedDict
from langgraph.graph import StateGraph  # StateGraph由LangGraph提供,而非langchain.graphs
from langchain.chat_models import ChatOpenAI

# 供各Agent共享的LLM实例
llm = ChatOpenAI(model="gpt-4")

# 示意用的全局评估状态结构,各Agent读写其中字段
class LoanAssessmentState(TypedDict, total=False):
    application: dict        # 原始申请数据
    data_status: str         # "complete" / "incomplete"
    analysis_results: dict   # 各分析Agent的输出
    final_report: dict       # 决策合成结果

workflow = StateGraph(LoanAssessmentState)

# 注册各Agent节点(coordinator_agent等处理函数假定已在别处实现)
workflow.add_node("coordinator", coordinator_agent)
workflow.add_node("data_acquisition", data_acquisition_agent)
workflow.add_node("credit_analysis", credit_analysis_agent)
workflow.add_node("financial_analysis", financial_analysis_agent)
workflow.add_node("market_analysis", market_analysis_agent)
workflow.add_node("fraud_detection", fraud_detection_agent)
workflow.add_node("compliance", compliance_agent)
workflow.add_node("decision_synthesis", decision_synthesis_agent)
workflow.add_node("communication", communication_agent)

# 协调Agent -> 数据获取Agent
workflow.add_edge("coordinator", "data_acquisition")

# 数据完整性检查:完整则并行进入四个分析Agent,否则回到协调Agent补充数据
def route_after_data_acquisition(state):
    if state["data_status"] == "complete":
        return ["credit_analysis", "financial_analysis", "market_analysis", "fraud_detection"]
    return "coordinator"

workflow.add_conditional_edges("data_acquisition", route_after_data_acquisition)

# 各分析Agent结果汇入合规检查,再到决策合成与沟通
for node in ["credit_analysis", "financial_analysis", "market_analysis", "fraud_detection"]:
    workflow.add_edge(node, "compliance")

workflow.add_edge("compliance", "decision_synthesis")
workflow.add_edge("decision_synthesis", "communication")

workflow.set_entry_point("coordinator")
workflow.set_finish_point("communication")

app = workflow.compile()
 
并行处理与同步机制 
并行分析策略 
信用、财务、市场等分析并行执行 
采用"fork-join"模式进行结果汇总 
支持分析依赖性管理与优先级排序 
同步点设计 
关键决策点强制同步 
中间结果共享机制 
冲突检测与解决策略 
异常处理机制 
Agent失败重试策略 
降级处理路径 
人工介入触发条件
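异常处理机制的一个简化示意:Agent调用失败时先重试,重试耗尽后走降级路径,仍失败则标记人工介入;重试次数与降级函数均为示例参数:

def run_agent_with_fallback(agent_fn, state, fallback_fn=None, max_retries=2):
    """执行Agent节点:失败重试 -> 降级处理 -> 标记人工介入。"""
    last_error = None

    # 失败重试
    for attempt in range(max_retries + 1):
        try:
            return agent_fn(state)
        except Exception as exc:
            last_error = exc

    # 降级处理路径
    if fallback_fn is not None:
        try:
            return fallback_fn(state)
        except Exception as exc:
            last_error = exc

    # 自动处理全部失败,交由人工介入
    return {"status": "needs_human_review", "error": str(last_error)}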
七、LLM应用策略 
模型选择策略 
多层级模型架构 
基础任务:Qwen-Max或国产替代模型 
复杂推理:GPT-4或等效高性能模型 
特定功能:专业微调模型 
动态模型选择 
基于任务复杂度选择模型 
考虑成本与性能平衡 
支持模型回退机制
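动态模型选择的一个示意实现:按任务复杂度选择模型档位,并在调用失败时回退到另一档;模型名称、复杂度阈值与 call_llm 调用接口均为示例假设:

MODEL_TIERS = {
    "basic": "qwen-max",   # 基础抽取、格式化类任务
    "complex": "gpt-4",    # 复杂推理与综合判断类任务
}

def select_model(task_complexity: float) -> str:
    """task_complexity为0~1的任务复杂度估计,超过阈值时使用高性能模型。"""
    return MODEL_TIERS["complex"] if task_complexity > 0.6 else MODEL_TIERS["basic"]

def call_with_fallback(prompt: str, task_complexity: float, call_llm) -> str:
    """call_llm(model, prompt)为假设的统一调用接口,主模型失败时回退到另一档模型。"""
    primary = select_model(task_complexity)
    backup = MODEL_TIERS["basic"] if primary == MODEL_TIERS["complex"] else MODEL_TIERS["complex"]
    try:
        return call_llm(primary, prompt)
    except Exception:
        return call_llm(backup, prompt)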
Prompt工程最佳实践 
角色与指令定义 
明确Agent角色和任务边界 
提供详细操作指南和评估标准 
设置输出格式和质量要求 
情境化提示设计 
包含相关背景信息和前置知识 
预设任务流程和决策框架 
提供示例输出引导模型行为 
去偏策略 
强调公平评估和避免歧视 
设置多角度考量要求 
引入交叉验证机制
示例: 财务分析Agent提示模板 
你是一位专业的小企业财务分析专家,负责评估贷款申请企业的财务健康状况。
现在你需要分析以下企业财务数据,并生成财务风险评估报告。

企业基本信息:
{business_info}

财务数据:
{financial_data}

行业标准参考:
{industry_benchmarks}

评估要求:
1. 分析关键财务比率(包括但不限于流动比率、速动比率、资产负债率、利息保障倍数)
2. 评估企业现金流状况和趋势
3. 分析盈利能力和可持续性
4. 与行业标准进行对比
5. 识别财务风险点和异常指标
6. 评估企业偿债能力和财务弹性
7. 给出财务风险等级(低、中、高)和详细理由

输出格式:
- 结构化JSON格式,包含各项指标评估和总体评价
- 每项评估必须包含数值分析和定性解释
- 必须包含风险等级和关键风险因素列表

避免:
- 不要仅根据单一指标做出结论
- 不要忽略行业特性和企业发展阶段
- 不要使用未提供的信息进行假设
- 必须客观公正,避免任何形式的偏见
 
八、RAG应用方案 
应用场景映射 
+-------------------------+----------------------------+----------------------+
|        应用场景         |          RAG功能           |      关键优化点      |
+-------------------------+----------------------------+----------------------+
| 政策法规查询            | 准确定位相关法规条例       | 关键词与向量混合检索 |
|                         | 解释应用于具体案例         | 内容重排与合规验证   |
+-------------------------+----------------------------+----------------------+
| 行业风险分析            | 提取行业特定风险因素       | 多类型文档整合能力   |
|                         | 关联市场趋势数据           | 时效性内容优先展示   |
+-------------------------+----------------------------+----------------------+
| 案例参考与决策支持      | 查找相似案例的处理方式     | 相似度算法优化       |
|                         | 提供决策依据与解释         | 公平性和合规性保障   |
+-------------------------+----------------------------+----------------------+
| 异常情况处理指南        | 识别异常模式并提供处理方案 | 多路径检索策略       |
|                         |                            | 异常情况分类精度     |
+-------------------------+----------------------------+----------------------+
| 风险指标解释            | 生成易理解的指标解释       | 内容简化与通俗化     |
|                         | 提供改善建议               | 术语释义自动增强     |
+-------------------------+----------------------------+----------------------+
 
RAG实现流程 
数据准备 
收集相关政策、法规、行业分析报告 
整理历史案例库和决策记录 
制作专业术语解释文档和评估标准说明 
知识库构建 
数据清洗与格式标准化 
文档分块与多向量索引创建 
元数据标注与分类系统建立 
检索引擎实现 
查询理解与改写模块开发 
混合检索策略实现 
结果重排与过滤机制构建 
结果合成与应用 
检索内容与当前任务整合 
基于检索增强提示工程 
结果验证与引用追踪
代码示例: RAG检索增强处理 
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

# 稠密向量检索器(基于相似度阈值)
vector_retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 6, "score_threshold": 0.5}
)

# 稀疏关键词检索器(BM25)
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 6

# 混合检索:稠密与稀疏结果加权融合
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.7, 0.3]
)

# 多查询扩展:由LLM生成多个改写查询
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=ensemble_retriever,
    llm=llm
)

# 上下文压缩:仅保留与查询相关的内容片段
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=multi_query_retriever
)

def retrieve_enhanced_context(query, business_context):
    # 将企业背景注入查询,提升检索针对性
    enhanced_query = f"""
    以下是关于一家小企业贷款申请的查询,企业背景:
    {business_context}

    原始查询: {query}
    """

    docs = compression_retriever.get_relevant_documents(enhanced_query)

    # 整理检索结果并按相关度排序
    processed_docs = []
    for doc in docs:
        processed_docs.append({
            "content": doc.page_content,
            "source": doc.metadata.get("source", "未知"),
            "relevance": doc.metadata.get("score", 0.0),
            "date": doc.metadata.get("date", "未知")
        })

    processed_docs = sorted(processed_docs, key=lambda x: x["relevance"], reverse=True)

    return processed_docs