diff --git a/configs/sampler/uniform_split_sampler.yaml b/configs/sampler/uniform_split_sampler.yaml
new file mode 100644
index 00000000..ad622d2b
--- /dev/null
+++ b/configs/sampler/uniform_split_sampler.yaml
@@ -0,0 +1,8 @@
+input_dir: /home/abbas-khan/ml_filter/src/ml_filter/sampling/data/
+output_dir: /home/abbas-khan/ml_filter/src/ml_filter/sampling/processed_data_uniform_split
+validation_fraction: 0.10
+score_column: score
+random_seed: 42
+max_upsample_factor: 10.0
+# If set, each score label aims for this many samples per language file (before the train/validation split)
+per_label_target: 32800
diff --git a/src/ml_filter/__main__.py b/src/ml_filter/__main__.py
index 5e357d7b..e7790332 100644
--- a/src/ml_filter/__main__.py
+++ b/src/ml_filter/__main__.py
@@ -12,12 +12,13 @@
 from ml_filter.analysis.collect_ir_metrics import collect_ir_metrics
 from ml_filter.analysis.evaluate_predicted_annotations import evaluate_predicted_annotations
 from ml_filter.analysis.plot_score_distributions import plot_differences_in_scores, plot_scores
-from ml_filter.annotation.embedding_pipeline import run_embedding_pipeline
 from ml_filter.annotation.annotation_pipeline import run_annotation_pipeline
+from ml_filter.annotation.embedding_pipeline import run_embedding_pipeline
 from ml_filter.compare_experiments import compare_experiments
 from ml_filter.data_processing.deduplication import deduplicate_jsonl
 from ml_filter.llm_client import LLMClient
 from ml_filter.sample_from_hf_dataset import sample_from_hf_dataset, upload_file_to_hf
+from ml_filter.sampling.uniform_split_sampler import UniformSplitSampler
 from ml_filter.training.embedding_training_pipeline import run_embedding_head_training_pipeline
 from ml_filter.translate import TranslationServiceType, TranslatorFactory
 from ml_filter.utils.chunk_data import chunk_jsonl
@@ -26,6 +27,7 @@
 from ml_filter.utils.manipulate_documents import merge_and_sort_jsonl_files
 from ml_filter.utils.manipulate_prompt import add_target_language_to_prompt
 from ml_filter.utils.statistics import compute_num_words_and_chars_in_jsonl, run_word_count_jsonl_files
+from ml_filter.utils.uniform_split_sampler_utils import load_sampler_config
 
 input_file_path_option = click.option(
     "--input_file_path",
@@ -170,6 +172,19 @@ def entry_point_compare_experiments(config_file_path: Path):
     compare_experiments(config_file_path)
 
 
+@main.command(name="uniform_split_sampler")
+@click.option(
+    "--config_file_path",
+    type=click_pathlib.Path(exists=True),
+    required=True,
+    help="Path to the YAML config file for the uniform split sampler.",
+)
+def entry_point_uniform_split_sampler(config_file_path: Path):
+    config = load_sampler_config(config_file_path)
+    sampler = UniformSplitSampler(**config)
+    sampler.process_all_files()
+
+
 @main.command(name="chunk_jsonl")
 @click.option(
     "--input_file_path",
@@ -371,7 +386,7 @@ def aggregate_human_annotations_cli(
     "--min_metrics",
     type=str,
     help="Comma-separated list of metrics for which lower is better."
-    + "All other metrics are considered to be better when higher.",
+    + "All other metrics are considered to be better when higher.",
 )
 @click.option(
     "--report_metrics",
@@ -752,9 +767,7 @@ def entry_run_embedding_pipeline(config_file_path: Path):
 )
 def entry_run_annotations(config_file_path: Path):
     """Run annotation pipeline using precomputed embeddings from HDF5."""
-    run_annotation_pipeline(
-        config_file_path=config_file_path
-    )
+    run_annotation_pipeline(config_file_path=config_file_path)
 
 
 def _get_translator_helper(translation_service: str, ignore_tag_text: Optional[str] = None):
diff --git a/src/ml_filter/sampling/uniform_split_sampler.py b/src/ml_filter/sampling/uniform_split_sampler.py
new file mode 100644
index 00000000..cac57b76
--- /dev/null
+++ b/src/ml_filter/sampling/uniform_split_sampler.py
@@ -0,0 +1,185 @@
+"""Uniform split sampler: split by label first, then oversample within each split."""
+
+import logging
+from pathlib import Path
+from typing import Dict, Tuple
+
+import numpy as np
+import pandas as pd
+
+from ml_filter.utils.uniform_split_sampler_utils import (
+    extract_score_value,
+    log_distribution,
+    per_label_targets,
+    sample_with_cap,
+    save_dataset,
+    split_label_pools,
+)
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+__all__ = ["UniformSplitSampler"]
+
+
+class UniformSplitSampler:
+    def __init__(
+        self,
+        input_dir: str,
+        output_dir: str,
+        validation_fraction: float = 0.10,
+        score_column: str = "score",
+        random_seed: int = 42,
+        max_upsample_factor: float = 10.0,
+        per_label_target: int | None = None,
+    ):
+        self.input_dir = Path(input_dir)
+        self.output_dir = Path(output_dir)
+        self.validation_fraction = validation_fraction
+        self.score_column = score_column
+        self.random_seed = random_seed
+        self.max_upsample_factor = max_upsample_factor
+        self.per_label_target = per_label_target
+
+        self.train_dir = self.output_dir / "training_set"
+        self.val_dir = self.output_dir / "validation_set"
+        self.train_dir.mkdir(parents=True, exist_ok=True)
+        self.val_dir.mkdir(parents=True, exist_ok=True)
+
+        np.random.seed(self.random_seed)
+
+    def process_all_files(self) -> None:
+        jsonl_files = sorted(self.input_dir.glob("*.jsonl"))
+        if not jsonl_files:
+            logger.error("No JSONL files found in %s", self.input_dir)
+            return
+
+        datasets: Dict[Path, pd.DataFrame] = {}
+        for path in jsonl_files:
+            df = self._load_file(path)
+            if not df.empty:
+                datasets[path] = df
+
+        if not datasets:
+            logger.error("No valid datasets to process.")
+            return
+
+        for path, df in datasets.items():
+            dataset_name = path.name.replace(".jsonl", "")
+            logger.info("\nProcessing %s with %d available rows", dataset_name, len(df))
+
+            target_size = len(df)
+            train_df, val_df, train_target_total, val_target_total = self._build_splits(df, target_size)
+
+            save_dataset(
+                train_df,
+                self.train_dir / f"{dataset_name}_train.jsonl",
+                score_column=self.score_column,
+                log=logger,
+            )
+            save_dataset(
+                val_df,
+                self.val_dir / f"{dataset_name}_val.jsonl",
+                score_column=self.score_column,
+                log=logger,
+            )
+
+            log_distribution(train_df, self.score_column, "Training", train_target_total, logger)
+            log_distribution(val_df, self.score_column, "Validation", val_target_total, logger)
+
+        logger.info("\nAll files processed. Output written to %s", self.output_dir)
+
+    def _load_file(self, file_path: Path) -> pd.DataFrame:
+        try:
+            df = pd.read_json(file_path, lines=True)
+        except ValueError as exc:
+            logger.error("Failed to read %s: %s", file_path, exc)
+            return pd.DataFrame()
+
+        if self.score_column not in df.columns:
+            logger.error("File %s missing required column '%s'", file_path, self.score_column)
+            return pd.DataFrame()
+
+        df[self.score_column] = df[self.score_column].apply(extract_score_value)
+        df[self.score_column] = pd.to_numeric(df[self.score_column], errors="coerce")
+        df = df.dropna(subset=[self.score_column])
+
+        df = df[df[self.score_column].apply(lambda x: int(x) == float(x))]
+
+        logger.info("Loaded %d valid rows from %s", len(df), file_path.name)
+        return df
+
+    def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFrame, pd.DataFrame, int, int]:
+        unique_scores = sorted(df[self.score_column].unique())
+        if not unique_scores:
+            empty = df.head(0).copy()
+            return empty, empty, 0, 0
+
+        per_label_total_target = (
+            float(self.per_label_target) if self.per_label_target is not None else (target_size / len(unique_scores))
+        )
+
+        train_target_total = int(per_label_total_target * (1 - self.validation_fraction) * len(unique_scores))
+        val_target_total = int(per_label_total_target * self.validation_fraction * len(unique_scores))
+
+        train_targets = per_label_targets(unique_scores, train_target_total)
+        val_targets = per_label_targets(unique_scores, val_target_total)
+
+        train_pools, val_pools = split_label_pools(
+            df,
+            unique_scores,
+            score_column=self.score_column,
+            validation_fraction=self.validation_fraction,
+            random_seed=self.random_seed,
+        )
+
+        train_samples = []
+        val_samples = []
+
+        for score in unique_scores:
+            train_pool = train_pools.get(score, df.head(0).copy())
+            val_pool = val_pools.get(score, df.head(0).copy())
+
+            logger.info(
+                "Score %.1f → train pool %d rows, val pool %d rows (targets: train %d, val %d)",
+                score,
+                len(train_pool),
+                len(val_pool),
+                train_targets.get(score, 0),
+                val_targets.get(score, 0),
+            )
+
+            train_sample = sample_with_cap(
+                train_pool,
+                train_targets.get(score, 0),
+                score,
+                "train",
+                seed_offset=0,
+                random_seed=self.random_seed,
+                max_upsample_factor=self.max_upsample_factor,
+                log=logger,
+            )
+            val_sample = sample_with_cap(
+                val_pool,
+                val_targets.get(score, 0),
+                score,
+                "validation",
+                seed_offset=10_000,
+                random_seed=self.random_seed,
+                max_upsample_factor=self.max_upsample_factor,
+                log=logger,
+            )
+
+            if not train_sample.empty:
+                train_samples.append(train_sample)
+            if not val_sample.empty:
+                val_samples.append(val_sample)
+
+        train_df = pd.concat(train_samples, ignore_index=True) if train_samples else df.head(0).copy()
+        val_df = pd.concat(val_samples, ignore_index=True) if val_samples else df.head(0).copy()
+
+        if not train_df.empty:
+            train_df = train_df.sample(frac=1, random_state=self.random_seed).reset_index(drop=True)
+        if not val_df.empty:
+            val_df = val_df.sample(frac=1, random_state=self.random_seed + 1).reset_index(drop=True)
+
+        return train_df, val_df, train_target_total, val_target_total
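To make the target arithmetic in `_build_splits` concrete, here is a small worked sketch with hypothetical numbers (900 rows, three labels, the default 10% validation fraction, `per_label_target` unset):

```python
# Hypothetical inputs, mirroring the formulas in _build_splits above.
n_labels = 3
target_size = 900  # len(df) for the file being processed
validation_fraction = 0.10

per_label_total_target = target_size / n_labels  # 300.0 rows per label overall
train_target_total = int(per_label_total_target * (1 - validation_fraction) * n_labels)  # 810
val_target_total = int(per_label_total_target * validation_fraction * n_labels)          # 90

# per_label_targets then spreads the totals as evenly as possible:
# 810 // 3 = 270 per label for training, 90 // 3 = 30 per label for validation.
```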
diff --git a/src/ml_filter/utils/uniform_split_sampler_utils.py b/src/ml_filter/utils/uniform_split_sampler_utils.py
new file mode 100644
index 00000000..0421bf59
--- /dev/null
+++ b/src/ml_filter/utils/uniform_split_sampler_utils.py
@@ -0,0 +1,153 @@
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Tuple
+
+import numpy as np
+import pandas as pd
+from omegaconf import OmegaConf
+
+logger = logging.getLogger(__name__)
+
+
+def load_sampler_config(config_path: Path) -> Dict[str, Any]:
+    if not config_path.exists():
+        raise FileNotFoundError(f"Sampler config file not found: {config_path}")
+
+    logger.info("Loading sampler configuration from %s", config_path)
+    config_container = OmegaConf.load(config_path)
+    config = OmegaConf.to_container(config_container, resolve=True)
+    if not isinstance(config, dict):
+        raise ValueError(f"Sampler configuration must be a mapping, but received {type(config_container)}")
+    return config
+
+
+def extract_score_value(value: Any) -> float | int | np.floating | np.integer | None:
+    """Extract a scalar score from lists/arrays when present."""
+
+    if isinstance(value, (list, tuple, np.ndarray)):
+        return value[0] if len(value) > 0 else np.nan
+    return value
+
+
+def split_label_pools(
+    df: pd.DataFrame,
+    unique_scores: List[float],
+    score_column: str,
+    validation_fraction: float,
+    random_seed: int,
+) -> Tuple[Dict[float, pd.DataFrame], Dict[float, pd.DataFrame]]:
+    train_pools: Dict[float, pd.DataFrame] = {}
+    val_pools: Dict[float, pd.DataFrame] = {}
+
+    grouped = df.groupby(score_column)
+    for score in unique_scores:
+        label_data = grouped.get_group(score) if score in grouped.groups else df.head(0).copy()
+        if label_data.empty:
+            base = df.head(0).copy()
+            train_pools[score] = base.copy()
+            val_pools[score] = base.copy()
+            continue
+
+        shuffled = label_data.sample(frac=1, replace=False, random_state=random_seed + int(score)).reset_index(
+            drop=True
+        )
+        n_total = len(shuffled)
+        n_train_rows = int(np.floor(n_total * (1 - validation_fraction)))
+        n_val_rows = n_total - n_train_rows
+
+        train_pools[score] = shuffled.iloc[:n_train_rows]
+        val_pools[score] = shuffled.iloc[n_train_rows : n_train_rows + n_val_rows]
+
+    return train_pools, val_pools
+
+
+def sample_with_cap(
+    pool: pd.DataFrame,
+    target: int,
+    score: float,
+    split_name: str,
+    seed_offset: int,
+    random_seed: int,
+    max_upsample_factor: float,
+    log: logging.Logger | None = None,
+) -> pd.DataFrame:
+    if max_upsample_factor < 1:
+        raise ValueError("max_upsample_factor must be >= 1")
+
+    if pool.empty or target <= 0:
+        return pool.head(0).copy()
+
+    max_allowed = int(len(pool) * max_upsample_factor)
+
+    effective_target = min(target, max_allowed)
+
+    replace = len(pool) < effective_target
+    sample = pool.sample(
+        n=effective_target,
+        replace=replace,
+        random_state=random_seed + seed_offset + int(score),
+    )
+    if replace:
+        factor = effective_target / len(pool)
+        if log:
+            log.info(
+                "  Oversampling %s %.1f: %d → %d (%.1fx)",
+                split_name,
+                score,
+                len(pool),
+                effective_target,
+                factor,
+            )
+    return sample
+
+
+def per_label_targets(scores: List[float], total_target: int) -> Dict[float, int]:
+    if not scores or total_target <= 0:
+        return {score: 0 for score in scores}
+
+    base = total_target // len(scores)
+    remainder = total_target - base * len(scores)
+
+    targets: Dict[float, int] = {}
+    for idx, score in enumerate(scores):
+        targets[score] = base + (1 if idx < remainder else 0)
+    return targets
+
+
+def save_dataset(df: pd.DataFrame, path: Path, score_column: str, log: logging.Logger | None = None) -> None:
+    df_to_write = df.copy()
+    if not df_to_write.empty:
+        df_to_write[score_column] = df_to_write[score_column].apply(lambda x: [x])
+    df_to_write.to_json(path, orient="records", lines=True)
+    if log:
+        log.info("Wrote %d rows to %s", len(df), path)
+
+
+def log_distribution(
+    df: pd.DataFrame,
+    score_column: str,
+    label: str,
+    target_total: float | None = None,
+    log: logging.Logger | None = None,
+) -> None:
+    log_ref = log or logger
+    if df.empty:
+        log_ref.info("%s: 0 samples", label)
+        return
+
+    counts = df[score_column].value_counts().sort_index()
+    total = len(df)
+    log_ref.info("%s: %d samples", label, total)
+    for score, count in counts.items():
+        split_pct = (count / total) * 100 if total > 0 else 0
+        if target_total and target_total > 0:
+            target_pct = (count / target_total) * 100
+            log_ref.info(
+                "  Score %.1f: %d (%.2f%% of split, %.2f%% of target)",
+                score,
+                count,
+                split_pct,
+                target_pct,
+            )
+        else:
+            log_ref.info("  Score %.1f: %d (%.2f%%)", score, count, split_pct)
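A quick sketch of how the two core helpers behave (hypothetical pool contents, calling only the functions defined above):

```python
import logging

import pandas as pd

from ml_filter.utils.uniform_split_sampler_utils import per_label_targets, sample_with_cap

# A total of 301 training rows spread over 3 labels: the remainder goes to the first labels.
assert per_label_targets([0.0, 1.0, 2.0], 301) == {0.0: 101, 1.0: 100, 2.0: 100}

# A 3-row pool asked for 500 samples with max_upsample_factor=2.0 is capped at 6 rows,
# drawn with replacement.
pool = pd.DataFrame({"id": ["a", "b", "c"], "score": [2.0, 2.0, 2.0]})
sample = sample_with_cap(
    pool,
    target=500,
    score=2.0,
    split_name="train",
    seed_offset=0,
    random_seed=7,
    max_upsample_factor=2.0,
    log=logging.getLogger(__name__),
)
assert len(sample) == 6
```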
diff --git a/tests/resources/data/lorem_ipsum_sampling.jsonl b/tests/resources/data/lorem_ipsum_sampling.jsonl
new file mode 100644
index 00000000..a01a15d9
--- /dev/null
+++ b/tests/resources/data/lorem_ipsum_sampling.jsonl
@@ -0,0 +1,15 @@
+{"id": "fixture-0-0", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-1", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-2", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-3", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-4", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-1-0", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-1", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-2", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-3", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-4", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-2-0", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-1", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-2", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-3", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-4", "score": 2.0, "aggregation_type": "majority"}
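The fixture provides five rows per score label. With `validation_fraction=0.2` and `max_upsample_factor=1.0`, as in the first test below, the split per label is a simple floor computation (sketch):

```python
# Per label: 5 rows, 20% held out for validation (see split_label_pools).
n_total = 5
n_train = int(n_total * (1 - 0.2))  # 4 rows remain in the training pool
n_val = n_total - n_train           # 1 row goes to the validation pool
# Across the 3 labels this yields the 12 train / 3 validation rows asserted below.
```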
f"{TARGET_INPUT_FILENAME.replace('.jsonl', '')}_val.jsonl" + + assert train_file.exists() + assert val_file.exists() + + train_records = _read_jsonl(train_file) + val_records = _read_jsonl(val_file) + + assert len(train_records) == 12 + assert len(val_records) == 3 + + train_counts = Counter(int(row["score"][0]) for row in train_records) + val_counts = Counter(int(row["score"][0]) for row in val_records) + + assert train_counts == {0: 4, 1: 4, 2: 4} + assert val_counts == {0: 1, 1: 1, 2: 1} + + assert all(isinstance(row["score"], list) and len(row["score"]) == 1 for row in train_records + val_records) + + +def test_uniform_split_sampler_honors_per_label_target(tmp_path): + input_dir = tmp_path / "input_per_label" + output_dir = tmp_path / "output_per_label" + + records = [] + for score in range(3): + for idx in range(5): + records.append( + { + "id": f"balanced-{score}-{idx}", + "score": float(score), + "aggregation_type": "majority", + } + ) + + input_file = input_dir / "balanced_dataset.jsonl" + _write_jsonl(input_file, records) + + sampler = UniformSplitSampler( + input_dir=str(input_dir), + output_dir=str(output_dir), + validation_fraction=0.0, + score_column="score", + random_seed=11, + max_upsample_factor=10.0, + per_label_target=2, + ) + sampler.process_all_files() + + train_file = output_dir / "training_set" / "balanced_dataset_train.jsonl" + train_records = _read_jsonl(train_file) + + train_counts = Counter(int(row["score"][0]) for row in train_records) + assert train_counts == {0: 2, 1: 2, 2: 2} + + +def test_uniform_split_sampler_respects_max_upsample_factor(tmp_path): + input_dir = tmp_path / "input_sparse" + output_dir = tmp_path / "output_sparse" + + source_counts = {0: 1, 1: 2, 2: 3} + sparse_records = [] + for score, count in source_counts.items(): + for idx in range(count): + sparse_records.append( + { + "id": f"sparse-{score}-{idx}", + "score": float(score), + "aggregation_type": "majority", + } + ) + + input_file = input_dir / "sparse_dataset.jsonl" + _write_jsonl(input_file, sparse_records) + + sampler = UniformSplitSampler( + input_dir=str(input_dir), + output_dir=str(output_dir), + validation_fraction=0.0, + score_column="score", + random_seed=7, + max_upsample_factor=2.0, + per_label_target=500, + ) + sampler.process_all_files() + + train_file = output_dir / "training_set" / "sparse_dataset_train.jsonl" + val_file = output_dir / "validation_set" / "sparse_dataset_val.jsonl" + + train_records = _read_jsonl(train_file) + val_records = _read_jsonl(val_file) + + assert not val_records # validation_fraction=0.0, so no validation records expected + + train_counts = Counter(int(row["score"][0]) for row in train_records) + assert train_counts == {0: 2, 1: 4, 2: 6} + + for score, available in source_counts.items(): + assert train_counts[score] <= available * sampler.max_upsample_factor