From f895ab663793e9a2e016b5b5be24006920b30375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=AF=E8=BF=9B?= Date: Fri, 28 Nov 2025 18:21:44 +0800 Subject: [PATCH 1/2] lora-support --- F2LLM/arguments.py | 4 + F2LLM/configs/config_bert_base.json | 22 ++ F2LLM/configs/config_llama3_8b.json | 22 ++ F2LLM/configs/config_qwen3_mean_pooling.json | 22 ++ ...77\347\224\250\346\226\207\346\241\243.md" | 180 ++++++++++++ ...64\346\230\216\346\226\207\346\241\243.md" | 99 +++++++ ...00\346\261\202\346\226\207\346\241\243.md" | 95 +++++++ F2LLM/model.py | 162 ++++++++++- F2LLM/run.py | 39 ++- F2LLM/tokenize_data_bert_improved.py | 126 ++++++++ F2LLM/tokenize_data_generic.py | 268 ++++++++++++++++++ F2LLM/tokenize_data_llama_improved.py | 128 +++++++++ F2LLM/tokenize_data_qwen.py | 153 +++++++--- F2LLM/tokenize_data_qwen_improved.py | 126 ++++++++ F2LLM/utils.py | 51 +++- 15 files changed, 1431 insertions(+), 66 deletions(-) create mode 100644 F2LLM/configs/config_bert_base.json create mode 100644 F2LLM/configs/config_llama3_8b.json create mode 100644 F2LLM/configs/config_qwen3_mean_pooling.json create mode 100644 "F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" create mode 100644 "F2LLM/docs/\351\205\215\347\275\256\346\226\207\344\273\266\350\257\264\346\230\216\346\226\207\346\241\243.md" create mode 100644 "F2LLM/docs/\351\234\200\346\261\202\346\226\207\346\241\243.md" create mode 100644 F2LLM/tokenize_data_bert_improved.py create mode 100644 F2LLM/tokenize_data_generic.py create mode 100644 F2LLM/tokenize_data_llama_improved.py create mode 100644 F2LLM/tokenize_data_qwen_improved.py diff --git a/F2LLM/arguments.py b/F2LLM/arguments.py index b967c8f..ac29481 100644 --- a/F2LLM/arguments.py +++ b/F2LLM/arguments.py @@ -21,6 +21,10 @@ class Args: warmup_steps: int = 100 # embedding-related settings num_hard_neg: int = 7 + # model configuration + model_type: str = "auto" + embedding_strategy: str = "last_token" + pooling_strategy: str = "cls" # train steps take precedence over epochs, set to -1 to disable train_steps: int = -1 train_epochs: int = 5 diff --git a/F2LLM/configs/config_bert_base.json b/F2LLM/configs/config_bert_base.json new file mode 100644 index 0000000..0c146ae --- /dev/null +++ b/F2LLM/configs/config_bert_base.json @@ -0,0 +1,22 @@ +{ + "model_path": "models/bert-base-uncased", + "model_type": "bert", + "experiment_id": "bert-base-uncased-embedding", + "train_data_path": "data_tokenized_bert", + "output_dir": "output", + "tb_dir": "output/tb", + "cache_dir": "cache", + "train_batch_size": 32, + "checkpointing_steps": 5000, + "validation_steps": 5000, + "max_seq_length": 512, + "learning_rate": 2e-5, + "min_lr": 1e-7, + "weight_decay": 0.01, + "warmup_steps": 500, + "train_epochs": 3, + "log_interval": 100, + "num_hard_neg": 7, + "embedding_strategy": "cls", + "pooling_strategy": "cls" +} diff --git a/F2LLM/configs/config_llama3_8b.json b/F2LLM/configs/config_llama3_8b.json new file mode 100644 index 0000000..570f41f --- /dev/null +++ b/F2LLM/configs/config_llama3_8b.json @@ -0,0 +1,22 @@ +{ + "model_path": "models/llama-3-8b", + "model_type": "llama", + "experiment_id": "llama3-8b-embedding", + "train_data_path": "training_data/data_tokenized_llama", + "output_dir": "output", + "tb_dir": "output/tb", + "cache_dir": "cache", + "train_batch_size": 16, + "checkpointing_steps": 5000, + "validation_steps": 5000, + "max_seq_length": 2048, + "learning_rate": 1e-5, + "min_lr": 1e-7, + "weight_decay": 0.01, + "warmup_steps": 500, 
+ "train_epochs": 2, + "log_interval": 100, + "num_hard_neg": 7, + "embedding_strategy": "last_token", + "pooling_strategy": "last_token" +} diff --git a/F2LLM/configs/config_qwen3_mean_pooling.json b/F2LLM/configs/config_qwen3_mean_pooling.json new file mode 100644 index 0000000..f111268 --- /dev/null +++ b/F2LLM/configs/config_qwen3_mean_pooling.json @@ -0,0 +1,22 @@ +{ + "model_path": "models/qwen3-0.6b", + "model_type": "qwen", + "experiment_id": "qwen3-0.6b-mean-pooling", + "train_data_path": "training_data/data_tokenized_qwen", + "output_dir": "output", + "tb_dir": "output/tb", + "cache_dir": "cache", + "train_batch_size": 16, + "checkpointing_steps": 5000, + "validation_steps": 5000, + "max_seq_length": 1024, + "learning_rate": 8e-6, + "min_lr": 1e-7, + "weight_decay": 0.01, + "warmup_steps": 500, + "train_epochs": 2, + "log_interval": 100, + "num_hard_neg": 7, + "embedding_strategy": "mean_pooling", + "pooling_strategy": "mean" +} diff --git "a/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" "b/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" new file mode 100644 index 0000000..3294ec9 --- /dev/null +++ "b/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" @@ -0,0 +1,180 @@ +# F2LLM 不同模型使用文档 + +## 概述 + +本文档详细说明了F2LLM项目中支持的不同模型的使用方法,包括BERT、LLaMA、Qwen等模型的训练和推理流程。 + +## 支持的模型列表 + +1. **BERT类模型** + - bert-base-uncased + - bert-large-uncased + +2. **LLaMA类模型** + - llama-3-8b + +3. **Qwen类模型** + - qwen3-0.6b + - qwen3-4b + +## 模型使用指南 + +### 1. BERT模型使用 + +#### 训练流程 + +1. 数据预处理: + ```bash + # 使用通用数据预处理脚本 + python tokenize_data_generic.py --config configs/config_generic.json --model_path models/bert-base-uncased --model_type bert + ``` + +2. 模型训练: + ```bash + python run.py --config configs/config_bert_base.json + ``` + +#### 模型特点 + +- 适用于句子相似度、文本分类等任务 +- 使用[CLS]标记的隐藏状态作为句子表示 +- 最大序列长度通常为512 + +### 2. LLaMA模型使用 + +#### 训练流程 + +1. 数据预处理: + ```bash + # 使用通用数据预处理脚本 + python tokenize_data_generic.py --config configs/config_generic.json --model_path models/llama-3-8b --model_type llama + ``` + +2. 模型训练: + ```bash + python run.py --config configs/config_llama3_8b.json + ``` + +#### 模型特点 + +- 解码器架构模型 +- 使用最后一个token的隐藏状态作为句子表示 +- 支持更长的序列长度(可达2048) +- 需要更多的计算资源 + +### 3. Qwen模型使用 + +#### 训练流程 + +1. 数据预处理: + ```bash + # 使用通用数据预处理脚本 + python tokenize_data_generic.py --config configs/config_generic.json --model_path models/qwen3-0.6b --model_type qwen + ``` + +2. 模型训练(使用平均池化): + ```bash + python run.py --config configs/config_qwen3_mean_pooling.json + ``` + +#### 模型特点 + +- 阿里巴巴集团开发的大语言模型 +- 支持多种池化策略(CLS、平均池化、最后token等) +- 针对中文和多语言任务进行了优化 +- Qwen3系列支持更长的上下文长度 + +## 配置参数说明 + +### 模型相关参数 + +| 参数名 | BERT | LLaMA | Qwen | +|--------|------|-------|------| +| `model_path` | models/bert-base-uncased | models/llama-3-8b | models/qwen3-0.6b | +| `model_type` | bert | llama | qwen | +| `max_seq_length` | 512 | 2048 | 1024 | +| `train_batch_size` | 32 | 16 | 16 | +| `learning_rate` | 2e-5 | 1e-5 | 8e-6 | +| `train_epochs` | 3 | 2 | 2 | +| `embedding_strategy` | cls | last_token | mean_pooling | +| `pooling_strategy` | cls | last_token | mean | + +### 处理模式参数 + +对于数据预处理脚本,支持多种处理模式: + +1. **自动模式(auto)**: + 根据`num_processes`参数自动选择处理方式 + ```bash + python tokenize_data_generic.py --config configs/config_generic.json --mode auto + ``` + +2. 
**串行模式(serial)**: + 使用单进程处理,适用于调试和小数据集 + ```bash + python tokenize_data_generic.py --config configs/config_generic.json --mode serial + ``` + +3. **并行模式(parallel)**: + 使用多进程处理,提高处理速度 + ```bash + python tokenize_data_generic.py --config configs/config_generic.json --mode parallel --num_processes 4 + ``` + +4. **GPU加速模式(gpu)**: + 使用GPU进行处理,需要支持CUDA或MPS的设备 + ```bash + python tokenize_data_generic.py --config configs/config_generic.json --mode gpu --device cuda + ``` + +## 设备支持 + +### CPU处理 + +默认使用CPU进行处理,适用于所有环境: +```bash +python tokenize_data_generic.py --config configs/config_generic.json --device cpu +``` + +### CUDA GPU处理 + +使用NVIDIA GPU进行加速处理: +```bash +python tokenize_data_generic.py --config configs/config_generic.json --device cuda +``` + +### MPS GPU处理 + +使用Apple Silicon GPU进行加速处理(仅适用于Mac M系列芯片): +```bash +python tokenize_data_generic.py --config configs/config_generic.json --device mps +``` + +## 注意事项 + +1. **模型路径**:确保模型路径正确,且包含模型的配置文件和分词器文件 +2. **序列长度**:不同模型支持的最大序列长度不同,设置时需注意 +3. **批次大小**:根据GPU显存大小调整批次大小,避免显存溢出 +4. **学习率**:不同模型的最优学习率不同,需要根据实际情况调整 +5. **嵌入策略**:根据模型架构选择合适的嵌入策略(CLS、平均池化、最后token等) +6. **设备兼容性**:确保指定的设备在当前环境中可用 + +## 常见问题 + +### 1. 如何选择合适的模型? + +- **BERT**:适用于传统的NLP任务,如句子相似度、文本分类等 +- **LLaMA**:适用于需要更长上下文理解和生成能力的任务 +- **Qwen**:适用于中文和多语言任务,具有良好的对话能力 + +### 2. 如何调整超参数? + +- **学习率**:从推荐值开始,根据验证集表现进行调整 +- **批次大小**:根据GPU显存调整,通常为2的幂次 +- **训练轮数**:根据验证集表现决定,避免过拟合 + +### 3. 如何处理长文本? + +- 选择支持长序列的模型(如LLaMA、Qwen) +- 调整`max_seq_length`参数 +- 考虑使用滑动窗口等策略处理超长文本 diff --git "a/F2LLM/docs/\351\205\215\347\275\256\346\226\207\344\273\266\350\257\264\346\230\216\346\226\207\346\241\243.md" "b/F2LLM/docs/\351\205\215\347\275\256\346\226\207\344\273\266\350\257\264\346\230\216\346\226\207\346\241\243.md" new file mode 100644 index 0000000..228f9aa --- /dev/null +++ "b/F2LLM/docs/\351\205\215\347\275\256\346\226\207\344\273\266\350\257\264\346\230\216\346\226\207\346\241\243.md" @@ -0,0 +1,99 @@ +# F2LLM 配置文件说明文档 + +## 概述 + +本文档详细说明了 `F2LLM/configs` 目录下各个配置文件的用途和参数含义。这些配置文件用于控制模型训练和数据处理的不同方面。 + +## 配置文件列表 + +### 1. config_bert_base.json + +用于BERT基础模型的训练配置。 + +**主要参数:** +- `model_path`: "models/bert-base-uncased" - 模型路径 +- `model_type`: "bert" - 模型类型 +- `experiment_id`: "bert-base-uncased-embedding" - 实验标识 +- `max_seq_length`: 512 - 最大序列长度 +- `train_batch_size`: 32 - 训练批次大小 +- `learning_rate`: 2e-5 - 学习率 +- `train_epochs`: 3 - 训练轮数 +- `embedding_strategy`: "cls" - 嵌入策略 +- `pooling_strategy`: "cls" - 池化策略 + +### 2. config_llama3_8b.json + +用于LLaMA3 8B模型的训练配置。 + +**主要参数:** +- `model_path`: "models/llama-3-8b" - 模型路径 +- `model_type`: "llama" - 模型类型 +- `experiment_id`: "llama3-8b-embedding" - 实验标识 +- `max_seq_length`: 2048 - 最大序列长度 +- `train_batch_size`: 16 - 训练批次大小 +- `learning_rate`: 1e-5 - 学习率 +- `train_epochs`: 2 - 训练轮数 +- `embedding_strategy`: "last_token" - 嵌入策略 +- `pooling_strategy`: "last_token" - 池化策略 + +### 3. config_qwen3_mean_pooling.json + +用于Qwen3模型使用平均池化策略的训练配置。 + +**主要参数:** +- `model_path`: "models/qwen3-0.6b" - 模型路径 +- `model_type`: "qwen" - 模型类型 +- `experiment_id`: "qwen3-0.6b-mean-pooling" - 实验标识 +- `max_seq_length`: 1024 - 最大序列长度 +- `train_batch_size`: 16 - 训练批次大小 +- `learning_rate`: 8e-6 - 学习率 +- `train_epochs`: 2 - 训练轮数 +- `embedding_strategy`: "mean_pooling" - 嵌入策略 +- `pooling_strategy`: "mean" - 池化策略 + +### 4. 
config.json + +通用训练配置文件,用于Qwen3 4B模型。 + +**主要参数:** +- `model_path`: "models/qwen3-4b" - 模型路径 +- `experiment_id`: "4b+lr.8e-6+bs.16x32+context.1024+2epochs" - 实验标识 +- `max_seq_length`: 1024 - 最大序列长度 +- `train_batch_size`: 16 - 训练批次大小 +- `learning_rate`: 8e-6 - 学习率 +- `train_epochs`: 2 - 训练轮数 + +### 5. accelerate_config.yaml + +Accelerate库的配置文件,用于分布式训练设置。 + +**主要参数:** +- `distributed_type`: "DEEPSPEED" - 分布式训练类型 +- `deepspeed_config`: DeepSpeed配置 + - `zero_stage`: 2 - ZeRO优化阶段 + - `gradient_clipping`: 1.0 - 梯度裁剪 +- `num_processes`: 8 - 进程数 +- `mixed_precision`: "bf16" - 混合精度训练 + +## 使用方法 + +### 训练时指定配置文件 + +```bash +python run.py --config configs/config_bert_base.json +``` + +### 数据预处理时指定配置文件 + +对于新实现的 `tokenize_data_generic.py` 脚本,可以使用类似的配置方式: + +```bash +python tokenize_data_generic.py --config configs/config_generic.json +``` + +## 注意事项 + +1. 不同模型的配置文件中,`max_seq_length`、`train_batch_size` 和 `learning_rate` 等参数会根据模型特性进行调整 +2. `embedding_strategy` 和 `pooling_strategy` 参数根据模型架构和任务需求进行设置 +3. Accelerate配置文件用于控制分布式训练的行为 +4. 实验标识 `experiment_id` 用于区分不同的实验配置,影响输出目录的命名 diff --git "a/F2LLM/docs/\351\234\200\346\261\202\346\226\207\346\241\243.md" "b/F2LLM/docs/\351\234\200\346\261\202\346\226\207\346\241\243.md" new file mode 100644 index 0000000..dea80dc --- /dev/null +++ "b/F2LLM/docs/\351\234\200\346\261\202\346\226\207\346\241\243.md" @@ -0,0 +1,95 @@ +# 多模型数据预处理需求文档 + +## 需求背景 + +随着深度学习模型的快速发展,不同的模型架构(如BERT、LLaMA、Qwen等)在处理文本数据时有不同的要求。为了支持多种模型的训练需求,需要一个灵活的数据预处理工具,能够根据不同的模型类型进行相应的数据处理。 + +## 需求变更 + +### 原始需求 +- 支持单一模型类型的数据预处理 +- 固定的处理参数 + +### 变更后需求 +- 支持多种模型类型(BERT、LLaMA、Qwen等) +- 支持灵活的配置参数 +- 支持串行、并行和GPU加速处理模式 +- 支持不同设备(CPU、CUDA、MPS) +- 支持配置文件方式管理参数 + +## 需求方案 + +### 方案概述 + +实现一个通用的数据预处理脚本 `tokenize_data_generic.py`,支持以下功能: + +1. **多模型支持**: + - BERT类编码器模型 + - LLaMA类解码器模型 + - Qwen类解码器模型 + +2. **多种处理模式**: + - 串行处理(Serial) + - 并行处理(Parallel) + - GPU加速处理(GPU) + - 自动模式(Auto) + +3. **设备支持**: + - CPU处理 + - CUDA GPU处理 + - MPS GPU处理(Apple Silicon) + +4. **配置管理**: + - 支持JSON格式配置文件 + - 命令行参数覆盖配置文件参数 + +### 技术实现 + +#### 核心功能模块 + +1. **数据处理函数**: + - `process_sent`:处理单个句子 + - `process_sent_batch_serial`:串行批处理 + - `process_sent_batch_parallel`:并行批处理 + - `process_sent_batch_gpu`:GPU加速批处理 + +2. **主处理函数**: + - `tokenize_data`:根据配置选择处理模式 + - `tokenize_dataset`:处理整个数据集 + +3. **参数管理**: + - 支持配置文件读取 + - 支持命令行参数覆盖 + +#### 配置文件支持 + +- 使用JSON格式配置文件 +- 支持参数继承和覆盖 +- 配置文件示例: + ```json + { + "model_path": "models/bert-base-uncased", + "model_type": "bert", + "max_seq_length": 512, + "input_dir": "training_data", + "output_dir": "data_tokenized_bert", + "num_processes": 1, + "mode": "auto", + "device": "cpu" + } + ``` + +#### 命令行接口 + +支持通过命令行参数覆盖配置文件中的参数: + +```bash +python tokenize_data_generic.py --config configs/config_generic.json --model_path models/qwen3-0.6b --model_type qwen +``` + +### 预期收益 + +1. **提高开发效率**:统一的预处理工具,减少重复开发 +2. **增强灵活性**:支持多种模型和处理模式 +3. **提升处理性能**:支持并行和GPU加速处理 +4. 
**简化配置管理**:通过配置文件管理不同场景的参数 diff --git a/F2LLM/model.py b/F2LLM/model.py index d33ade7..1f46042 100644 --- a/F2LLM/model.py +++ b/F2LLM/model.py @@ -1,37 +1,179 @@ import torch from transformers import AutoModel, AutoTokenizer +import torch.nn.functional as F + + +class BaseEmbeddingAdapter: + """基础嵌入适配器""" + + def get_embeddings(self, outputs, batch): + """获取嵌入向量""" + passage_features_all_tokens = outputs.last_hidden_state + bs = batch['bs'] + num_hard_neg = int((len(batch['input_ids']) - 2*bs) / bs) + + return { + 'query_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs)]), + 'passage_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs, 2*bs)]), + 'negative_passage_features': None if num_hard_neg == 0 else torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(2*bs, len(batch['seq_lens']))]).view(bs, num_hard_neg, -1) + } + + +class BertEmbeddingAdapter(BaseEmbeddingAdapter): + """BERT系列模型的嵌入适配器""" + + def get_embeddings(self, outputs, batch): + """获取BERT模型的嵌入向量,使用[CLS]标记""" + passage_features_all_tokens = outputs.last_hidden_state + bs = batch['bs'] + num_hard_neg = int((len(batch['input_ids']) - 2*bs) / bs) + + # BERT使用[CLS]标记的嵌入 (第0个位置) + return { + 'query_passage_features': passage_features_all_tokens[:bs, 0, :].unsqueeze(1), + 'passage_passage_features': passage_features_all_tokens[bs:2*bs, 0, :].unsqueeze(1), + 'negative_passage_features': None if num_hard_neg == 0 else passage_features_all_tokens[2*bs:, 0, :].view(bs, num_hard_neg, -1) + } + + +class LlamaEmbeddingAdapter(BaseEmbeddingAdapter): + """LLaMA系列模型的嵌入适配器""" + + def get_embeddings(self, outputs, batch): + """获取LLaMA模型的嵌入向量,使用最后一个标记""" + passage_features_all_tokens = outputs.last_hidden_state + bs = batch['bs'] + num_hard_neg = int((len(batch['input_ids']) - 2*bs) / bs) + + # LLaMA使用最后一个标记的嵌入 + return { + 'query_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs)]), + 'passage_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs, 2*bs)]), + 'negative_passage_features': None if num_hard_neg == 0 else torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(2*bs, len(batch['seq_lens']))]).view(bs, num_hard_neg, -1) + } + + +class MeanPoolingAdapter(BaseEmbeddingAdapter): + """平均池化适配器""" + + def get_embeddings(self, outputs, batch): + """使用平均池化获取嵌入向量""" + passage_features_all_tokens = outputs.last_hidden_state + bs = batch['bs'] + num_hard_neg = int((len(batch['input_ids']) - 2*bs) / bs) + + # 计算平均池化嵌入 + attention_mask = batch['attention_mask'] + input_mask_expanded = attention_mask.unsqueeze(-1).expand(passage_features_all_tokens.size()).float() + embeddings = torch.sum(passage_features_all_tokens * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9) + + # 重新组织输出格式 + query_embeddings = embeddings[:bs] + passage_embeddings = embeddings[bs:2*bs] + negative_embeddings = None if num_hard_neg == 0 else embeddings[2*bs:].view(bs, num_hard_neg, -1) + + return { + 'query_passage_features': query_embeddings.unsqueeze(1), + 'passage_passage_features': passage_embeddings.unsqueeze(1), + 'negative_passage_features': negative_embeddings + } class F2LLM: def __init__(self, model_path, max_seq_length=512, - args=None + args=None, + model_type="auto", + embedding_strategy="last_token", + pooling_strategy="cls" ): self.args = args self.dtype = 
torch.bfloat16 self.device = None # set after accelerator.prepare - self.lm = AutoModel.from_pretrained(model_path, trust_remote_code=True, torch_dtype=self.dtype, attn_implementation='flash_attention_2') + + # Check if CUDA is available and set the attention implementation accordingly + # Only use flash_attention_2 if CUDA is available and flash_attn is installed + attn_implementation = None + if torch.cuda.is_available(): + try: + import flash_attn + attn_implementation = 'flash_attention_2' + except ImportError: + attn_implementation = 'eager' # or 'sdpa' if available + + self.lm = AutoModel.from_pretrained(model_path, trust_remote_code=True, torch_dtype=self.dtype, attn_implementation=attn_implementation) self.lm.config.use_cache = False self.tokenizer = AutoTokenizer.from_pretrained(model_path) self.max_seq_length = max_seq_length + + # 设置模型类型和嵌入策略 + self.model_type = self.lm.config.model_type if model_type == "auto" else model_type + self.embedding_strategy = embedding_strategy + self.pooling_strategy = pooling_strategy + + # 根据模型类型和策略选择适配器 + self.embedding_adapter = self._get_embedding_adapter() + + def _get_embedding_adapter(self): + """根据模型类型和策略返回相应的嵌入适配器""" + # 如果指定了池化策略,使用池化适配器 + if self.pooling_strategy != "cls": + if self.pooling_strategy == "mean": + return MeanPoolingAdapter() + # 可以添加更多池化策略 + + # 根据模型类型选择适配器 + if self.model_type in ['bert', 'roberta', 'distilbert', 'albert', 'electra']: + return BertEmbeddingAdapter() + elif self.model_type in ['llama', 'mistral', 'qwen', 'gemma', 'phi']: + return LlamaEmbeddingAdapter() + else: + # 默认适配器 + return BaseEmbeddingAdapter() def set_device(self): self.device = self.lm.device def forward(self, batch): - bs = batch['bs'] - num_hard_neg = int((len(batch['input_ids']) - 2*bs) / bs) - outputs = self.lm(batch['input_ids'], batch['attention_mask'], ) + + # 使用适配器获取嵌入 + return self.embedding_adapter.get_embeddings(outputs, batch) - passage_features_all_tokens = outputs.last_hidden_state - return { - 'query_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs)]), - 'passage_passage_features': torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(bs, 2*bs)]), - 'negative_passage_features': None if num_hard_neg == 0 else torch.stack([passage_features_all_tokens[i, [batch['seq_lens'][i]-1]] for i in range(2*bs, len(batch['seq_lens']))]).view(bs, num_hard_neg, -1) + def encode(self, texts, max_length=None): + """编码文本为嵌入向量""" + if max_length is None: + max_length = self.max_seq_length + + # 对文本进行tokenize + if isinstance(texts, str): + texts = [texts] + + encoded = self.tokenizer(texts, max_length=max_length, padding=True, truncation=True, return_tensors='pt') + + # 如果模型有设备信息,将输入移动到相应设备 + if self.device is not None: + encoded = {k: v.to(self.device) for k, v in encoded.items()} + + # 获取模型输出 + with torch.no_grad(): + outputs = self.lm(**encoded) + + # 构造batch字典用于适配器处理 + batch = { + 'input_ids': encoded['input_ids'], + 'attention_mask': encoded['attention_mask'], + 'seq_lens': encoded['attention_mask'].sum(dim=1).tolist(), + 'bs': len(texts) } + + # 使用适配器获取嵌入 + embeddings_dict = self.embedding_adapter.get_embeddings(outputs, batch) + + # 返回查询嵌入作为默认输出 + return embeddings_dict['query_passage_features'].squeeze(1) diff --git a/F2LLM/run.py b/F2LLM/run.py index e40b707..76ea28f 100644 --- a/F2LLM/run.py +++ b/F2LLM/run.py @@ -38,8 +38,23 @@ def collate_fn(batch_raw): 2*bs+num_hard_neg*(i-1) - 2*bs+num_hard_neg*i-1: hard neg for sample i (i from 1 to bs) ''' 
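+    # For example, with bs=2 and num_hard_neg=3 the rows of input_ids are laid out as:
+    #   0-1 queries, 2-3 passages, 4-6 hard negatives for sample 1, 7-9 hard negatives for sample 2.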
num_hard_neg = 1 if batch_raw[0]['dataset_name'] in CLASSIFICATION_DATASETS else args.num_hard_neg - # select args.num_hard_neg hard negatives from a total of 24 - hard_neg_indices = [0] if num_hard_neg == 1 else random.sample(list(range(24)), num_hard_neg) + + # 动态确定实际可用的负样本数量 + available_negatives = 0 + for i in range(1, 25): # 假设最多有24个负样本 + if f'negative_{i}_input_ids' in batch_raw[0]: + available_negatives = i + else: + break + + # 根据实际可用的负样本数量调整采样数量 + actual_num_hard_neg = min(num_hard_neg, available_negatives) + if actual_num_hard_neg <= 0: + actual_num_hard_neg = 1 # 至少需要一个负样本 + + # 选择实际可用的负样本索引 + hard_neg_indices = [0] if actual_num_hard_neg == 1 else random.sample(list(range(available_negatives)), actual_num_hard_neg) + input_ids = _stack( [s['query_input_ids'] for s in batch_raw]+\ [s['passage_input_ids'] for s in batch_raw]+\ @@ -119,8 +134,12 @@ def __iter__(self): override_train_step = True accelerator.print(f"******************************** Training step before prepare: {args.train_steps} ********************************") -model = F2LLM(args.model_path, args.max_seq_length, args=args) -model.lm.gradient_checkpointing_enable() +model = F2LLM(args.model_path, args.max_seq_length, args=args, model_type=args.model_type, embedding_strategy=args.embedding_strategy, pooling_strategy=args.pooling_strategy) + +# Only enable gradient checkpointing if CUDA is available +if torch.cuda.is_available(): + model.lm.gradient_checkpointing_enable() + # set seed again to make sure that different models share the same seed set_seed(0) @@ -134,7 +153,10 @@ def __iter__(self): num_warmup_steps=args.warmup_steps, num_training_steps=args.train_steps) -AcceleratorState().deepspeed_plugin.deepspeed_config['train_micro_batch_size_per_gpu'] = args.train_batch_size +# Check if deepspeed plugin is available before accessing its config +if hasattr(AcceleratorState(), 'deepspeed_plugin') and AcceleratorState().deepspeed_plugin is not None: + AcceleratorState().deepspeed_plugin.deepspeed_config['train_micro_batch_size_per_gpu'] = args.train_batch_size + model.lm, optimizer, lr_scheduler = accelerator.prepare( model.lm, optimizer, lr_scheduler ) @@ -148,6 +170,11 @@ def __iter__(self): args.train_steps = len(train_dataloader) * args.train_epochs accelerator.print(f"******************************** Training step after prepare: {args.train_steps} ********************************") +# Fix: Use the length of the first dataset or a default value if no datasets +train_datasets_dict = dict(train_datasets) +first_dataset_name = next(iter(train_datasets_dict)) if train_datasets_dict else None +dataset = train_datasets_dict[first_dataset_name] if first_dataset_name else None +num_train_samples = len(dataset) if dataset is not None else 0 accelerate_train(args, accelerator, model, train_dataloader, valid_loaders, - optimizer, lr_scheduler, len(dataset)) \ No newline at end of file + optimizer, lr_scheduler, num_train_samples) \ No newline at end of file diff --git a/F2LLM/tokenize_data_bert_improved.py b/F2LLM/tokenize_data_bert_improved.py new file mode 100644 index 0000000..b99be70 --- /dev/null +++ b/F2LLM/tokenize_data_bert_improved.py @@ -0,0 +1,126 @@ +from multiprocessing import Pool +import numpy as np +import pandas as pd +import os +import argparse +from transformers import AutoTokenizer +from tqdm.auto import tqdm + + +def process_sent(sentence, tokenizer, max_seq_length): + """Process a single sentence with the given tokenizer""" + # BERT tokenizer handles special tokens automatically + tokenizer_outputs = 
tokenizer(sentence, max_length=max_seq_length, truncation=True, padding=False) + return np.array(tokenizer_outputs.input_ids) + + +def process_sent_batch_serial(s, tokenizer, max_seq_length): + """Process a batch of sentences in serial mode""" + return s.apply(lambda x: process_sent(x, tokenizer, max_seq_length)) + + +def process_sent_batch_parallel(s, tokenizer, max_seq_length, num_processes=8): + """Process a batch of sentences in parallel mode""" + def _process_single(sentence): + return process_sent(sentence, tokenizer, max_seq_length) + + def _process_batch(batch_s): + return batch_s.apply(_process_single) + + indices = np.array_split(s.index, min(num_processes, len(s))) + data_split = [s.iloc[idx] for idx in indices if len(idx) > 0] + + if len(data_split) == 0: + return pd.Series([], dtype=object) + + with Pool(min(num_processes, len(data_split))) as pool: + results = pool.map(_process_batch, data_split) + + if results: + return pd.concat(results) + else: + return pd.Series([], dtype=object) + + +def tokenize_data(data, column, tokenizer, max_seq_length, use_parallel=True, num_processes=8): + """Tokenize data with option for serial or parallel processing""" + if use_parallel and num_processes > 1: + return process_sent_batch_parallel(data[column], tokenizer, max_seq_length, num_processes) + else: + return process_sent_batch_serial(data[column], tokenizer, max_seq_length) + + +def main(): + parser = argparse.ArgumentParser(description='Tokenize datasets for BERT models') + parser.add_argument('--model_path', type=str, default='models/bert-base-uncased', help='Path to the BERT model') + parser.add_argument('--input_dir', type=str, default='training_data', help='Input directory with parquet files') + parser.add_argument('--output_dir', type=str, default='data_tokenized_bert', help='Output directory for tokenized data') + parser.add_argument('--max_seq_length', type=int, default=512, help='Maximum sequence length') + parser.add_argument('--num_processes', type=int, default=1, help='Number of processes for parallel processing (0 or 1 for serial)') + parser.add_argument('--mode', type=str, choices=['auto', 'serial', 'parallel'], default='auto', + help='Processing mode: auto (based on num_processes), serial, or parallel') + + args = parser.parse_args() + + # Determine processing mode + if args.mode == 'serial': + use_parallel = False + elif args.mode == 'parallel': + use_parallel = True + else: # auto mode + use_parallel = args.num_processes > 1 + + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.model_path) + + # Create output directory if it doesn't exist + os.makedirs(args.output_dir, exist_ok=True) + + print(f"Processing mode: {'Parallel' if use_parallel else 'Serial'} with {args.num_processes if use_parallel else 1} processes") + + for ds_name in tqdm(sorted(os.listdir(args.input_dir))): + if not ds_name.endswith('.parquet'): + continue + + print(f"Processing {ds_name}", flush=True) + + df = pd.read_parquet(f"{args.input_dir}/{ds_name}") + + # Tokenize query column + df['query_input_ids'] = tokenize_data( + df, 'query', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + + # Handle passage and negative samples + num_neg = 24 if 'negative_2' in df.keys() else 1 + + ls = df.passage.to_list() + for i in range(1, num_neg+1): + if f'negative_{i}' in df.columns: + ls += df[f'negative_{i}'].to_list() + ls = list(set(ls)) + + df_tmp = pd.DataFrame({'text': ls}) + df_tmp['input_ids'] = tokenize_data( + df_tmp, 'text', tokenizer, args.max_seq_length, 
use_parallel, args.num_processes + ) + df_tmp = df_tmp.set_index('text') + + # Map tokenized passages back to original dataframe + df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) + + for i in range(1, num_neg+1): + col_name = f'negative_{i}' + if col_name in df.columns: + new_col_name = f'negative_{i}_input_ids' + df[new_col_name] = df[col_name].map(df_tmp.input_ids) + + # Save tokenized data + output_path = f'{args.output_dir}/{ds_name}' + df.to_parquet(output_path, index=False) + print(f"Saved tokenized data to {output_path}") + + +if __name__ == "__main__": + main() + diff --git a/F2LLM/tokenize_data_generic.py b/F2LLM/tokenize_data_generic.py new file mode 100644 index 0000000..350df43 --- /dev/null +++ b/F2LLM/tokenize_data_generic.py @@ -0,0 +1,268 @@ +from multiprocessing import Pool +import numpy as np +import pandas as pd +import os +import argparse +from transformers import AutoTokenizer +from tqdm.auto import tqdm +import torch +import torch.nn.functional as F + + +def process_sent(sentence, tokenizer, max_seq_length, add_eos_token=False, device='cpu'): + """Process a single sentence with the given tokenizer""" + if add_eos_token: + # For decoder-only models like Qwen, LLaMA, etc. + tokenizer_outputs = tokenizer(sentence, max_length=max_seq_length, truncation=True, add_special_tokens=False) + return np.array(tokenizer_outputs.input_ids + [tokenizer.eos_token_id]) + else: + # For encoder models like BERT, RoBERTa, etc. + tokenizer_outputs = tokenizer(sentence, max_length=max_seq_length, truncation=True, padding=False) + return np.array(tokenizer_outputs.input_ids) + + +def process_sent_batch_serial(s, tokenizer, max_seq_length, add_eos_token=False, device='cpu'): + """Process a batch of sentences in serial mode""" + return s.apply(lambda x: process_sent(x, tokenizer, max_seq_length, add_eos_token, device)) + + +def process_sent_batch_parallel(s, tokenizer, max_seq_length, add_eos_token=False, num_processes=8, device='cpu'): + """Process a batch of sentences in parallel mode""" + def _process_single(sentence): + return process_sent(sentence, tokenizer, max_seq_length, add_eos_token, device) + + def _process_batch(batch_s): + return batch_s.apply(_process_single) + + indices = np.array_split(s.index, min(num_processes, len(s))) + data_split = [s.iloc[idx] for idx in indices if len(idx) > 0] + + if len(data_split) == 0: + return pd.Series([], dtype=object) + + with Pool(min(num_processes, len(data_split))) as pool: + results = pool.map(_process_batch, data_split) + + if results: + return pd.concat(results) + else: + return pd.Series([], dtype=object) + + +def process_sent_batch_gpu(s, tokenizer, max_seq_length, add_eos_token=False, device='cuda'): + """Process a batch of sentences using GPU acceleration""" + try: + # Check if CUDA is available + if not torch.cuda.is_available(): + print("CUDA is not available, falling back to CPU processing") + return process_sent_batch_serial(s, tokenizer, max_seq_length, add_eos_token, 'cpu') + + # Move tokenizer to GPU if possible + if hasattr(tokenizer, 'to'): + tokenizer = tokenizer.to(device) + + # Process in batches to avoid GPU memory issues + batch_size = 1000 # Adjust based on GPU memory + results = [] + + for i in range(0, len(s), batch_size): + batch_sentences = s.iloc[i:i+batch_size].tolist() + + # Tokenize batch + if add_eos_token: + # For decoder-only models + tokenizer_outputs = tokenizer( + batch_sentences, + max_length=max_seq_length, + truncation=True, + add_special_tokens=False, + padding=True, + return_tensors='pt' + 
) + + # Add EOS token to each sequence + input_ids = tokenizer_outputs.input_ids.to(device) + eos_tokens = torch.full((input_ids.shape[0], 1), tokenizer.eos_token_id, device=device) + input_ids = torch.cat([input_ids, eos_tokens], dim=1) + + # Convert to list of numpy arrays + for ids in input_ids.cpu().numpy(): + results.append(np.array(ids)) + else: + # For encoder models + tokenizer_outputs = tokenizer( + batch_sentences, + max_length=max_seq_length, + truncation=True, + padding=True, + return_tensors='pt' + ) + + input_ids = tokenizer_outputs.input_ids.to(device) + + # Convert to list of numpy arrays + for ids in input_ids.cpu().numpy(): + # Remove padding tokens + non_pad_tokens = ids[ids != tokenizer.pad_token_id] if tokenizer.pad_token_id is not None else ids + results.append(np.array(non_pad_tokens)) + + return pd.Series(results, index=s.index) + + except Exception as e: + print(f"GPU processing failed: {e}, falling back to CPU processing") + return process_sent_batch_serial(s, tokenizer, max_seq_length, add_eos_token, 'cpu') + + +def tokenize_data(data, column, tokenizer, max_seq_length, add_eos_token=False, use_parallel=True, num_processes=8, use_gpu=False, device='cpu'): + """Tokenize data with option for serial, parallel, or GPU processing""" + if use_gpu: + return process_sent_batch_gpu(data[column], tokenizer, max_seq_length, add_eos_token, device) + elif use_parallel and num_processes > 1: + return process_sent_batch_parallel(data[column], tokenizer, max_seq_length, add_eos_token, num_processes, device) + else: + return process_sent_batch_serial(data[column], tokenizer, max_seq_length, add_eos_token, device) + + +def tokenize_dataset(model_path, max_seq_length, add_eos_token, input_dir, output_dir, use_parallel=True, num_processes=8, use_gpu=False, device='cpu'): + """Tokenize datasets with the specified model""" + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained(model_path) + + # Add padding token if not present (for LLaMA, Qwen, etc.) 
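+    # (Decoder-style tokenizers often ship without a pad token; the batched GPU path
+    # above calls the tokenizer with padding=True, which requires one, so the EOS
+    # token is reused as padding when none is defined.)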
+ if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + mode_str = 'GPU' if use_gpu else ('Parallel' if use_parallel else 'Serial') + process_str = f"{num_processes if use_parallel and not use_gpu else 1} processes" if not use_gpu else "GPU acceleration" + print(f"Processing mode: {mode_str} with {process_str}") + + for ds_name in tqdm(sorted(os.listdir(input_dir))): + if not ds_name.endswith('.parquet'): + continue + + print(f"Processing {ds_name}", flush=True) + + df = pd.read_parquet(f"{input_dir}/{ds_name}") + df['query_input_ids'] = tokenize_data( + df, 'query', tokenizer, max_seq_length, add_eos_token, use_parallel, num_processes, use_gpu, device + ) + + num_neg = 24 if 'negative_2' in df.keys() else 1 + + ls = df.passage.to_list() + for i in range(1, num_neg+1): + if f'negative_{i}' in df.columns: + ls += df[f'negative_{i}'].to_list() + ls = list(set(ls)) + df_tmp = pd.DataFrame({'text': ls}) + df_tmp['input_ids'] = tokenize_data( + df_tmp, 'text', tokenizer, max_seq_length, add_eos_token, use_parallel, num_processes, use_gpu, device + ) + df_tmp = df_tmp.set_index('text') + + df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) + + for i in range(1, num_neg+1): + col_name = f'negative_{i}' + if col_name in df.columns: + new_col_name = f'negative_{i}_input_ids' + df[new_col_name] = df[col_name].map(df_tmp.input_ids) + + df.to_parquet(f'{output_dir}/{ds_name}', index=False) + + +def main(): + parser = argparse.ArgumentParser(description='Tokenize datasets for different models') + parser.add_argument('--config', type=str, help='Path to config file') + parser.add_argument('--model_path', type=str, help='Path to the model') + parser.add_argument('--model_type', type=str, choices=['bert', 'llama', 'qwen'], + help='Type of model (affects tokenization strategy)') + parser.add_argument('--max_seq_length', type=int, help='Maximum sequence length') + parser.add_argument('--input_dir', type=str, help='Input directory with parquet files') + parser.add_argument('--output_dir', type=str, help='Output directory for tokenized data') + parser.add_argument('--num_processes', type=int, help='Number of processes for parallel processing (0 or 1 for serial)') + parser.add_argument('--mode', type=str, choices=['auto', 'serial', 'parallel', 'gpu'], + help='Processing mode: auto (based on num_processes), serial, parallel, or gpu') + parser.add_argument('--device', type=str, help='Device to use for processing (cpu, cuda, mps)') + + args = parser.parse_args() + + # Load config file if provided + config = {} + if args.config: + import json + with open(args.config, 'r') as f: + config = json.load(f) + + # Override config with command line arguments + def get_param(param_name, default=None): + arg_value = getattr(args, param_name) + return arg_value if arg_value is not None else config.get(param_name, default) + + model_path = get_param('model_path') + model_type = get_param('model_type') + max_seq_length = get_param('max_seq_length', 512) + input_dir = get_param('input_dir', 'training_data') + output_dir = get_param('output_dir') + num_processes = get_param('num_processes', 1) + mode = get_param('mode', 'auto') + device = get_param('device', 'cpu') + + # Validate required parameters + if not model_path: + raise ValueError("model_path is required") + if not model_type: + raise ValueError("model_type is required") + if not output_dir: + raise ValueError("output_dir is required") + + # 
Determine processing mode + if mode == 'gpu': + use_gpu = True + use_parallel = False + elif mode == 'serial': + use_gpu = False + use_parallel = False + elif mode == 'parallel': + use_gpu = False + use_parallel = True + else: # auto mode + use_gpu = False + use_parallel = num_processes > 1 + + # Determine if EOS token should be added based on model type + add_eos_token = model_type in ['llama', 'qwen'] + + # Set device + if device == 'cuda' and not torch.cuda.is_available(): + print("CUDA is not available, falling back to CPU") + device = 'cpu' + elif device == 'mps' and not torch.backends.mps.is_available(): + print("MPS is not available, falling back to CPU") + device = 'cpu' + + # If mode is gpu but device is cpu, switch to parallel or serial + if mode == 'gpu' and device == 'cpu': + use_gpu = False + use_parallel = num_processes > 1 + print("Switching from GPU to CPU processing") + + tokenize_dataset( + model_path=model_path, + max_seq_length=max_seq_length, + add_eos_token=add_eos_token, + input_dir=input_dir, + output_dir=output_dir, + use_parallel=use_parallel, + num_processes=num_processes, + use_gpu=use_gpu, + device=device + ) + + +if __name__ == "__main__": + main() + diff --git a/F2LLM/tokenize_data_llama_improved.py b/F2LLM/tokenize_data_llama_improved.py new file mode 100644 index 0000000..218c566 --- /dev/null +++ b/F2LLM/tokenize_data_llama_improved.py @@ -0,0 +1,128 @@ +from multiprocessing import Pool +import numpy as np +import pandas as pd +import os +import argparse +from transformers import AutoTokenizer +from tqdm.auto import tqdm + + +def process_sent(sentence, tokenizer, max_seq_length): + """Process a single sentence with the given tokenizer for LLaMA models""" + # LLaMA tokenizer - similar to Qwen but with different max length + tokenizer_outputs = tokenizer(sentence, max_length=max_seq_length, truncation=True, add_special_tokens=False) + return np.array(tokenizer_outputs.input_ids + [tokenizer.eos_token_id]) + + +def process_sent_batch_serial(s, tokenizer, max_seq_length): + """Process a batch of sentences in serial mode""" + return s.apply(lambda x: process_sent(x, tokenizer, max_seq_length)) + + +def process_sent_batch_parallel(s, tokenizer, max_seq_length, num_processes=8): + """Process a batch of sentences in parallel mode""" + def _process_single(sentence): + return process_sent(sentence, tokenizer, max_seq_length) + + def _process_batch(batch_s): + return batch_s.apply(_process_single) + + indices = np.array_split(s.index, min(num_processes, len(s))) + data_split = [s.iloc[idx] for idx in indices if len(idx) > 0] + + if len(data_split) == 0: + return pd.Series([], dtype=object) + + with Pool(min(num_processes, len(data_split))) as pool: + results = pool.map(_process_batch, data_split) + + if results: + return pd.concat(results) + else: + return pd.Series([], dtype=object) + + +def tokenize_data(data, column, tokenizer, max_seq_length, use_parallel=True, num_processes=8): + """Tokenize data with option for serial or parallel processing""" + if use_parallel and num_processes > 1: + return process_sent_batch_parallel(data[column], tokenizer, max_seq_length, num_processes) + else: + return process_sent_batch_serial(data[column], tokenizer, max_seq_length) + + +def main(): + parser = argparse.ArgumentParser(description='Tokenize datasets for LLaMA models') + parser.add_argument('--model_path', type=str, default='models/llama-3-8b', help='Path to the LLaMA model') + parser.add_argument('--input_dir', type=str, default='training_data', help='Input directory with 
parquet files') + parser.add_argument('--output_dir', type=str, default='data_tokenized_llama', help='Output directory for tokenized data') + parser.add_argument('--max_seq_length', type=int, default=2048, help='Maximum sequence length') + parser.add_argument('--num_processes', type=int, default=1, help='Number of processes for parallel processing (0 or 1 for serial)') + parser.add_argument('--mode', type=str, choices=['auto', 'serial', 'parallel'], default='auto', + help='Processing mode: auto (based on num_processes), serial, or parallel') + + args = parser.parse_args() + + # Determine processing mode + if args.mode == 'serial': + use_parallel = False + elif args.mode == 'parallel': + use_parallel = True + else: # auto mode + use_parallel = args.num_processes > 1 + + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.model_path) + # Add padding token if not present + tokenizer.pad_token = tokenizer.eos_token + + # Create output directory if it doesn't exist + os.makedirs(args.output_dir, exist_ok=True) + + print(f"Processing mode: {'Parallel' if use_parallel else 'Serial'} with {args.num_processes if use_parallel else 1} processes") + + for ds_name in tqdm(sorted(os.listdir(args.input_dir))): + if not ds_name.endswith('.parquet'): + continue + + print(f"Processing {ds_name}", flush=True) + + df = pd.read_parquet(f"{args.input_dir}/{ds_name}") + + # Tokenize query column + df['query_input_ids'] = tokenize_data( + df, 'query', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + + # Handle passage and negative samples + num_neg = 24 if 'negative_2' in df.keys() else 1 + + ls = df.passage.to_list() + for i in range(1, num_neg+1): + if f'negative_{i}' in df.columns: + ls += df[f'negative_{i}'].to_list() + ls = list(set(ls)) + + df_tmp = pd.DataFrame({'text': ls}) + df_tmp['input_ids'] = tokenize_data( + df_tmp, 'text', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + df_tmp = df_tmp.set_index('text') + + # Map tokenized passages back to original dataframe + df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) + + for i in range(1, num_neg+1): + col_name = f'negative_{i}' + if col_name in df.columns: + new_col_name = f'negative_{i}_input_ids' + df[new_col_name] = df[col_name].map(df_tmp.input_ids) + + # Save tokenized data + output_path = f'{args.output_dir}/{ds_name}' + df.to_parquet(output_path, index=False) + print(f"Saved tokenized data to {output_path}") + + +if __name__ == "__main__": + main() + diff --git a/F2LLM/tokenize_data_qwen.py b/F2LLM/tokenize_data_qwen.py index 2d9c47e..407cd8e 100644 --- a/F2LLM/tokenize_data_qwen.py +++ b/F2LLM/tokenize_data_qwen.py @@ -2,53 +2,124 @@ import numpy as np import pandas as pd import os +import argparse from transformers import AutoTokenizer from tqdm.auto import tqdm -tokenizer = AutoTokenizer.from_pretrained('models/qwen3-0.6b') -max_seq_length = 1023 - - -def process_sent(sentence): - +def process_sent(sentence, tokenizer, max_seq_length): + """Process a single sentence with the given tokenizer""" # We make sure there's always an eos token at the end of each sequence tokenizer_outputs = tokenizer(sentence, max_length=max_seq_length, truncation=True, add_special_tokens=False) - return np.array(tokenizer_outputs.input_ids + [tokenizer.eos_token_id]) -def process_sent_batch(s): - return s.apply(process_sent) - -def parallelize(data, func, num_of_processes=8): - indices = np.array_split(data.index, num_of_processes) - data_split = [data.iloc[idx] for idx in indices] - with 
Pool(num_of_processes) as pool: - data = pd.concat(pool.map(func, data_split)) - return data - - -root_dir = 'training_data' -for ds_name in tqdm(sorted(os.listdir(root_dir))): - print(ds_name, flush=True) - - df = pd.read_parquet(f"{root_dir}/{ds_name}") - df['query_input_ids'] = parallelize(df['query'], process_sent_batch, 62) - - num_neg = 24 if 'negative_2' in df.keys() else 1 - - ls = df.passage.to_list() - for i in range(1, num_neg+1): - ls += df[f'negative_{i}'].to_list() - ls = list(set(ls)) - df_tmp = pd.DataFrame({'text': ls}) - df_tmp['input_ids'] = parallelize(df_tmp['text'], process_sent_batch, 62) - df_tmp = df_tmp.set_index('text') - - df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) - - for i in range(1, num_neg+1): - df[f'negative_{i}_input_ids'] = df[f'negative_{i}'].map(df_tmp.input_ids) - - df.to_parquet(f'data_tokenized_qwen/{ds_name}', index=False) +def process_sent_batch_serial(s, tokenizer, max_seq_length): + """Process a batch of sentences in serial mode""" + return s.apply(lambda x: process_sent(x, tokenizer, max_seq_length)) + + +def process_sent_batch_parallel(s, tokenizer, max_seq_length, num_processes=8): + """Process a batch of sentences in parallel mode""" + def _process_single(sentence): + return process_sent(sentence, tokenizer, max_seq_length) + + def _process_batch(batch_s): + return batch_s.apply(_process_single) + + indices = np.array_split(s.index, min(num_processes, len(s))) + data_split = [s.iloc[idx] for idx in indices if len(idx) > 0] + + if len(data_split) == 0: + return pd.Series([], dtype=object) + + with Pool(min(num_processes, len(data_split))) as pool: + results = pool.map(_process_batch, data_split) + + if results: + return pd.concat(results) + else: + return pd.Series([], dtype=object) + + +def tokenize_data(data, column, tokenizer, max_seq_length, use_parallel=True, num_processes=8): + """Tokenize data with option for serial or parallel processing""" + if use_parallel and num_processes > 1: + return process_sent_batch_parallel(data[column], tokenizer, max_seq_length, num_processes) + else: + return process_sent_batch_serial(data[column], tokenizer, max_seq_length) + + +def main(): + parser = argparse.ArgumentParser(description='Tokenize datasets for Qwen models') + parser.add_argument('--model_path', type=str, default='models/qwen3-0.6b', help='Path to the Qwen model') + parser.add_argument('--input_dir', type=str, default='training_data', help='Input directory with parquet files') + parser.add_argument('--output_dir', type=str, default='data_tokenized_qwen', help='Output directory for tokenized data') + parser.add_argument('--max_seq_length', type=int, default=1023, help='Maximum sequence length') + parser.add_argument('--num_processes', type=int, default=1, help='Number of processes for parallel processing (0 or 1 for serial)') + parser.add_argument('--mode', type=str, choices=['auto', 'serial', 'parallel'], default='auto', + help='Processing mode: auto (based on num_processes), serial, or parallel') + + args = parser.parse_args() + + # Determine processing mode + if args.mode == 'serial': + use_parallel = False + elif args.mode == 'parallel': + use_parallel = True + else: # auto mode + use_parallel = args.num_processes > 1 + + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.model_path) + + # Create output directory if it doesn't exist + os.makedirs(args.output_dir, exist_ok=True) + + print(f"Processing mode: {'Parallel' if use_parallel else 'Serial'} with {args.num_processes if use_parallel else 1} 
processes") + + for ds_name in tqdm(sorted(os.listdir(args.input_dir))): + if not ds_name.endswith('.parquet'): + continue + + print(f"Processing {ds_name}", flush=True) + + df = pd.read_parquet(f"{args.input_dir}/{ds_name}") + + # Tokenize query column + df['query_input_ids'] = tokenize_data( + df, 'query', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + + # Handle passage and negative samples + num_neg = 24 if 'negative_2' in df.keys() else 1 + + ls = df.passage.to_list() + for i in range(1, num_neg+1): + if f'negative_{i}' in df.columns: + ls += df[f'negative_{i}'].to_list() + ls = list(set(ls)) + + df_tmp = pd.DataFrame({'text': ls}) + df_tmp['input_ids'] = tokenize_data( + df_tmp, 'text', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + df_tmp = df_tmp.set_index('text') + + # Map tokenized passages back to original dataframe + df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) + + for i in range(1, num_neg+1): + col_name = f'negative_{i}' + if col_name in df.columns: + new_col_name = f'negative_{i}_input_ids' + df[new_col_name] = df[col_name].map(df_tmp.input_ids) + + # Save tokenized data + output_path = f'{args.output_dir}/{ds_name}' + df.to_parquet(output_path, index=False) + print(f"Saved tokenized data to {output_path}") + + +if __name__ == "__main__": + main() diff --git a/F2LLM/tokenize_data_qwen_improved.py b/F2LLM/tokenize_data_qwen_improved.py new file mode 100644 index 0000000..39c650e --- /dev/null +++ b/F2LLM/tokenize_data_qwen_improved.py @@ -0,0 +1,126 @@ +from multiprocessing import Pool +import numpy as np +import pandas as pd +import os +import argparse +from transformers import AutoTokenizer +from tqdm.auto import tqdm + + +def process_sent(sentence, tokenizer, max_seq_length): + """Process a single sentence with the given tokenizer for Qwen models""" + # We make sure there's always an eos token at the end of each sequence + tokenizer_outputs = tokenizer(sentence, max_length=max_seq_length, truncation=True, add_special_tokens=False) + return np.array(tokenizer_outputs.input_ids + [tokenizer.eos_token_id]) + + +def process_sent_batch_serial(s, tokenizer, max_seq_length): + """Process a batch of sentences in serial mode""" + return s.apply(lambda x: process_sent(x, tokenizer, max_seq_length)) + + +def process_sent_batch_parallel(s, tokenizer, max_seq_length, num_processes=8): + """Process a batch of sentences in parallel mode""" + def _process_single(sentence): + return process_sent(sentence, tokenizer, max_seq_length) + + def _process_batch(batch_s): + return batch_s.apply(_process_single) + + indices = np.array_split(s.index, min(num_processes, len(s))) + data_split = [s.iloc[idx] for idx in indices if len(idx) > 0] + + if len(data_split) == 0: + return pd.Series([], dtype=object) + + with Pool(min(num_processes, len(data_split))) as pool: + results = pool.map(_process_batch, data_split) + + if results: + return pd.concat(results) + else: + return pd.Series([], dtype=object) + + +def tokenize_data(data, column, tokenizer, max_seq_length, use_parallel=True, num_processes=8): + """Tokenize data with option for serial or parallel processing""" + if use_parallel and num_processes > 1: + return process_sent_batch_parallel(data[column], tokenizer, max_seq_length, num_processes) + else: + return process_sent_batch_serial(data[column], tokenizer, max_seq_length) + + +def main(): + parser = argparse.ArgumentParser(description='Tokenize datasets for Qwen models') + parser.add_argument('--model_path', type=str, 
default='models/qwen3-0.6b', help='Path to the Qwen model') + parser.add_argument('--input_dir', type=str, default='training_data', help='Input directory with parquet files') + parser.add_argument('--output_dir', type=str, default='data_tokenized_qwen', help='Output directory for tokenized data') + parser.add_argument('--max_seq_length', type=int, default=1023, help='Maximum sequence length') + parser.add_argument('--num_processes', type=int, default=1, help='Number of processes for parallel processing (0 or 1 for serial)') + parser.add_argument('--mode', type=str, choices=['auto', 'serial', 'parallel'], default='auto', + help='Processing mode: auto (based on num_processes), serial, or parallel') + + args = parser.parse_args() + + # Determine processing mode + if args.mode == 'serial': + use_parallel = False + elif args.mode == 'parallel': + use_parallel = True + else: # auto mode + use_parallel = args.num_processes > 1 + + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.model_path) + + # Create output directory if it doesn't exist + os.makedirs(args.output_dir, exist_ok=True) + + print(f"Processing mode: {'Parallel' if use_parallel else 'Serial'} with {args.num_processes if use_parallel else 1} processes") + + for ds_name in tqdm(sorted(os.listdir(args.input_dir))): + if not ds_name.endswith('.parquet'): + continue + + print(f"Processing {ds_name}", flush=True) + + df = pd.read_parquet(f"{args.input_dir}/{ds_name}") + + # Tokenize query column + df['query_input_ids'] = tokenize_data( + df, 'query', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + + # Handle passage and negative samples + num_neg = 24 if 'negative_2' in df.keys() else 1 + + ls = df.passage.to_list() + for i in range(1, num_neg+1): + if f'negative_{i}' in df.columns: + ls += df[f'negative_{i}'].to_list() + ls = list(set(ls)) + + df_tmp = pd.DataFrame({'text': ls}) + df_tmp['input_ids'] = tokenize_data( + df_tmp, 'text', tokenizer, args.max_seq_length, use_parallel, args.num_processes + ) + df_tmp = df_tmp.set_index('text') + + # Map tokenized passages back to original dataframe + df['passage_input_ids'] = df.passage.map(df_tmp.input_ids) + + for i in range(1, num_neg+1): + col_name = f'negative_{i}' + if col_name in df.columns: + new_col_name = f'negative_{i}_input_ids' + df[new_col_name] = df[col_name].map(df_tmp.input_ids) + + # Save tokenized data + output_path = f'{args.output_dir}/{ds_name}' + df.to_parquet(output_path, index=False) + print(f"Saved tokenized data to {output_path}") + + +if __name__ == "__main__": + main() + diff --git a/F2LLM/utils.py b/F2LLM/utils.py index b167d3c..fa37a8d 100644 --- a/F2LLM/utils.py +++ b/F2LLM/utils.py @@ -65,7 +65,7 @@ def hard_loss( ): if hard_neg_embeddings is None: - return 0.0 + return torch.tensor(0.0, device=query_embeddings.device) bs = query_embeddings.size(0) a_norm = F.normalize(query_embeddings, p=2, dim=-1) @@ -90,18 +90,51 @@ def validate(args, accelerator, model, valid_loader_dict, criterion, completed_s for batch in valid_dataloader: with torch.no_grad(): outputs = model.forward(batch) - loss_hard = hard_loss(outputs['query_passage_features'].squeeze(1), outputs['passage_passage_features'].squeeze(1), outputs['negative_passage_features'], criterion, accelerator) - loss_hard_ls.append(accelerator.gather(loss_hard).float()) + + query_emb = outputs['query_passage_features'].squeeze(1) + passage_emb = outputs['passage_passage_features'].squeeze(1) + neg_emb = outputs['negative_passage_features'] + + loss_hard = hard_loss(query_emb, 
passage_emb, neg_emb, criterion, accelerator) + if accelerator.num_processes > 1: + # When using multiple processes, we need to gather the loss from all processes + gathered_loss_hard = accelerator.gather(loss_hard).float() + # Ensure gathered_loss_hard is at least 1D + if gathered_loss_hard.dim() == 0: + # Scalar tensor, convert to 1D tensor with one element + gathered_loss_hard = gathered_loss_hard.unsqueeze(0) + loss_hard_ls.append(gathered_loss_hard) + else: + # When using a single process, just append the loss directly + # Ensure loss_hard is at least 1D + if loss_hard.dim() == 0: + loss_hard = loss_hard.unsqueeze(0) + loss_hard_ls.append(loss_hard.float()) if dataset_name in RETRIEVAL_DATASETS: - loss = inbatch_loss(outputs['query_passage_features'].squeeze(1), outputs['passage_passage_features'].squeeze(1), criterion, accelerator) - loss_ls.append(accelerator.gather(loss).float()) + loss = inbatch_loss(query_emb, passage_emb, criterion, accelerator) + if accelerator.num_processes > 1: + # When using multiple processes, we need to gather the loss from all processes + gathered_loss = accelerator.gather(loss).float() + # Ensure gathered_loss is at least 1D + if gathered_loss.dim() == 0: + # Scalar tensor, convert to 1D tensor with one element + gathered_loss = gathered_loss.unsqueeze(0) + loss_ls.append(gathered_loss) + else: + # When using a single process, just append the loss directly + # Ensure loss is at least 1D + if loss.dim() == 0: + loss = loss.unsqueeze(0) + loss_ls.append(loss.float()) accelerator.wait_for_everyone() - loss_hard_ls = torch.cat(loss_hard_ls) - eval_log_dict[f'{dataset_name}/valid_loss_hard'] = loss_hard_ls.mean() + if loss_hard_ls: # Check if the list is not empty + loss_hard_ls = torch.cat(loss_hard_ls) + eval_log_dict[f'{dataset_name}/valid_loss_hard'] = loss_hard_ls.mean() if dataset_name in RETRIEVAL_DATASETS: - loss_ls = torch.cat(loss_ls) - eval_log_dict[f"{dataset_name}/valid_loss_in_batch"] = loss_ls.mean() + if loss_ls: # Check if the list is not empty + loss_ls = torch.cat(loss_ls) + eval_log_dict[f"{dataset_name}/valid_loss_in_batch"] = loss_ls.mean() eval_log_dict['Avg/retrieval/valid_loss_in_batch'] = torch.tensor([v for k, v in eval_log_dict.items() if k.split('/')[0] in RETRIEVAL_DATASETS and k.endswith('valid_loss_in_batch')]).mean() eval_log_dict['Avg/retrieval/valid_loss_hard'] = torch.tensor([v for k, v in eval_log_dict.items() if k.split('/')[0] in RETRIEVAL_DATASETS and k.endswith('valid_loss_hard')]).mean() From 10740c3cc1cc8baa234b3bb031118c199f0cab7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=AF=E8=BF=9B?= Date: Fri, 28 Nov 2025 18:22:39 +0800 Subject: [PATCH 2/2] model-support --- ...5\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" | 1 + 1 file changed, 1 insertion(+) diff --git "a/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" "b/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" index 3294ec9..0f7bc28 100644 --- "a/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" +++ "b/F2LLM/docs/\344\270\215\345\220\214\346\250\241\345\236\213\344\275\277\347\224\250\346\226\207\346\241\243.md" @@ -178,3 +178,4 @@ python tokenize_data_generic.py --config configs/config_generic.json --device mp - 选择支持长序列的模型(如LLaMA、Qwen) - 调整`max_seq_length`参数 - 考虑使用滑动窗口等策略处理超长文本 +-
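Note: the masked mean pooling that MeanPoolingAdapter (F2LLM/model.py above) applies to last_hidden_state can be sanity-checked in isolation with a small sketch like the one below; the tensor shapes and values are illustrative only and are not taken from the patch.

import torch

def mean_pool(hidden, attention_mask):
    # hidden: (batch, seq_len, hidden_size); attention_mask: (batch, seq_len) with 1 for real tokens
    mask = attention_mask.unsqueeze(-1).expand(hidden.size()).float()
    summed = torch.sum(hidden * mask, dim=1)
    counts = torch.clamp(mask.sum(dim=1), min=1e-9)  # avoid division by zero for all-padding rows
    return summed / counts

hidden = torch.randn(2, 4, 8)                      # e.g. two sequences, four tokens, hidden size 8
attention_mask = torch.tensor([[1, 1, 1, 0],
                               [1, 1, 0, 0]])
print(mean_pool(hidden, attention_mask).shape)     # torch.Size([2, 8])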