From 2b75b0a6f0f078002c743eddf20acf27dea0f933 Mon Sep 17 00:00:00 2001
From: Abbas Khan
Date: Wed, 10 Dec 2025 14:15:23 +0100
Subject: [PATCH 1/4] feat: implement uniform split sampler with capped oversampling and configuration support

---
 configs/sampler/uniform_split_sampler.yaml |   8 +
 .../sampling/uniform_split_sampler.py      | 207 ++++++++++++++++++
 .../utils/uniform_split_sampler_utils.py   | 149 +++++++++++++
 3 files changed, 364 insertions(+)
 create mode 100644 configs/sampler/uniform_split_sampler.yaml
 create mode 100644 src/ml_filter/sampling/uniform_split_sampler.py
 create mode 100644 src/ml_filter/utils/uniform_split_sampler_utils.py

diff --git a/configs/sampler/uniform_split_sampler.yaml b/configs/sampler/uniform_split_sampler.yaml
new file mode 100644
index 00000000..97329bf4
--- /dev/null
+++ b/configs/sampler/uniform_split_sampler.yaml
@@ -0,0 +1,8 @@
+input_dir: /home/abbas-khan/ml_filter/src/ml_filter/sampling/data/
+output_dir: /home/abbas-khan/ml_filter/src/ml_filter/sampling/processed_data_uniform_split
+validation_fraction: 0.10
+score_column: score
+random_seed: 42
+max_oversampling_ratio: 10.0
+# Set per_label_target to force each score label to aim for this many samples per language file (before split)
+per_label_target: 32800
diff --git a/src/ml_filter/sampling/uniform_split_sampler.py b/src/ml_filter/sampling/uniform_split_sampler.py
new file mode 100644
index 00000000..ae62c202
--- /dev/null
+++ b/src/ml_filter/sampling/uniform_split_sampler.py
@@ -0,0 +1,207 @@
+"""Uniform split sampler: split by label first, then oversample within each split."""
+
+import argparse
+import logging
+from pathlib import Path
+from typing import List, Tuple
+
+import numpy as np
+import pandas as pd
+
+from ml_filter.utils.uniform_split_sampler_utils import (
+    load_sampler_config,
+    log_distribution,
+    normalize_score_value,
+    per_label_targets,
+    sample_with_cap,
+    save_dataset,
+    split_label_pools,
+)
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[3] / "configs" / "sampler" / "uniform_split_sampler.yaml"
+
+
+class UniformSplitSampler:
+    def __init__(
+        self,
+        input_dir: str,
+        output_dir: str,
+        validation_fraction: float = 0.10,
+        score_column: str = "score",
+        random_seed: int = 42,
+        max_oversampling_ratio: float = 10.0,
+        per_label_target: int | None = None,
+    ):
+        self.input_dir = Path(input_dir)
+        self.output_dir = Path(output_dir)
+        self.validation_fraction = validation_fraction
+        self.score_column = score_column
+        self.random_seed = random_seed
+        self.max_oversampling_ratio = max_oversampling_ratio
+        self.per_label_target = per_label_target
+
+        self.train_dir = self.output_dir / "training_set"
+        self.val_dir = self.output_dir / "validation_set"
+        self.train_dir.mkdir(parents=True, exist_ok=True)
+        self.val_dir.mkdir(parents=True, exist_ok=True)
+
+        np.random.seed(self.random_seed)
+
+    def process_all_files(self):
+        jsonl_files = sorted(self.input_dir.glob("*.jsonl"))
+        if not jsonl_files:
+            logger.error("No JSONL files found in %s", self.input_dir)
+            return
+
+        datasets: List[Tuple[str, pd.DataFrame]] = []
+        for path in jsonl_files:
+            df = self._load_file(path)
+            if not df.empty:
+                datasets.append((path.name, df))
+
+        if not datasets:
+            logger.error("No valid datasets to process.")
+            return
+
+        for filename, df in datasets:
+            language = df.get("language", pd.Series(["unknown"])).iloc[0]
+            logger.info("\nProcessing %s (%s) with %d available rows", filename, language, len(df))
+
+            target_size = len(df)
+            train_df, val_df, train_target_total, val_target_total = self._build_splits(df, target_size)
+
+            save_dataset(
+                train_df,
+                self.train_dir / f"{filename.replace('.jsonl', '')}_train.jsonl",
+                score_column=self.score_column,
+                log=logger,
+            )
+            save_dataset(
+                val_df,
+                self.val_dir / f"{filename.replace('.jsonl', '')}_val.jsonl",
+                score_column=self.score_column,
+                log=logger,
+            )
+
+            log_distribution(train_df, self.score_column, f"Training ({language})", train_target_total, logger)
+            log_distribution(val_df, self.score_column, f"Validation ({language})", val_target_total, logger)
+
+        logger.info("\nAll files processed. Output written to %s", self.output_dir)
+
+    def _load_file(self, file_path: Path) -> pd.DataFrame:
+        try:
+            df = pd.read_json(file_path, lines=True)
+        except ValueError as exc:
+            logger.error("Failed to read %s: %s", file_path, exc)
+            return pd.DataFrame()
+
+        if self.score_column not in df.columns:
+            logger.error("File %s missing required column '%s'", file_path, self.score_column)
+            return pd.DataFrame()
+
+        df[self.score_column] = df[self.score_column].apply(normalize_score_value)
+        df[self.score_column] = pd.to_numeric(df[self.score_column], errors="coerce")
+        df = df.dropna(subset=[self.score_column])
+
+        df = df[df[self.score_column].apply(lambda x: int(x) == float(x))]
+        df["language"] = file_path.name.split("_sampled", 1)[0]
+
+        logger.info("Loaded %d valid rows from %s", len(df), file_path.name)
+        return df
+
+    def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
+        unique_scores = sorted(df[self.score_column].unique())
+        if not unique_scores:
+            return df.head(0).copy(), df.head(0).copy()
+
+        per_label_total_target = (
+            float(self.per_label_target) if self.per_label_target is not None else (target_size / len(unique_scores))
+        )
+
+        train_target_total = int(per_label_total_target * (1 - self.validation_fraction) * len(unique_scores))
+        val_target_total = int(per_label_total_target * self.validation_fraction * len(unique_scores))
+
+        train_targets = per_label_targets(unique_scores, train_target_total)
+        val_targets = per_label_targets(unique_scores, val_target_total)
+
+        train_pools, val_pools = split_label_pools(
+            df,
+            unique_scores,
+            score_column=self.score_column,
+            validation_fraction=self.validation_fraction,
+            random_seed=self.random_seed,
+        )
+
+        train_samples = []
+        val_samples = []
+
+        for score in unique_scores:
+            train_pool = train_pools.get(score, df.head(0).copy())
+            val_pool = val_pools.get(score, df.head(0).copy())
+
+            logger.info(
+                "Score %.1f → train pool %d rows, val pool %d rows (targets: train %d, val %d)",
+                score,
+                len(train_pool),
+                len(val_pool),
+                train_targets.get(score, 0),
+                val_targets.get(score, 0),
+            )
+
+            train_sample = sample_with_cap(
+                train_pool,
+                train_targets.get(score, 0),
+                score,
+                "train",
+                seed_offset=0,
+                random_seed=self.random_seed,
+                max_oversampling_ratio=self.max_oversampling_ratio,
+                log=logger,
+            )
+            val_sample = sample_with_cap(
+                val_pool,
+                val_targets.get(score, 0),
+                score,
+                "validation",
+                seed_offset=10_000,
+                random_seed=self.random_seed,
+                max_oversampling_ratio=self.max_oversampling_ratio,
+                log=logger,
+            )
+
+            if not train_sample.empty:
+                train_samples.append(train_sample)
+            if not val_sample.empty:
+                val_samples.append(val_sample)
+
+        train_df = pd.concat(train_samples, ignore_index=True) if train_samples else df.head(0).copy()
+        val_df = pd.concat(val_samples, ignore_index=True) if val_samples else df.head(0).copy()
+
+        if not train_df.empty:
+            train_df = train_df.sample(frac=1, random_state=self.random_seed).reset_index(drop=True)
+        if not val_df.empty:
+            val_df = val_df.sample(frac=1, random_state=self.random_seed + 1).reset_index(drop=True)
+
+        return train_df, val_df, train_target_total, val_target_total
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Run the uniform split sampler.")
+    parser.add_argument(
+        "-c",
+        "--config",
+        type=Path,
+        default=DEFAULT_CONFIG_PATH,
+        help=f"Path to the sampler config file (default: {DEFAULT_CONFIG_PATH})",
+    )
+    args = parser.parse_args()
+
+    config = load_sampler_config(args.config)
+    sampler = UniformSplitSampler(**config)
+    sampler.process_all_files()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/ml_filter/utils/uniform_split_sampler_utils.py b/src/ml_filter/utils/uniform_split_sampler_utils.py
new file mode 100644
index 00000000..b7c49670
--- /dev/null
+++ b/src/ml_filter/utils/uniform_split_sampler_utils.py
@@ -0,0 +1,149 @@
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Tuple
+
+import numpy as np
+import pandas as pd
+from omegaconf import OmegaConf
+
+logger = logging.getLogger(__name__)
+
+
+def load_sampler_config(config_path: Path) -> Dict[str, Any]:
+    if not config_path.exists():
+        raise FileNotFoundError(f"Sampler config file not found: {config_path}")
+
+    logger.info("Loading sampler configuration from %s", config_path)
+    config_container = OmegaConf.load(config_path)
+    config = OmegaConf.to_container(config_container, resolve=True)
+    if not isinstance(config, dict):
+        raise ValueError(f"Sampler configuration must be a mapping, but received {type(config_container)}")
+    return config
+
+
+def normalize_score_value(value):
+    if isinstance(value, (list, tuple, np.ndarray)):
+        return value[0] if value else np.nan
+    return value
+
+
+def split_label_pools(
+    df: pd.DataFrame,
+    unique_scores: List[float],
+    score_column: str,
+    validation_fraction: float,
+    random_seed: int,
+) -> Tuple[Dict[float, pd.DataFrame], Dict[float, pd.DataFrame]]:
+    train_pools: Dict[float, pd.DataFrame] = {}
+    val_pools: Dict[float, pd.DataFrame] = {}
+
+    for score in unique_scores:
+        label_data = df[df[score_column] == score]
+        if label_data.empty:
+            base = df.head(0).copy()
+            train_pools[score] = base.copy()
+            val_pools[score] = base.copy()
+            continue
+
+        shuffled = label_data.sample(frac=1, replace=False, random_state=random_seed + int(score)).reset_index(
+            drop=True
+        )
+        n_total = len(shuffled)
+        n_train_rows = int(np.floor(n_total * (1 - validation_fraction)))
+        n_val_rows = n_total - n_train_rows
+
+        train_pools[score] = shuffled.iloc[:n_train_rows]
+        val_pools[score] = shuffled.iloc[n_train_rows : n_train_rows + n_val_rows]
+
+    return train_pools, val_pools
+
+
+def sample_with_cap(
+    pool: pd.DataFrame,
+    target: int,
+    score: float,
+    split_name: str,
+    seed_offset: int,
+    random_seed: int,
+    max_oversampling_ratio: float,
+    log: logging.Logger | None = None,
+) -> pd.DataFrame:
+    if pool.empty or target <= 0:
+        return pool.head(0).copy()
+
+    max_allowed = int(len(pool) * max_oversampling_ratio)
+    if max_allowed <= 0:
+        return pool.head(0).copy()
+
+    effective_target = min(target, max_allowed)
+
+    replace = len(pool) < effective_target
+    sample = pool.sample(
+        n=effective_target,
+        replace=replace,
+        random_state=random_seed + seed_offset + int(score),
+    )
+    if replace:
+        factor = effective_target / len(pool)
+        if log:
+            log.info(
+                " Oversampling %s %.1f: %d → %d (%.1fx)",
+                split_name,
+                score,
+                len(pool),
+                effective_target,
+                factor,
+            )
+    return sample
+
+
+def per_label_targets(scores: List[float], total_target: int) -> Dict[float, int]:
+    if not scores or total_target <= 0:
+        return {score: 0 for score in scores}
+
+    base = total_target // len(scores)
+    remainder = total_target - base * len(scores)
+
+    targets: Dict[float, int] = {}
+    for idx, score in enumerate(scores):
+        targets[score] = base + (1 if idx < remainder else 0)
+    return targets
+
+
+def save_dataset(df: pd.DataFrame, path: Path, score_column: str, log: logging.Logger | None = None):
+    df_to_write = df.copy()
+    if not df_to_write.empty:
+        df_to_write[score_column] = df_to_write[score_column].apply(lambda x: [x])
+    df_to_write.to_json(path, orient="records", lines=True)
+    if log:
+        log.info("Wrote %d rows to %s", len(df), path)
+
+
+def log_distribution(
+    df: pd.DataFrame,
+    score_column: str,
+    label: str,
+    target_total: float | None = None,
+    log: logging.Logger | None = None,
+):
+    log_ref = log or logger
+    if df.empty:
+        log_ref.info("%s: 0 samples", label)
+        return
+
+    counts = df[score_column].value_counts().sort_index()
+    total = len(df)
+    log_ref.info("%s: %d samples", label, total)
+    for score, count in counts.items():
+        split_pct = (count / total) * 100 if total > 0 else 0
+        if target_total and target_total > 0:
+            target_pct = (count / target_total) * 100
+            log_ref.info(
+                " Score %.1f: %d (%.2f%% of split, %.2f%% of target)",
+                score,
+                count,
+                split_pct,
+                target_pct,
+            )
+        else:
+            log_ref.info(" Score %.1f: %d (%.2f%%)", score, count, split_pct)

From 1f642ba4fb8b480cf9a4766958121c11e80224f0 Mon Sep 17 00:00:00 2001
From: Abbas Khan
Date: Wed, 10 Dec 2025 15:21:28 +0100
Subject: [PATCH 2/4] feat: add uniform split sampler CLI entry point and tests

---
 src/ml_filter/__main__.py                     |  23 ++-
 .../sampling/uniform_split_sampler.py         |  24 +--
 .../resources/data/lorem_ipsum_sampling.jsonl |  15 ++
 tests/test_uniform_split_sampler.py           | 143 ++++++++++++++++++
 4 files changed, 177 insertions(+), 28 deletions(-)
 create mode 100644 tests/resources/data/lorem_ipsum_sampling.jsonl
 create mode 100644 tests/test_uniform_split_sampler.py

diff --git a/src/ml_filter/__main__.py b/src/ml_filter/__main__.py
index 5e357d7b..e7790332 100644
--- a/src/ml_filter/__main__.py
+++ b/src/ml_filter/__main__.py
@@ -12,12 +12,13 @@
 from ml_filter.analysis.collect_ir_metrics import collect_ir_metrics
 from ml_filter.analysis.evaluate_predicted_annotations import evaluate_predicted_annotations
 from ml_filter.analysis.plot_score_distributions import plot_differences_in_scores, plot_scores
-from ml_filter.annotation.embedding_pipeline import run_embedding_pipeline
 from ml_filter.annotation.annotation_pipeline import run_annotation_pipeline
+from ml_filter.annotation.embedding_pipeline import run_embedding_pipeline
 from ml_filter.compare_experiments import compare_experiments
 from ml_filter.data_processing.deduplication import deduplicate_jsonl
 from ml_filter.llm_client import LLMClient
 from ml_filter.sample_from_hf_dataset import sample_from_hf_dataset, upload_file_to_hf
+from ml_filter.sampling.uniform_split_sampler import UniformSplitSampler
 from ml_filter.training.embedding_training_pipeline import run_embedding_head_training_pipeline
 from ml_filter.translate import TranslationServiceType, TranslatorFactory
 from ml_filter.utils.chunk_data import chunk_jsonl
@@ -26,6 +27,7 @@
 from ml_filter.utils.manipulate_documents import merge_and_sort_jsonl_files
 from ml_filter.utils.manipulate_prompt import add_target_language_to_prompt
 from ml_filter.utils.statistics import compute_num_words_and_chars_in_jsonl, run_word_count_jsonl_files
+from ml_filter.utils.uniform_split_sampler_utils import load_sampler_config
 
 input_file_path_option = click.option(
     "--input_file_path",
@@ -170,6 +172,19 @@ def entry_point_compare_experiments(config_file_path: Path):
     compare_experiments(config_file_path)
 
 
+@main.command(name="uniform_split_sampler")
+@click.option(
+    "--config_file_path",
+    type=click_pathlib.Path(exists=True),
+    required=True,
+    help="Path to the YAML config file for the uniform split sampler.",
+)
+def entry_point_uniform_split_sampler(config_file_path: Path):
+    config = load_sampler_config(config_file_path)
+    sampler = UniformSplitSampler(**config)
+    sampler.process_all_files()
+
+
 @main.command(name="chunk_jsonl")
 @click.option(
     "--input_file_path",
@@ -371,7 +386,7 @@ def aggregate_human_annotations_cli(
     "--min_metrics",
     type=str,
     help="Comma-separated list of metrics for which lower is better."
-    + "All other metrics are considered to be better when higher.",
+    + "All other metrics are considered to be better when higher.",
 )
 @click.option(
     "--report_metrics",
@@ -752,9 +767,7 @@ def entry_run_embedding_pipeline(config_file_path: Path):
 )
 def entry_run_annotations(config_file_path: Path):
     """Run annotation pipeline using precomputed embeddings from HDF5."""
-    run_annotation_pipeline(
-        config_file_path=config_file_path
-    )
+    run_annotation_pipeline(config_file_path=config_file_path)
 
 
 def _get_translator_helper(translation_service: str, ignore_tag_text: Optional[str] = None):
diff --git a/src/ml_filter/sampling/uniform_split_sampler.py b/src/ml_filter/sampling/uniform_split_sampler.py
index ae62c202..dfecec2d 100644
--- a/src/ml_filter/sampling/uniform_split_sampler.py
+++ b/src/ml_filter/sampling/uniform_split_sampler.py
@@ -1,6 +1,5 @@
 """Uniform split sampler: split by label first, then oversample within each split."""
 
-import argparse
 import logging
 from pathlib import Path
 from typing import List, Tuple
@@ -9,7 +8,6 @@
 import pandas as pd
 
 from ml_filter.utils.uniform_split_sampler_utils import (
-    load_sampler_config,
     log_distribution,
     normalize_score_value,
     per_label_targets,
@@ -20,7 +18,7 @@
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 logger = logging.getLogger(__name__)
-DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[3] / "configs" / "sampler" / "uniform_split_sampler.yaml"
+__all__ = ["UniformSplitSampler"]
 
 
 class UniformSplitSampler:
@@ -185,23 +183,3 @@ def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFram
         val_df = val_df.sample(frac=1, random_state=self.random_seed + 1).reset_index(drop=True)
 
         return train_df, val_df, train_target_total, val_target_total
-
-
-def main():
-    parser = argparse.ArgumentParser(description="Run the uniform split sampler.")
-    parser.add_argument(
-        "-c",
-        "--config",
-        type=Path,
-        default=DEFAULT_CONFIG_PATH,
-        help=f"Path to the sampler config file (default: {DEFAULT_CONFIG_PATH})",
-    )
-    args = parser.parse_args()
-
-    config = load_sampler_config(args.config)
-    sampler = UniformSplitSampler(**config)
-    sampler.process_all_files()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/tests/resources/data/lorem_ipsum_sampling.jsonl b/tests/resources/data/lorem_ipsum_sampling.jsonl
new file mode 100644
index 00000000..a01a15d9
--- /dev/null
+++ b/tests/resources/data/lorem_ipsum_sampling.jsonl
@@ -0,0 +1,15 @@
+{"id": "fixture-0-0", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-1", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-2", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-3", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-0-4", "score": 0.0, "aggregation_type": "majority"}
+{"id": "fixture-1-0", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-1", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-2", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-3", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-1-4", "score": 1.0, "aggregation_type": "majority"}
+{"id": "fixture-2-0", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-1", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-2", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-3", "score": 2.0, "aggregation_type": "majority"}
+{"id": "fixture-2-4", "score": 2.0, "aggregation_type": "majority"}
diff --git a/tests/test_uniform_split_sampler.py b/tests/test_uniform_split_sampler.py
new file mode 100644
index 00000000..b4a29c9c
--- /dev/null
+++ b/tests/test_uniform_split_sampler.py
@@ -0,0 +1,143 @@
+import json
+import shutil
+from collections import Counter
+from pathlib import Path
+
+from ml_filter.sampling.uniform_split_sampler import UniformSplitSampler
+
+TESTS_ROOT = Path(__file__).resolve().parent
+FIXTURE_INPUT_NAME = "lorem_ipsum_sampling.jsonl"
+FIXTURE_INPUT_PATH = TESTS_ROOT / "resources" / "data" / FIXTURE_INPUT_NAME
+TARGET_INPUT_FILENAME = "lorem_ipsum_sampling.jsonl"
+
+
+def _write_jsonl(path: Path, rows: list[dict]):
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", encoding="utf-8") as handle:
+        for row in rows:
+            handle.write(json.dumps(row) + "\n")
+
+
+def _read_jsonl(path: Path) -> list[dict]:
+    with path.open("r", encoding="utf-8") as handle:
+        return [json.loads(line) for line in handle if line.strip()]
+
+
+def test_uniform_split_sampler_uses_fixture_distribution(tmp_path):
+    input_dir = tmp_path / "input_data"
+    input_dir.mkdir()
+    output_dir = tmp_path / "output_data"
+
+    fixture_target = input_dir / TARGET_INPUT_FILENAME
+    shutil.copy(FIXTURE_INPUT_PATH, fixture_target)
+
+    sampler = UniformSplitSampler(
+        input_dir=str(input_dir),
+        output_dir=str(output_dir),
+        validation_fraction=0.2,
+        score_column="score",
+        random_seed=123,
+        max_oversampling_ratio=1.0,
+    )
+    sampler.process_all_files()
+
+    train_file = output_dir / "training_set" / f"{TARGET_INPUT_FILENAME.replace('.jsonl', '')}_train.jsonl"
+    val_file = output_dir / "validation_set" / f"{TARGET_INPUT_FILENAME.replace('.jsonl', '')}_val.jsonl"
+
+    assert train_file.exists()
+    assert val_file.exists()
+
+    train_records = _read_jsonl(train_file)
+    val_records = _read_jsonl(val_file)
+
+    assert len(train_records) == 12
+    assert len(val_records) == 3
+
+    train_counts = Counter(int(row["score"][0]) for row in train_records)
+    val_counts = Counter(int(row["score"][0]) for row in val_records)
+
+    assert train_counts == {0: 4, 1: 4, 2: 4}
+    assert val_counts == {0: 1, 1: 1, 2: 1}
+
+    assert all(isinstance(row["score"], list) and len(row["score"]) == 1 for row in train_records + val_records)
+
+
+def test_uniform_split_sampler_honors_per_label_target(tmp_path):
+    input_dir = tmp_path / "input_per_label"
+    output_dir = tmp_path / "output_per_label"
+
+    records = []
+    for score in range(3):
+        for idx in range(5):
+            records.append(
+                {
+                    "id": f"balanced-{score}-{idx}",
+                    "score": float(score),
+                    "aggregation_type": "majority",
+                }
+            )
+
+    input_file = input_dir / "balanced_dataset.jsonl"
+    _write_jsonl(input_file, records)
+
+    sampler = UniformSplitSampler(
+        input_dir=str(input_dir),
+        output_dir=str(output_dir),
+        validation_fraction=0.0,
+        score_column="score",
+        random_seed=11,
+        max_oversampling_ratio=10.0,
+        per_label_target=2,
+    )
+    sampler.process_all_files()
+
+    train_file = output_dir / "training_set" / "balanced_dataset_train.jsonl"
+    train_records = _read_jsonl(train_file)
+
+    train_counts = Counter(int(row["score"][0]) for row in train_records)
+    assert train_counts == {0: 2, 1: 2, 2: 2}
+
+
+def test_uniform_split_sampler_respects_max_oversampling_ratio(tmp_path):
+    input_dir = tmp_path / "input_sparse"
+    output_dir = tmp_path / "output_sparse"
+
+    source_counts = {0: 1, 1: 2, 2: 3}
+    sparse_records = []
+    for score, count in source_counts.items():
+        for idx in range(count):
+            sparse_records.append(
+                {
+                    "id": f"sparse-{score}-{idx}",
+                    "score": float(score),
+                    "aggregation_type": "majority",
+                }
+            )
+
+    input_file = input_dir / "sparse_dataset.jsonl"
+    _write_jsonl(input_file, sparse_records)
+
+    sampler = UniformSplitSampler(
+        input_dir=str(input_dir),
+        output_dir=str(output_dir),
+        validation_fraction=0.0,
+        score_column="score",
+        random_seed=7,
+        max_oversampling_ratio=2.0,
+        per_label_target=500,
+    )
+    sampler.process_all_files()
+
+    train_file = output_dir / "training_set" / "sparse_dataset_train.jsonl"
+    val_file = output_dir / "validation_set" / "sparse_dataset_val.jsonl"
+
+    train_records = _read_jsonl(train_file)
+    val_records = _read_jsonl(val_file)
+
+    assert not val_records  # validation_fraction=0.0, so no validation records expected
+
+    train_counts = Counter(int(row["score"][0]) for row in train_records)
+    assert train_counts == {0: 2, 1: 4, 2: 6}
+
+    for score, available in source_counts.items():
+        assert train_counts[score] <= available * sampler.max_oversampling_ratio

From 2a4bdeb361acaec1b7c9dd0f179a6f47f692c08a Mon Sep 17 00:00:00 2001
From: Abbas Khan
Date: Fri, 12 Dec 2025 17:30:15 +0100
Subject: [PATCH 3/4] refactor: rename max_oversampling_ratio to max_upsample_factor for consistency

---
 configs/sampler/uniform_split_sampler.yaml |  2 +-
 .../sampling/uniform_split_sampler.py      | 40 +++++++++----------
 .../utils/uniform_split_sampler_utils.py   | 21 +++++-----
 tests/test_uniform_split_sampler.py        | 10 ++---
 4 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/configs/sampler/uniform_split_sampler.yaml b/configs/sampler/uniform_split_sampler.yaml
index 97329bf4..ad622d2b 100644
--- a/configs/sampler/uniform_split_sampler.yaml
+++ b/configs/sampler/uniform_split_sampler.yaml
@@ -3,6 +3,6 @@ output_dir: /home/abbas-khan/ml_filter/src/ml_filter/sampling/processed_data_uni
 validation_fraction: 0.10
 score_column: score
 random_seed: 42
-max_oversampling_ratio: 10.0
+max_upsample_factor: 10.0
 # Set per_label_target to force each score label to aim for this many samples per language file (before split)
 per_label_target: 32800
diff --git a/src/ml_filter/sampling/uniform_split_sampler.py b/src/ml_filter/sampling/uniform_split_sampler.py
index dfecec2d..cac57b76 100644
--- a/src/ml_filter/sampling/uniform_split_sampler.py
+++ b/src/ml_filter/sampling/uniform_split_sampler.py
@@ -2,14 +2,14 @@
 
 import logging
 from pathlib import Path
-from typing import List, Tuple
+from typing import Dict, Tuple
 
 import numpy as np
 import pandas as pd
 
 from ml_filter.utils.uniform_split_sampler_utils import (
+    extract_score_value,
     log_distribution,
-    normalize_score_value,
     per_label_targets,
     sample_with_cap,
     save_dataset,
@@ -29,7 +29,7 @@ def __init__(
         validation_fraction: float = 0.10,
         score_column: str = "score",
         random_seed: int = 42,
-        max_oversampling_ratio: float = 10.0,
+        max_upsample_factor: float = 10.0,
         per_label_target: int | None = None,
     ):
         self.input_dir = Path(input_dir)
@@ -37,7 +37,7 @@ def __init__(
         self.validation_fraction = validation_fraction
         self.score_column = score_column
         self.random_seed = random_seed
-        self.max_oversampling_ratio = max_oversampling_ratio
+        self.max_upsample_factor = max_upsample_factor
         self.per_label_target = per_label_target
 
         self.train_dir = self.output_dir / "training_set"
@@ -47,44 +47,44 @@ def __init__(
 
         np.random.seed(self.random_seed)
 
-    def process_all_files(self):
+    def process_all_files(self) -> None:
         jsonl_files = sorted(self.input_dir.glob("*.jsonl"))
         if not jsonl_files:
             logger.error("No JSONL files found in %s", self.input_dir)
             return
 
-        datasets: List[Tuple[str, pd.DataFrame]] = []
+        datasets: Dict[Path, pd.DataFrame] = {}
         for path in jsonl_files:
            df = self._load_file(path)
            if not df.empty:
-                datasets.append((path.name, df))
+                datasets[path] = df
 
         if not datasets:
             logger.error("No valid datasets to process.")
             return
 
-        for filename, df in datasets:
-            language = df.get("language", pd.Series(["unknown"])).iloc[0]
-            logger.info("\nProcessing %s (%s) with %d available rows", filename, language, len(df))
+        for path, df in datasets.items():
+            dataset_name = path.name.replace(".jsonl", "")
+            logger.info("\nProcessing %s with %d available rows", dataset_name, len(df))
 
             target_size = len(df)
             train_df, val_df, train_target_total, val_target_total = self._build_splits(df, target_size)
 
             save_dataset(
                 train_df,
-                self.train_dir / f"{filename.replace('.jsonl', '')}_train.jsonl",
+                self.train_dir / f"{dataset_name}_train.jsonl",
                 score_column=self.score_column,
                 log=logger,
             )
             save_dataset(
                 val_df,
-                self.val_dir / f"{filename.replace('.jsonl', '')}_val.jsonl",
+                self.val_dir / f"{dataset_name}_val.jsonl",
                 score_column=self.score_column,
                 log=logger,
             )
 
-            log_distribution(train_df, self.score_column, f"Training ({language})", train_target_total, logger)
-            log_distribution(val_df, self.score_column, f"Validation ({language})", val_target_total, logger)
+            log_distribution(train_df, self.score_column, "Training", train_target_total, logger)
+            log_distribution(val_df, self.score_column, "Validation", val_target_total, logger)
 
         logger.info("\nAll files processed. Output written to %s", self.output_dir)
 
@@ -99,20 +99,20 @@ def _load_file(self, file_path: Path) -> pd.DataFrame:
             logger.error("File %s missing required column '%s'", file_path, self.score_column)
             return pd.DataFrame()
 
-        df[self.score_column] = df[self.score_column].apply(normalize_score_value)
+        df[self.score_column] = df[self.score_column].apply(extract_score_value)
         df[self.score_column] = pd.to_numeric(df[self.score_column], errors="coerce")
         df = df.dropna(subset=[self.score_column])
 
         df = df[df[self.score_column].apply(lambda x: int(x) == float(x))]
-        df["language"] = file_path.name.split("_sampled", 1)[0]
 
         logger.info("Loaded %d valid rows from %s", len(df), file_path.name)
         return df
 
-    def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFrame, pd.DataFrame, int, int]:
         unique_scores = sorted(df[self.score_column].unique())
         if not unique_scores:
-            return df.head(0).copy(), df.head(0).copy()
+            empty = df.head(0).copy()
+            return empty, empty, 0, 0
 
         per_label_total_target = (
             float(self.per_label_target) if self.per_label_target is not None else (target_size / len(unique_scores))
@@ -155,7 +155,7 @@ def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFram
                 "train",
                 seed_offset=0,
                 random_seed=self.random_seed,
-                max_oversampling_ratio=self.max_oversampling_ratio,
+                max_upsample_factor=self.max_upsample_factor,
                 log=logger,
             )
             val_sample = sample_with_cap(
@@ -165,7 +165,7 @@ def _build_splits(self, df: pd.DataFrame, target_size: int) -> Tuple[pd.DataFram
                 "validation",
                 seed_offset=10_000,
                 random_seed=self.random_seed,
-                max_oversampling_ratio=self.max_oversampling_ratio,
+                max_upsample_factor=self.max_upsample_factor,
                 log=logger,
             )
diff --git a/src/ml_filter/utils/uniform_split_sampler_utils.py b/src/ml_filter/utils/uniform_split_sampler_utils.py
index b7c49670..905f7b92 100644
--- a/src/ml_filter/utils/uniform_split_sampler_utils.py
+++ b/src/ml_filter/utils/uniform_split_sampler_utils.py
@@ -21,10 +21,12 @@ def load_sampler_config(config_path: Path) -> Dict[str, Any]:
     return config
 
 
-def normalize_score_value(value):
+def extract_score_value(value: Any) -> float | int | np.floating | np.integer | None:
+    """Extract a scalar score from lists/arrays when present."""
+
     if isinstance(value, (list, tuple, np.ndarray)):
-        return value[0] if value else np.nan
-    return value
+        return value[0] if len(value) > 0 else np.nan
+    return value  # type: ignore[return-value]
 
 
 def split_label_pools(
@@ -37,8 +39,9 @@ def split_label_pools(
     train_pools: Dict[float, pd.DataFrame] = {}
     val_pools: Dict[float, pd.DataFrame] = {}
 
+    grouped = df.groupby(score_column)
     for score in unique_scores:
-        label_data = df[df[score_column] == score]
+        label_data = grouped.get_group(score) if score in grouped.groups else df.head(0).copy()
         if label_data.empty:
             base = df.head(0).copy()
             train_pools[score] = base.copy()
@@ -65,14 +68,14 @@ def sample_with_cap(
     split_name: str,
     seed_offset: int,
     random_seed: int,
-    max_oversampling_ratio: float,
+    max_upsample_factor: float,
     log: logging.Logger | None = None,
 ) -> pd.DataFrame:
     if pool.empty or target <= 0:
         return pool.head(0).copy()
 
-    max_allowed = int(len(pool) * max_oversampling_ratio)
-    if max_allowed <= 0:
+    max_allowed = int(len(pool) * max_upsample_factor)
+    if max_allowed == 0:
         return pool.head(0).copy()
 
     effective_target = min(target, max_allowed)
@@ -110,7 +113,7 @@ def per_label_targets(scores: List[float], total_target: int) -> Dict[float, int
     return targets
 
 
-def save_dataset(df: pd.DataFrame, path: Path, score_column: str, log: logging.Logger | None = None):
+def save_dataset(df: pd.DataFrame, path: Path, score_column: str, log: logging.Logger | None = None) -> None:
     df_to_write = df.copy()
     if not df_to_write.empty:
         df_to_write[score_column] = df_to_write[score_column].apply(lambda x: [x])
@@ -125,7 +128,7 @@ def log_distribution(
     label: str,
     target_total: float | None = None,
     log: logging.Logger | None = None,
-):
+) -> None:
     log_ref = log or logger
     if df.empty:
         log_ref.info("%s: 0 samples", label)
diff --git a/tests/test_uniform_split_sampler.py b/tests/test_uniform_split_sampler.py
index b4a29c9c..68795a91 100644
--- a/tests/test_uniform_split_sampler.py
+++ b/tests/test_uniform_split_sampler.py
@@ -37,7 +37,7 @@ def test_uniform_split_sampler_uses_fixture_distribution(tmp_path):
         validation_fraction=0.2,
         score_column="score",
         random_seed=123,
-        max_oversampling_ratio=1.0,
+        max_upsample_factor=1.0,
     )
     sampler.process_all_files()
 
@@ -86,7 +86,7 @@ def test_uniform_split_sampler_honors_per_label_target(tmp_path):
         validation_fraction=0.0,
         score_column="score",
         random_seed=11,
-        max_oversampling_ratio=10.0,
+        max_upsample_factor=10.0,
         per_label_target=2,
     )
     sampler.process_all_files()
@@ -98,7 +98,7 @@ def test_uniform_split_sampler_honors_per_label_target(tmp_path):
     assert train_counts == {0: 2, 1: 2, 2: 2}
 
 
-def test_uniform_split_sampler_respects_max_oversampling_ratio(tmp_path):
+def test_uniform_split_sampler_respects_max_upsample_factor(tmp_path):
     input_dir = tmp_path / "input_sparse"
     output_dir = tmp_path / "output_sparse"
 
@@ -123,7 +123,7 @@ def test_uniform_split_sampler_respects_max_oversampling_ratio(tmp_path):
         validation_fraction=0.0,
         score_column="score",
         random_seed=7,
-        max_oversampling_ratio=2.0,
+        max_upsample_factor=2.0,
         per_label_target=500,
     )
     sampler.process_all_files()
@@ -140,4 +140,4 @@ def test_uniform_split_sampler_respects_max_oversampling_ratio(tmp_path):
     assert train_counts == {0: 2, 1: 4, 2: 6}
 
     for score, available in source_counts.items():
-        assert train_counts[score] <= available * sampler.max_oversampling_ratio
+        assert train_counts[score] <= available * sampler.max_upsample_factor

From 8512f7c1c70c463285db3ba797b84285e4f4938f Mon Sep 17 00:00:00 2001
From: Abbas Khan
Date: Fri, 12 Dec 2025 23:21:22 +0100
Subject: [PATCH 4/4] chore: add a ValueError to ensure max_upsample_factor is always >= 1

---
 src/ml_filter/utils/uniform_split_sampler_utils.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/ml_filter/utils/uniform_split_sampler_utils.py b/src/ml_filter/utils/uniform_split_sampler_utils.py
index 905f7b92..0421bf59 100644
--- a/src/ml_filter/utils/uniform_split_sampler_utils.py
+++ b/src/ml_filter/utils/uniform_split_sampler_utils.py
@@ -26,7 +26,7 @@ def extract_score_value(value: Any) -> float | int | np.floating | np.integer |
 
     if isinstance(value, (list, tuple, np.ndarray)):
         return value[0] if len(value) > 0 else np.nan
-    return value  # type: ignore[return-value]
+    return value
 
 
 def split_label_pools(
@@ -71,12 +71,13 @@ def sample_with_cap(
     max_upsample_factor: float,
     log: logging.Logger | None = None,
 ) -> pd.DataFrame:
+
+    if max_upsample_factor < 1:
+        raise ValueError("max_upsample_factor must be >= 1")
+
     if pool.empty or target <= 0:
         return pool.head(0).copy()
 
     max_allowed = int(len(pool) * max_upsample_factor)
-    if max_allowed == 0:
-        return pool.head(0).copy()
 
     effective_target = min(target, max_allowed)