diff --git a/impc_etl/jobs/ingest/data_ingestion.py b/impc_etl/jobs/ingest/data_ingestion.py
index 5310b105..17bb4b33 100644
--- a/impc_etl/jobs/ingest/data_ingestion.py
+++ b/impc_etl/jobs/ingest/data_ingestion.py
@@ -144,6 +144,32 @@ def copy_products_report():
     return tracking_products_mice_file_asset

+tracking_products_crispr_file_asset = create_input_asset(
+    "tracking/gentar-products_crispr.json"
+)
+
+@asset.multi(
+    schedule=[tracking_directory_asset],
+    outlets=[tracking_products_crispr_file_asset],
+    dag_id=f"{dr_tag}_copy_products_crispr_report",
+)
+def copy_products_crispr_report():
+    """Copy the GenTar CRISPR products report once the tracking directory is available."""
+    source_file = f"{data_archive_path}/gentar-data-archive/product_reports/gentar-products_crispr-latest.json"
+    target_file = f"{input_data_path}/tracking/gentar-products_crispr.json"
+
+    task_logger.info(f"Copying tracking data from {source_file} to {target_file}")
+
+    # Ensure source file exists
+    if not os.path.exists(source_file):
+        raise FileNotFoundError(f"Source tracking file not found: {source_file}")
+
+    # Copy the file
+    shutil.copy(source_file, target_file)
+    task_logger.info(f"Successfully copied tracking data to {target_file}")
+
+    return tracking_products_crispr_file_asset
+
 gene_interest_asset = create_input_asset("tracking/gene_interest.tsv")
 gene_interest_json_asset = create_input_asset("tracking/gene_interest.json")
diff --git a/impc_etl/jobs/load/impc_spa/__init__.py b/impc_etl/jobs/load/impc_spa/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/impc_etl/jobs/load/impc_spa/impc_chromosome_mapper.py b/impc_etl/jobs/load/impc_spa/impc_chromosome_mapper.py
new file mode 100644
index 00000000..f5adbc12
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_chromosome_mapper.py
@@ -0,0 +1,36 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_summary_service_json_asset = create_input_asset("output/impc_web_api/gene_summary_service_json")
+chromosome_map_json_asset = create_output_asset("impc_spa/chromosome-map.json")
+
+@asset.multi(
+    schedule=[gene_summary_service_json_asset],
+    outlets=[chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_chromosome_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA chromosome mapper DAG."""
+    ),
+    tags=["impc_spa", "chromosome map"],
+)
+@with_spark_session
+def impc_spa_chromosome_mapper():
+    import json
+    from pyspark.sql import SparkSession
+    from urllib.parse import unquote, urlparse
+
+    spark = SparkSession.builder.getOrCreate()
+    gene_summary_service_json_path = gene_summary_service_json_asset.uri
+    gene_summary_df = spark.read.json(gene_summary_service_json_path)
+    gene_summary_df = gene_summary_df.select("mgiGeneAccessionId", "chrName")
+    gene_list = map(lambda row: row.asDict(), gene_summary_df.collect())
+    chromosome_map_dict = {gene["mgiGeneAccessionId"]: gene["chrName"] for gene in gene_list}
+    output_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+    with open(output_path, "w") as output_file:
+        output_file.write(json.dumps(chromosome_map_dict))
\ No newline at end of file
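Every impc_spa mapper introduced below consumes chromosome-map.json the same way: load the map, look up the chromosome for an MGI accession, and shard output directories by chromosome. A minimal sketch of that lookup, assuming a file:// asset URI (the helper name resolve_gene_dir is illustrative, not part of this change):

import json
import os
from urllib.parse import unquote, urlparse

def resolve_gene_dir(chromosome_map_uri: str, mgi_gene_accession_id: str, output_root: str) -> str:
    # Load the {"MGI:nnnnnnn": "chrName"} map written by impc_spa_chromosome_mapper
    chromosome_map_path = unquote(urlparse(chromosome_map_uri).path)
    with open(chromosome_map_path) as map_file:
        chromosome_map = json.load(map_file)
    # Shard by chromosome, then by accession with ":" made filesystem-safe
    chromosome = chromosome_map[mgi_gene_accession_id]
    gene_dir = os.path.join(output_root, chromosome, mgi_gene_accession_id.replace(":", "_"))
    os.makedirs(gene_dir, exist_ok=True)
    return gene_dir

diff --git a/impc_etl/jobs/load/impc_spa/impc_diseases_mapper.py b/impc_etl/jobs/load/impc_spa/impc_diseases_mapper.py
new file mode 100644
index 00000000..4ef1df56
--- /dev/null
+++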
b/impc_etl/jobs/load/impc_spa/impc_diseases_mapper.py
@@ -0,0 +1,89 @@
+import json
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.impc_spa import generate_valid_json_from_file
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_diseases_service_json_asset = create_input_asset("output/impc_web_api/gene_diseases_service_json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+gene_diseases_asset = AssetAlias("impc_spa_diseases")
+
+@dag(
+    schedule=[gene_diseases_service_json_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_diseases_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA diseases mapper DAG."""
+    ),
+    tags=["impc_spa", "diseases"],
+)
+def impc_spa_gene_diseases_mapper():
+    @with_spark_session
+    @task
+    def process_gene_diseases():
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, regexp_replace
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+        gene_diseases_service_json_path = gene_diseases_service_json_asset.uri
+        gene_diseases_df = spark.read.json(gene_diseases_service_json_path)
+        result_df = chromosome_map_df.join(gene_diseases_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("associationCurated").isNotNull())
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+        (result_df
+         .repartition("mgiGeneAccessionId")
+         .write
+         .mode("overwrite")
+         .partitionBy("mgiGeneAccessionId", "associationCurated")
+         .json(f"{get_data_release_work_dir()}/output/impc_spa/gene_diseases_temp_json")
+         )
+        print("Finished")
+
+    @task(outlets=[gene_diseases_asset])
+    def process_temp_folder(*, outlet_events):
+        import os
+        import shutil
+        from glob import iglob
+        from urllib.parse import unquote, urlparse
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/gene_diseases_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+        for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            association_status = parent_dir.split("=")[1]
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+            file_name = 'associated-diseases' if association_status == "true" else 'predicted-diseases'
+            gene_external_links_path =
f"{gene_dir_path}/{file_name}.json" + with open(gene_external_links_path, "w") as gene_file: + gene_file.write(generate_valid_json_from_file(file_path)) + shutil.rmtree(input_path) + print("Finished") + + chain(process_gene_diseases(), process_temp_folder()) +impc_spa_gene_diseases_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_all_phenotype_data_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_all_phenotype_data_mapper.py new file mode 100644 index 00000000..c18b0e15 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_all_phenotype_data_mapper.py @@ -0,0 +1,96 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.impc_spa import write_partitioned_data, generate_valid_json_from_file +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +datasets_metadata_service_json_asset = create_input_asset("output/impc_web_api/datasets_metadata_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_all_phenotype_data_asset = AssetAlias("impc_spa_gene_all_phenotype_data") + +@dag( + schedule=[datasets_metadata_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_all_phenotype_data_mapper", + description=textwrap.dedent( + """IMPC SPA gene all phenotype data mapper DAG.""" + ), + tags=["impc_spa", "gene", "all phenotype data"], +) +def impc_spa_gene_all_phenotype_data_mapper(): + @with_spark_session + @task + def process_parquet(): + from pyspark.sql import SparkSession + from pyspark.sql.functions import col, regexp_replace + from urllib.parse import unquote, urlparse + import json + + spark = SparkSession.builder.getOrCreate() + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"]) + + dataset_df = spark.read.json(datasets_metadata_service_json_asset.uri) + result_df = chromosome_map_df.join(dataset_df, "mgiGeneAccessionId", "left_outer") + result_df = result_df.filter(col("pipelineStableId").isNotNull()) + result_df = result_df.drop("chromosome") + result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_")) + + (result_df + .repartition("mgiGeneAccessionId") + .write + .mode("overwrite") + .partitionBy("mgiGeneAccessionId", "pipelineStableId", "procedureStableId") + .json(f"{get_data_release_work_dir()}/output/impc_spa/all_ph_data_temp_json") + ) + + print("Finished") + + @task(outlets=[gene_all_phenotype_data_asset]) + def process_temp_folder(*, outlet_events): + import json + import os + import shutil + from glob import iglob + from urllib.parse import unquote, urlparse + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + + input_path = f"{get_data_release_work_dir()}/output/impc_spa/all_ph_data_temp_json" + output_path = f"{get_data_release_work_dir()}/output/impc_spa" + for file_path in iglob(f"{input_path}/**/*.json", recursive=True): + filepath_parts = file_path.split("/") + filepath_parts.pop() + parent_dir = filepath_parts.pop() + procedure_stable_id = parent_dir.split("=")[1] + parent_dir = 
filepath_parts.pop()
+            pipeline_stable_id = parent_dir.split("=")[1]
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+            general_gene_pipeline_path = f"{gene_dir_path}/pipeline"
+            os.makedirs(general_gene_pipeline_path, exist_ok=True)
+            pipeline_dir_path = f"{general_gene_pipeline_path}/{pipeline_stable_id}"
+            os.makedirs(pipeline_dir_path, exist_ok=True)
+            # Write one JSON file per procedure under the gene's pipeline directory
+            file_to_be_generated_path = f"{pipeline_dir_path}/{procedure_stable_id}.json"
+            with open(file_to_be_generated_path, "w") as dataset_file:
+                dataset_file.write(generate_valid_json_from_file(file_path))
+        shutil.rmtree(input_path)
+        print("Finished")
+
+    chain(process_parquet(), process_temp_folder())
+impc_spa_gene_all_phenotype_data_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_crispr_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_crispr_mapper.py
new file mode 100644
index 00000000..d64cd3d6
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_crispr_mapper.py
@@ -0,0 +1,102 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, dag, chain
+
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.impc_spa import create_gentar_crispr_report_df
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gentar_products_crispr_latest_json_output_asset = create_input_asset("tracking/gentar-products_crispr.json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+
+@dag(
+    schedule=[gentar_products_crispr_latest_json_output_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_allele_crispr_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene allele crispr mapper DAG."""
+    ),
+    tags=["impc_spa", "allele", "crispr"],
+)
+def impc_spa_gene_allele_crispr_mapper():
+    @with_spark_session
+    @task
+    def process_allele_crispr_data():
+        import json
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, regexp_replace
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+
+        product_crispr_df = create_gentar_crispr_report_df(spark, gentar_products_crispr_latest_json_output_asset.uri)
+
+        result_df = chromosome_map_df.join(product_crispr_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("alleleSuperscript").isNotNull())
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+        (result_df
+         .repartition("mgiGeneAccessionId")
+         .write
+         .mode("overwrite")
+         .partitionBy("mgiGeneAccessionId")
+         .json(f"{get_data_release_work_dir()}/output/impc_spa/allele_crispr_temp_json")
+         )
+
+        print("Finished")
+
+    @task
+    def process_temp_folder():
+        import json
+        import os
+
import shutil + from glob import iglob + from urllib.parse import unquote, urlparse + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + + input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_crispr_temp_json" + output_path = f"{get_data_release_work_dir()}/output/impc_spa" + for file_path in iglob(f"{input_path}/**/*.json"): + filepath_parts = file_path.split("/") + filepath_parts.pop() + parent_dir = filepath_parts.pop() + mgi_gene_accession_id = parent_dir.split("=")[1] + original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":") + chromosome = chromosome_map_json[original_mgi_gene_accession_id] + chromosome_folder = f"{output_path}/{chromosome}" + os.makedirs(chromosome_folder, exist_ok=True) + gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}" + os.makedirs(gene_dir_path, exist_ok=True) + + allele_dict = {} + # JSON file can have multiple objects for distinct alleles + allele_data = open(file_path, 'r') + for json_obj_str in allele_data.readlines(): + allele_obj = json.loads(json_obj_str) + if allele_obj["alleleSuperscript"] in allele_dict: + allele_dict[allele_obj["alleleSuperscript"]].append(json_obj_str) + else: + allele_dict[allele_obj["alleleSuperscript"]] = [json_obj_str] + + + general_alleles_dir_path = f"{gene_dir_path}/alleles" + os.makedirs(general_alleles_dir_path, exist_ok=True) + for original_allele_name, allele_json_list in allele_dict.items(): + allele_name = original_allele_name.replace("/", "_") + allele_dir_path = f"{general_alleles_dir_path}/{allele_name}" + os.makedirs(allele_dir_path, exist_ok=True) + file_to_be_generated_path = f"{allele_dir_path}/crispr.json" + with open(file_to_be_generated_path, "w") as allele_file: + allele_file.write(f"[{','.join(allele_json_list)}]") + shutil.rmtree(input_path) + print("Finished") + + chain(process_allele_crispr_data(), process_temp_folder()) +impc_spa_gene_allele_crispr_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_escell_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_escell_mapper.py new file mode 100644 index 00000000..5bab3e32 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_escell_mapper.py @@ -0,0 +1,101 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, dag, chain + +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +product_report_parquet_asset = create_input_asset("output/product_report_parquet") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") + +@dag( + schedule=[product_report_parquet_asset], + dag_id=f"{dr_tag}_impc_spa_gene_escell_mapper", + description=textwrap.dedent( + """IMPC SPA gene allele EsCells mapper DAG.""" + ), + tags=["impc_spa", "allele", "escells"], +) +def impc_spa_gene_allele_escell_mapper(): + @with_spark_session + @task + def process_allele_escell_data(): + import json + from pyspark.sql import SparkSession + from pyspark.sql.functions import col, regexp_replace + from urllib.parse import unquote, urlparse + + spark = SparkSession.builder.getOrCreate() + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + chromosome_map_df = 
spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"]) + + product_df = spark.read.parquet(product_report_parquet_asset.uri) + product_df = product_df.filter(col("type") == "es_cell") + product_df = product_df.withColumnRenamed("mgi_accession_id", "mgiGeneAccessionId") + + result_df = chromosome_map_df.join(product_df, "mgiGeneAccessionId", "left_outer") + result_df = result_df.filter(col("allele_name").isNotNull()) + result_df = result_df.drop("chromosome") + result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_")) + + (result_df + .repartition("mgiGeneAccessionId") + .write + .mode("overwrite") + .partitionBy("mgiGeneAccessionId") + .json(f"{get_data_release_work_dir()}/output/impc_spa/allele_escell_temp_json") + ) + + @task + def process_temp_folder(): + import json + import os + import shutil + from glob import iglob + from urllib.parse import unquote, urlparse + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + + input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_escell_temp_json" + output_path = f"{get_data_release_work_dir()}/output/impc_spa" + for file_path in iglob(f"{input_path}/**/*.json"): + filepath_parts = file_path.split("/") + filepath_parts.pop() + parent_dir = filepath_parts.pop() + mgi_gene_accession_id = parent_dir.split("=")[1] + original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":") + chromosome = chromosome_map_json[original_mgi_gene_accession_id] + chromosome_folder = f"{output_path}/{chromosome}" + os.makedirs(chromosome_folder, exist_ok=True) + gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}" + os.makedirs(gene_dir_path, exist_ok=True) + + allele_dict = {} + # JSON file can have multiple objects for distinct alleles + allele_data = open(file_path, 'r') + for json_obj_str in allele_data.readlines(): + allele_obj = json.loads(json_obj_str) + if allele_obj["allele_name"] in allele_dict: + allele_dict[allele_obj["allele_name"]].append(json_obj_str) + else: + allele_dict[allele_obj["allele_name"]] = [json_obj_str] + + general_alleles_dir_path = f"{gene_dir_path}/alleles" + os.makedirs(general_alleles_dir_path, exist_ok=True) + for original_allele_name, allele_json_list in allele_dict.items(): + allele_name = original_allele_name.replace("/", "_") + allele_dir_path = f"{general_alleles_dir_path}/{allele_name}" + os.makedirs(allele_dir_path, exist_ok=True) + file_to_be_generated_path = f"{allele_dir_path}/es_cell.json" + with open(file_to_be_generated_path, "w") as allele_file: + allele_file.write(f"[{','.join(allele_json_list)}]") + shutil.rmtree(input_path) + print("Finished") + + chain(process_allele_escell_data(), process_temp_folder()) +impc_spa_gene_allele_escell_mapper() \ No newline at end of file diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_ivp_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_ivp_mapper.py new file mode 100644 index 00000000..714c67f8 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_ivp_mapper.py @@ -0,0 +1,102 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, dag, chain + +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +product_report_parquet_asset = 
create_input_asset("output/product_report_parquet")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+
+@dag(
+    schedule=[product_report_parquet_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_ivp_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene allele intermediate vector mapper DAG."""
+    ),
+    tags=["impc_spa", "allele", "ivp"],
+)
+def impc_spa_gene_allele_ivp_mapper():
+    @with_spark_session
+    @task
+    def process_allele_ivp_data():
+        import json
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, regexp_replace
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+
+        product_df = spark.read.parquet(product_report_parquet_asset.uri)
+        product_df = product_df.filter(col("type") == "intermediate_vector")
+        product_df = product_df.withColumnRenamed("mgi_accession_id", "mgiGeneAccessionId")
+
+        result_df = chromosome_map_df.join(product_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("allele_name").isNotNull())
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+
+        (result_df
+         .repartition("mgiGeneAccessionId")
+         .write
+         .mode("overwrite")
+         .partitionBy("mgiGeneAccessionId")
+         .json(f"{get_data_release_work_dir()}/output/impc_spa/allele_ivp_temp_json")
+         )
+
+    @task
+    def process_temp_folder():
+        import json
+        import os
+        import shutil
+        from glob import iglob
+        from urllib.parse import unquote, urlparse
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_ivp_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+        for file_path in iglob(f"{input_path}/**/*.json"):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+
+            allele_dict = {}
+            # JSON file can have multiple objects for distinct alleles
+            allele_data = open(file_path, 'r')
+            for json_obj_str in allele_data.readlines():
+                allele_obj = json.loads(json_obj_str)
+                if allele_obj["allele_name"] in allele_dict:
+                    allele_dict[allele_obj["allele_name"]].append(json_obj_str)
+                else:
+                    allele_dict[allele_obj["allele_name"]] = [json_obj_str]
+
+            general_alleles_dir_path = f"{gene_dir_path}/alleles"
+            os.makedirs(general_alleles_dir_path, exist_ok=True)
+            for original_allele_name, allele_json_list in allele_dict.items():
+                allele_name = original_allele_name.replace("/", "_")
+                allele_dir_path = f"{general_alleles_dir_path}/{allele_name}"
+                os.makedirs(allele_dir_path, exist_ok=True)
+                file_to_be_generated_path = f"{allele_dir_path}/ivp.json"
+                with open(file_to_be_generated_path, "w") as
allele_file: + allele_file.write(f"[{','.join(allele_json_list)}]") + shutil.rmtree(input_path) + print("Finished") + + chain(process_allele_ivp_data(), process_temp_folder()) +impc_spa_gene_allele_ivp_mapper() \ No newline at end of file diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_mice_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_mice_mapper.py new file mode 100644 index 00000000..f4276bb9 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_mice_mapper.py @@ -0,0 +1,88 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, dag, chain + +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.impc_spa import write_partitioned_data +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gentar_products_mice_latest_json_output_asset = create_input_asset("output/impc_web_api/gentar-products_mice-latest.json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") + +@dag( + schedule=[gentar_products_mice_latest_json_output_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_allele_mice_mapper", + description=textwrap.dedent( + """IMPC SPA gene allele mice mapper DAG.""" + ), + tags=["impc_spa", "allele", "mice"], +) +def impc_spa_gene_allele_mice_mapper(): + @with_spark_session + @task + def process_allele_mice_data(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=gentar_products_mice_latest_json_output_asset.uri, + col_to_filter="alleleName", + temp_folder_path="allele_mice_temp_json" + ) + print("Finished") + @task + def process_temp_folder(): + import json + import os + import shutil + from glob import iglob + from urllib.parse import unquote, urlparse + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + + input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_mice_temp_json" + output_path = f"{get_data_release_work_dir()}/output/impc_spa" + for file_path in iglob(f"{input_path}/**/*.json"): + filepath_parts = file_path.split("/") + filepath_parts.pop() + parent_dir = filepath_parts.pop() + mgi_gene_accession_id = parent_dir.split("=")[1] + original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":") + chromosome = chromosome_map_json[original_mgi_gene_accession_id] + chromosome_folder = f"{output_path}/{chromosome}" + os.makedirs(chromosome_folder, exist_ok=True) + gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}" + os.makedirs(gene_dir_path, exist_ok=True) + + allele_dict = {} + # JSON file can have multiple objects for distinct alleles + allele_data = open(file_path, 'r') + for json_obj_str in allele_data.readlines(): + allele_obj = json.loads(json_obj_str) + if allele_obj["alleleName"] in allele_dict: + allele_dict[allele_obj["alleleName"]].append(json_obj_str) + else: + allele_dict[allele_obj["alleleName"]] = [json_obj_str] + + + general_alleles_dir_path = f"{gene_dir_path}/alleles" + os.makedirs(general_alleles_dir_path, exist_ok=True) + for original_allele_name, allele_json_list in allele_dict.items(): + allele_name = original_allele_name.replace("/", "_") + allele_dir_path = f"{general_alleles_dir_path}/{allele_name}" + os.makedirs(allele_dir_path, 
exist_ok=True)
+                file_to_be_generated_path = f"{allele_dir_path}/mice.json"
+                with open(file_to_be_generated_path, "w") as allele_file:
+                    allele_file.write(f"[{','.join(allele_json_list)}]")
+        shutil.rmtree(input_path)
+        print("Finished")
+
+    chain(process_allele_mice_data(), process_temp_folder())
+impc_spa_gene_allele_mice_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_summary_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_summary_mapper.py
new file mode 100644
index 00000000..1432d1c0
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_summary_mapper.py
@@ -0,0 +1,199 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, dag, chain
+from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files, \
+    create_gentar_crispr_report_df
+from impc_etl.utils.spark import with_spark_session
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+order_report_json_path_asset = create_input_asset("tracking/gentar-products_order.json")
+gentar_products_crispr_latest_json_output_asset = create_input_asset("tracking/gentar-products_crispr.json")
+gentar_products_mice_latest_json_output_asset = create_input_asset("output/impc_web_api/gentar-products_mice-latest.json")
+product_report_parquet_asset = create_input_asset("output/product_report_parquet")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+
+@dag(
+    schedule=[
+        order_report_json_path_asset,
+        gentar_products_crispr_latest_json_output_asset,
+        gentar_products_mice_latest_json_output_asset,
+        product_report_parquet_asset,
+        chromosome_map_json_asset
+    ],
+    dag_id=f"{dr_tag}_impc_spa_gene_allele_summary_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene allele summary mapper DAG."""
+    ),
+    tags=["impc_spa", "allele", "summary"],
+)
+def impc_spa_gene_allele_summary_mapper():
+    @with_spark_session
+    @task
+    def process_reports():
+        import json
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, concat, lit, regexp_replace, when
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+
+        order_report_df = spark.read.json(order_report_json_path_asset.uri)
+        general_product_df = spark.read.parquet(product_report_parquet_asset.uri)
+        product_crispr_df = create_gentar_crispr_report_df(spark, gentar_products_crispr_latest_json_output_asset.uri)
+        product_mice_df = spark.read.json(gentar_products_mice_latest_json_output_asset.uri)
+
+        order_report_df = order_report_df.withColumnRenamed("alleleSuperscript", "alleleName")
+        product_crispr_df = product_crispr_df.withColumnRenamed("alleleSuperscript", "alleleName")
+
+        product_crispr_df = product_crispr_df.select(
+            "alleleSymbol",
+            lit(True).alias("doesCrisprProductsExist")
+        ).dropDuplicates()
+
+        product_mice_df = product_mice_df.withColumn(
+            "alleleSymbol",
+            concat(col("geneSymbol"), lit("<"), col("alleleName"), lit(">"))
+        )
+        product_mice_df = product_mice_df.select(
+            "alleleSymbol",
+            lit(True).alias("doesMiceProductsExist")
+        ).dropDuplicates()
+
+        general_product_df = general_product_df.withColumnsRenamed({
+            "allele_name": "alleleName",
+            "marker_symbol": "geneSymbol",
+            "mgi_accession_id": "mgiGeneAccessionId"
+        })
+
+        general_product_df = general_product_df.withColumn(
+            "alleleSymbol",
+            concat(col("geneSymbol"), lit("<"), col("alleleName"), lit(">"))
+        )
+
+        general_product_df = general_product_df.drop("geneSymbol")
+        es_cell_product_df = general_product_df.where(col("type") == "es_cell")
+        es_cell_product_df = es_cell_product_df.select(
+            "alleleSymbol",
+            lit(True).alias("doesEsCellProductsExist")
+        ).dropDuplicates()
+
+        tvp_product_df = general_product_df.where(col("type") == "targeting_vector")
+        tvp_product_df = tvp_product_df.select(
+            "alleleSymbol",
+            lit(True).alias("doesTargetingVectorProductsExist")
+        ).dropDuplicates()
+
+        ivp_product_df = general_product_df.where(col("type") == "intermediate_vector")
+        ivp_product_df = ivp_product_df.select(
+            "alleleSymbol",
+            lit(True).alias("doesIntermediateVectorProductsExist")
+        ).dropDuplicates()
+
+        result_df = order_report_df.join(
+            product_crispr_df,
+            "alleleSymbol",
+            "left_outer"
+        )
+        result_df = result_df.withColumn(
+            "doesCrisprProductsExist",
+            when(col("doesCrisprProductsExist").isNotNull(), lit(True)).otherwise(lit(False)),
+        )
+        result_df = result_df.join(
+            product_mice_df,
+            "alleleSymbol",
+            "left_outer"
+        )
+        result_df = result_df.withColumn(
+            "doesMiceProductsExist",
+            when(col("doesMiceProductsExist").isNotNull(), lit(True)).otherwise(lit(False)),
+        )
+        result_df = result_df.join(
+            es_cell_product_df,
+            "alleleSymbol",
+            "left_outer"
+        )
+        result_df = result_df.withColumn(
+            "doesEsCellProductsExist",
+            when(col("doesEsCellProductsExist").isNotNull(), lit(True)).otherwise(lit(False)),
+        )
+        result_df = result_df.join(
+            tvp_product_df,
+            "alleleSymbol",
+            "left_outer"
+        )
+        result_df = result_df.withColumn(
+            "doesTargetingVectorProductsExist",
+            when(col("doesTargetingVectorProductsExist").isNotNull(), lit(True)).otherwise(lit(False)),
+        )
+        result_df = result_df.join(
+            ivp_product_df,
+            "alleleSymbol",
+            "left_outer"
+        )
+        result_df = result_df.withColumn(
+            "doesIntermediateVectorProductsExist",
+            when(col("doesIntermediateVectorProductsExist").isNotNull(), lit(True)).otherwise(lit(False)),
+        )
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+        result_df = chromosome_map_df.join(result_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("alleleName").isNotNull())
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+        (result_df
+         .repartition("mgiGeneAccessionId")
+         .write
+         .mode("overwrite")
+         .partitionBy("mgiGeneAccessionId")
+         .json(f"{get_data_release_work_dir()}/output/impc_spa/allele_summary_temp_json")
+         )
+        print("Finished")
+
+    @task
+    def process_temp_folder():
+        import json
+        import os
+        import shutil
+        from glob import iglob
+        from urllib.parse import unquote, urlparse
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_summary_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+        for file_path in iglob(f"{input_path}/**/*.json"):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+
+            allele_data = open(file_path, 'r')
+            for json_obj_str in allele_data.readlines():
+                allele_obj = json.loads(json_obj_str)
+                allele_obj["geneSymbol"] = allele_obj["alleleSymbol"].split("<")[0]
+
+                allele_name = allele_obj["alleleName"].replace("/", "_")
+                allele_dir_path = f"{gene_dir_path}/{allele_name}"
+                os.makedirs(allele_dir_path, exist_ok=True)
+                file_to_be_generated_path = f"{allele_dir_path}/summary.json"
+                with open(file_to_be_generated_path, "w") as allele_file:
+                    allele_file.write(json.dumps(allele_obj))
+        shutil.rmtree(input_path)
+
+    chain(process_reports(), process_temp_folder())
+impc_spa_gene_allele_summary_mapper()
\ No newline at end of file
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_allele_tvp_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_allele_tvp_mapper.py
new file mode 100644
index 00000000..7bbff311
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_allele_tvp_mapper.py
@@ -0,0 +1,101 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, dag, chain
+
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+product_report_parquet_asset = create_input_asset("output/product_report_parquet")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+
+@dag(
+    schedule=[product_report_parquet_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_tvp_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene allele targeting vector mapper DAG."""
+    ),
+    tags=["impc_spa", "allele", "tvp"],
+)
+def impc_spa_gene_allele_tvp_mapper():
+    @with_spark_session
+    @task
+    def process_allele_tvp_data():
+        import json
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, regexp_replace
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+
+        product_df = spark.read.parquet(product_report_parquet_asset.uri)
+        product_df = product_df.filter(col("type") == "targeting_vector")
+        product_df = product_df.withColumnRenamed("mgi_accession_id", "mgiGeneAccessionId")
+
+        result_df = chromosome_map_df.join(product_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("allele_name").isNotNull())
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+
+        (result_df
+         .repartition("mgiGeneAccessionId")
+         .write
+         .mode("overwrite")
+         .partitionBy("mgiGeneAccessionId")
+         .json(f"{get_data_release_work_dir()}/output/impc_spa/allele_tvp_temp_json")
+         )
+
+    @task
+    def process_temp_folder():
+        import json
+        import os
+        import shutil
+        from glob import iglob
+        from urllib.parse import unquote, urlparse
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_tvp_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+        for file_path in iglob(f"{input_path}/**/*.json"):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+
+            allele_dict = {}
+            # JSON file can have multiple objects for distinct alleles
+            allele_data = open(file_path, 'r')
+            for json_obj_str in allele_data.readlines():
+                allele_obj = json.loads(json_obj_str)
+                if allele_obj["allele_name"] in allele_dict:
+                    allele_dict[allele_obj["allele_name"]].append(json_obj_str)
+                else:
+                    allele_dict[allele_obj["allele_name"]] = [json_obj_str]
+
+            general_alleles_dir_path = f"{gene_dir_path}/alleles"
+            os.makedirs(general_alleles_dir_path, exist_ok=True)
+            for original_allele_name, allele_json_list in allele_dict.items():
+                allele_name = original_allele_name.replace("/", "_")
+                allele_dir_path = f"{general_alleles_dir_path}/{allele_name}"
+                os.makedirs(allele_dir_path, exist_ok=True)
+                file_to_be_generated_path = f"{allele_dir_path}/tvp.json"
+                with open(file_to_be_generated_path, "w") as allele_file:
+                    allele_file.write(f"[{','.join(allele_json_list)}]")
+        shutil.rmtree(input_path)
+        print("Finished")
+
+    chain(process_allele_tvp_data(), process_temp_folder())
+impc_spa_gene_allele_tvp_mapper()
\ No newline at end of file
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_expression_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_expression_mapper.py
new file mode 100644
index 00000000..6c80cd59
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_expression_mapper.py
@@ -0,0 +1,51 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset
+from impc_etl.utils.airflow import create_input_asset
+from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_expression_service_json_asset = create_input_asset("output/impc_web_api/gene_expression_service_json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+gene_expression_asset = AssetAlias("impc_spa_gene_expression")
+
+@dag(
+    schedule=[gene_expression_service_json_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_expression_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene expression mapper DAG."""
+    ),
+    tags=["impc_spa", "gene", "lacZ expression"],
+)
+def impc_spa_gene_expression_mapper():
+    @with_spark_session
+    @task
+    def process_gene_expression():
+        from pyspark.sql import SparkSession
+
+        spark = SparkSession.builder.getOrCreate()
+        write_partitioned_data(
+            spark,
+            chromosome_map_path=chromosome_map_json_asset.uri,
+            parquet_path=gene_expression_service_json_asset.uri,
+            col_to_filter="id",
+            temp_folder_path="gene_expression_temp_json"
+        )
+        print("Finished")
+
+    @task(outlets=[gene_expression_asset])
+    def process_temp_folder(*, outlet_events):
+        process_temp_folder_into_files(
+            chromosome_map_path=chromosome_map_json_asset.uri,
+            temp_folder_path="gene_expression_temp_json",
+
file_name="expression.json", + asset_alias=gene_expression_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_expression(), process_temp_folder()) +impc_spa_gene_expression_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_external_links_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_external_links_mapper.py new file mode 100644 index 00000000..503f8376 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_external_links_mapper.py @@ -0,0 +1,51 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +external_links_json_asset = create_input_asset("output/impc_web_api/external_links_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_external_links_asset = AssetAlias("impc_spa_external_links") + +@dag( + schedule=[external_links_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_external_links_mapper", + description=textwrap.dedent( + """IMPC SPA external links mapper DAG.""" + ), + tags=["impc_spa", "external links"], +) +def impc_spa_gene_external_links_mapper(): + @with_spark_session + @task + def process_gene_external_links(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=external_links_json_asset.uri, + col_to_filter="href", + temp_folder_path="external_links_temp_json" + ) + print("Finished") + + @task(outlets=[gene_external_links_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="external_links_temp_json", + file_name="external-links.json", + asset_alias=gene_external_links_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_external_links(), process_temp_folder()) +impc_spa_gene_external_links_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_datasets_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_datasets_mapper.py new file mode 100644 index 00000000..2e2518c3 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_datasets_mapper.py @@ -0,0 +1,51 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +histopathology_service_json_asset = create_input_asset("output/impc_web_api/histopathology_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_histopathology_asset = AssetAlias("impc_spa_gene_histopathology") + +@dag( + schedule=[histopathology_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_histopathology_datasets_mapper", + description=textwrap.dedent( + """IMPC SPA gene histopathology datasets mapper DAG.""" + ), + tags=["impc_spa", "gene", "histopathology", 
"datasets"], +) +def impc_spa_gene_histopathology_datasets_mapper(): + @with_spark_session + @task + def process_gene_histopathology_datasets(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=histopathology_service_json_asset.uri, + col_to_filter="datasets", + temp_folder_path="histopathology_datasets_temp_json" + ) + print("Finished") + + @task(outlets=[gene_histopathology_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="histopathology_datasets_temp_json", + file_name="full-histopathology.json", + asset_alias=gene_histopathology_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_histopathology_datasets(), process_temp_folder()) +impc_spa_gene_histopathology_datasets_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_mapper.py new file mode 100644 index 00000000..1c199384 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_histopathology_mapper.py @@ -0,0 +1,51 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_histopathology_service_json_asset = create_input_asset("output/impc_web_api/gene_histopathology_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_histopathology_asset = AssetAlias("impc_spa_gene_histopathology") + +@dag( + schedule=[gene_histopathology_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_histopathology_mapper", + description=textwrap.dedent( + """IMPC SPA gene histopathology mapper DAG.""" + ), + tags=["impc_spa", "gene", "histopathology"], +) +def impc_spa_gene_histopathology_mapper(): + @with_spark_session + @task + def process_gene_histopathology(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=gene_histopathology_service_json_asset.uri, + col_to_filter="alleleAccessionId", + temp_folder_path="gene_histopathology_temp_json" + ) + print("Finished") + + @task(outlets=[gene_histopathology_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="gene_histopathology_temp_json", + file_name="gene-histopathology.json", + asset_alias=gene_histopathology_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_histopathology(), process_temp_folder()) +impc_spa_gene_histopathology_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_images_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_images_mapper.py new file mode 100644 index 00000000..5e90b5b1 --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_images_mapper.py @@ -0,0 +1,52 @@ +import json +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow 
import create_input_asset, get_data_release_work_dir +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_images_service_json_asset = create_input_asset("output/impc_web_api/gene_images_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_images_asset = AssetAlias("impc_spa_gene_images") + +@dag( + schedule=[gene_images_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_images_mapper", + description=textwrap.dedent( + """IMPC SPA gene images mapper DAG.""" + ), + tags=["impc_spa", "gene", "images"], +) +def impc_spa_gene_images_mapper(): + @with_spark_session + @task + def process_gene_images(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=gene_images_service_json_asset.uri, + col_to_filter="thumbnailUrl", + temp_folder_path="gene_images_temp_json" + ) + print("Finished") + + @task(outlets=[gene_images_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="gene_images_temp_json", + file_name="images.json", + asset_alias=gene_images_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_images(), process_temp_folder()) +impc_spa_gene_images_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_order_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_order_mapper.py new file mode 100644 index 00000000..628a10cd --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_order_mapper.py @@ -0,0 +1,50 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, dag, chain +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.airflow import create_input_asset + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +product_report_json_path_asset = create_input_asset("tracking/gentar-products_order.json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_order_asset = AssetAlias("impc_spa_gene_order") + +@dag( + schedule=[product_report_json_path_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_order_mapper", + description=textwrap.dedent( + """IMPC SPA gene order mapper DAG.""" + ), + tags=["impc_spa", "gene", "order"], +) +def impc_spa_gene_order_mapper(): + @with_spark_session + @task + def process_gene_order_report(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=product_report_json_path_asset.uri, + col_to_filter="alleleSymbol", + temp_folder_path="gene_order_temp_json" + ) + print("Finished") + + @task(outlets=[gene_order_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="gene_order_temp_json", + file_name="order.json", + asset_alias=gene_order_asset, + outlet_events=outlet_events + ) + print("Finished") + chain(process_gene_order_report(), 
process_temp_folder()) +impc_spa_gene_order_mapper() \ No newline at end of file diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_phenotypehits_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_phenotypehits_mapper.py new file mode 100644 index 00000000..00e9effc --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_phenotypehits_mapper.py @@ -0,0 +1,53 @@ +import json +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_phenotype_hits_service_json_asset = create_input_asset("output/impc_web_api/gene_phenotype_hits_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_phenotype_hits_asset = AssetAlias("impc_spa_gene_phenotype_hits") + +@dag( + schedule=[gene_phenotype_hits_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_phenotypehits_mapper", + description=textwrap.dedent( + """IMPC SPA gene phenotype hits mapper DAG.""" + ), + tags=["impc_spa", "gene", "phenotype hits"], +) +def impc_spa_gene_phenotypehits_mapper(): + @with_spark_session + @task + def process_gene_phenotypehits(): + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=gene_phenotype_hits_service_json_asset.uri, + col_to_filter="alleleAccessionId", + temp_folder_path="gene_phenotypehits_temp_json" + ) + print("Finished") + + @task(outlets=[gene_phenotype_hits_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="gene_phenotypehits_temp_json", + file_name="phenotypehits.json", + asset_alias=gene_phenotype_hits_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_phenotypehits(), process_temp_folder()) +impc_spa_gene_phenotypehits_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_publications_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_publications_mapper.py new file mode 100644 index 00000000..323045ca --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_publications_mapper.py @@ -0,0 +1,60 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset +from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +publications_service_json_asset = create_input_asset("output/impc_web_api/publications_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_publications_asset = AssetAlias("impc_spa_gene_publications") + +@dag( + schedule=[publications_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_publications_mapper", + description=textwrap.dedent( + """IMPC SPA gene publications mapper DAG.""" + ), + tags=["impc_spa", "gene", "publications"], +) +def impc_spa_gene_publications_mapper(): + @with_spark_session + 
@task + def process_gene_publications(): + from pyspark.sql import SparkSession + from pyspark.sql.functions import explode + + def process_allele_col(df): + df = df.withColumn("allele_fields", explode("alleles")) + df = df.select("*", "allele_fields.*") + return df + + spark = SparkSession.builder.getOrCreate() + + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=publications_service_json_asset.uri, + col_to_filter="alleles", + temp_folder_path="publications_temp_json", + filtered_dataframe_fn=process_allele_col + + ) + print("Finished") + + @task(outlets=[gene_publications_asset]) + def process_temp_folder(*, outlet_events): + process_temp_folder_into_files( + chromosome_map_path=chromosome_map_json_asset.uri, + temp_folder_path="publications_temp_json", + file_name="publications.json", + asset_alias=gene_publications_asset, + outlet_events=outlet_events + ) + print("Finished") + + chain(process_gene_publications(), process_temp_folder()) +impc_spa_gene_publications_mapper() diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_significant_phenotypes_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_significant_phenotypes_mapper.py new file mode 100644 index 00000000..85217f4f --- /dev/null +++ b/impc_etl/jobs/load/impc_spa/impc_gene_significant_phenotypes_mapper.py @@ -0,0 +1,83 @@ +import logging +import textwrap +from airflow.sdk import Variable, task, AssetAlias, chain, dag, Asset +from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir +from impc_etl.utils.impc_spa import write_partitioned_data +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +datasets_metadata_service_json_asset = create_input_asset("output/impc_web_api/datasets_metadata_service_json") +chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json") +gene_sig_phenotypes_asset = AssetAlias("impc_spa_gene_sig_phenotypes") + +@dag( + schedule=[datasets_metadata_service_json_asset, chromosome_map_json_asset], + dag_id=f"{dr_tag}_impc_spa_gene_sig_phenotypes_mapper", + description=textwrap.dedent( + """IMPC SPA gene significant phenotypes mapper DAG.""" + ), + tags=["impc_spa", "gene", "significant phenotypes"], +) +def impc_spa_gene_significant_phenotypes_mapper(): + @with_spark_session + @task + def process_parquet(): + from pyspark.sql import SparkSession + from pyspark.sql.functions import col + + def filter_dataframe(df): + return df.where(col("significant") == "true") + + spark = SparkSession.builder.getOrCreate() + write_partitioned_data( + spark, + chromosome_map_path=chromosome_map_json_asset.uri, + parquet_path=datasets_metadata_service_json_asset.uri, + col_to_filter="significantPhenotype", + temp_folder_path="sig_phenotypes_temp_json", + filtered_dataframe_fn=filter_dataframe + ) + print("Finished") + + @task(outlets=[gene_sig_phenotypes_asset]) + def process_temp_folder(*, outlet_events): + from urllib.parse import unquote, urlparse + from glob import iglob + import json + import os + import shutil + + chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path) + chromosome_map_json = json.loads(open(chromosome_map_json_path).read()) + + input_path = f"{get_data_release_work_dir()}/output/impc_spa/sig_phenotypes_temp_json" + output_path = f"{get_data_release_work_dir()}/output/impc_spa" + for file_path in iglob(f"{input_path}/**/*.json"): + filepath_parts = file_path.split("/") + 
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+            general_phenotypes_dir_path = f"{gene_dir_path}/significant_phenotypes"
+            os.makedirs(general_phenotypes_dir_path, exist_ok=True)
+            json_file = open(file_path, 'r')
+            for json_obj_str in json_file.readlines():
+                sig_phenotype_obj = json.loads(json_obj_str)
+                sig_phenotype_id = sig_phenotype_obj["significantPhenotype"]["id"].replace(":", "_")
+                file_to_be_generated_path = f"{general_phenotypes_dir_path}/{sig_phenotype_id}.json"
+                with open(file_to_be_generated_path, "w") as phenotype_file:
+                    phenotype_file.write(json_obj_str)
+        shutil.rmtree(input_path)
+        task_logger.info("Finished")
+
+    chain(process_parquet(), process_temp_folder())
+impc_spa_gene_significant_phenotypes_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_stats_results_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_stats_results_mapper.py
new file mode 100644
index 00000000..0d1b9f71
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_stats_results_mapper.py
@@ -0,0 +1,52 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, chain, dag
+from impc_etl.utils.airflow import create_input_asset
+from impc_etl.utils.impc_spa import write_partitioned_data, process_temp_folder_into_files
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_statistical_results_service_json_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+gene_stats_results_links_asset = AssetAlias("impc_spa_gene_stats_results")
+
+@dag(
+    schedule=[gene_statistical_results_service_json_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_stats_results_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene statistical results DAG."""
+    ),
+    tags=["impc_spa", "gene", "statistical results"],
+)
+def impc_spa_gene_stats_results_mapper():
+    @with_spark_session
+    @task
+    def process_gene_stats_results():
+        from pyspark.sql import SparkSession
+
+        spark = SparkSession.builder.getOrCreate()
+
+        write_partitioned_data(
+            spark,
+            chromosome_map_path=chromosome_map_json_asset.uri,
+            parquet_path=gene_statistical_results_service_json_asset.uri,
+            col_to_filter="alleleAccessionId",
+            temp_folder_path="stats_results_temp_json"
+        )
+        task_logger.info("Finished")
+
+    @task(outlets=[gene_stats_results_links_asset])
+    def process_temp_folder(*, outlet_events):
+        process_temp_folder_into_files(
+            chromosome_map_path=chromosome_map_json_asset.uri,
+            temp_folder_path="stats_results_temp_json",
+            file_name="stats-results.json",
+            asset_alias=gene_stats_results_links_asset,
+            outlet_events=outlet_events
+        )
+        task_logger.info("Finished")
+
+    chain(process_gene_stats_results(), process_temp_folder())
+impc_spa_gene_stats_results_mapper()
\ No newline at end of file
diff --git a/impc_etl/jobs/load/impc_spa/impc_gene_summaries_mapper.py b/impc_etl/jobs/load/impc_spa/impc_gene_summaries_mapper.py
new file mode 100644
index 00000000..0d4479c8
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_gene_summaries_mapper.py
@@ -0,0 +1,54 @@
+import json
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, Asset, dag
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_summary_service_json_asset = create_input_asset("output/impc_web_api/gene_summary_service_json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+gene_summaries_asset = AssetAlias("impc_spa_gene_summaries")
+
+@dag(
+    schedule=[gene_summary_service_json_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_gene_summaries_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA gene summaries mapper DAG."""
+    ),
+    tags=["impc_spa", "gene", "summary"],
+)
+def impc_spa_gene_summaries_mapper():
+    @with_spark_session
+    @task(outlets=[gene_summaries_asset])
+    def process_gene_summaries(*, outlet_events):
+        import os
+        from pyspark.sql import SparkSession
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        gene_summary_service_json_path = gene_summary_service_json_asset.uri
+        gene_summary_df = spark.read.json(gene_summary_service_json_path)
+        gene_list = map(lambda row: row.asDict(), gene_summary_df.collect())
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+        for gene in gene_list:
+            chromosome = chromosome_map_json[gene["mgiGeneAccessionId"]]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+
+            gene_accession_id = gene["mgiGeneAccessionId"].replace(":", "_")
+            gene_dir_path = f"{chromosome_folder}/{gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+
+            gene_summary_path = f"{gene_dir_path}/gene-summary.json"
+            with open(gene_summary_path, "w") as gene_file:
+                gene_file.write(json.dumps(gene))
+            outlet_events[gene_summaries_asset].add(Asset(f"file://{gene_summary_path}"))
+    process_gene_summaries()
+impc_spa_gene_summaries_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_images_mapper.py b/impc_etl/jobs/load/impc_spa/impc_images_mapper.py
new file mode 100644
index 00000000..e2eafd3f
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_images_mapper.py
@@ -0,0 +1,128 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, chain, dag
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.impc_spa import generate_valid_json_from_file
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+images_service_json_asset = create_input_asset("output/impc_web_api/images_service_json")
+chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
+images_asset = AssetAlias("impc_spa_images")
+
+@dag(
+    schedule=[images_service_json_asset, chromosome_map_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_images_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA images mapper DAG."""
+    ),
+    tags=["impc_spa", "images"],
+)
+def impc_spa_images_mapper():
+    @with_spark_session
+    @task
+    def process_images_parquet():
+        import json
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import col, regexp_replace
+        from urllib.parse import unquote, urlparse
+
+        spark = SparkSession.builder.getOrCreate()
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+        chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+        parquet_df = spark.read.json(images_service_json_asset.uri)
+        control_images_df = parquet_df.filter(col("biologicalSampleGroup") == "control")
+
+        result_df = chromosome_map_df.join(parquet_df, "mgiGeneAccessionId", "left_outer")
+        result_df = result_df.filter(col("biologicalSampleGroup").isNotNull())
+        result_df = result_df.filter(col("biologicalSampleGroup") == "experimental")
+        result_df = result_df.drop("chromosome")
+        result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+        (result_df
+            .repartition("mgiGeneAccessionId")
+            .write
+            .mode("overwrite")
+            .partitionBy("mgiGeneAccessionId", "parameterStableId")
+            .json(f"{get_data_release_work_dir()}/output/impc_spa/experimental_images_temp_json")
+        )
+        (control_images_df
+            .repartition("parameterStableId")
+            .write
+            .mode("overwrite")
+            .partitionBy("parameterStableId")
+            .json(f"{get_data_release_work_dir()}/output/impc_spa/control_images_temp_json")
+        )
+        task_logger.info("Finished")
+
+    @task
+    def process_control_folder():
+        from glob import iglob
+        import os
+        import shutil
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/control_images_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa/control_images_by_parameter"
+        os.makedirs(output_path, exist_ok=True)
+        for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            parameter_stable_id = parent_dir.split("=")[1]
+            file_to_be_generated_path = f"{output_path}/{parameter_stable_id}.json"
+            with open(file_to_be_generated_path, "w") as output_file:
+                output_file.write(generate_valid_json_from_file(file_path))
+        shutil.rmtree(input_path)
+        task_logger.info("Finished")
+
+    @task
+    def process_experimental_folder():
+        from glob import iglob
+        from urllib.parse import unquote, urlparse
+        import json
+        import os
+        import shutil
+
+        chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
+        chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+        control_images_path = f"{get_data_release_work_dir()}/output/impc_spa/control_images_by_parameter"
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/experimental_images_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+
+        for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            parameter_stable_id = parent_dir.split("=")[1]
+            parent_dir = filepath_parts.pop()
+            mgi_gene_accession_id = parent_dir.split("=")[1]
+            original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+
+            chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+            chromosome_folder = f"{output_path}/{chromosome}"
+            os.makedirs(chromosome_folder, exist_ok=True)
+
+            gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+            os.makedirs(gene_dir_path, exist_ok=True)
+            general_gene_images_path = f"{gene_dir_path}/images"
+            os.makedirs(general_gene_images_path, exist_ok=True)
+            parameter_images_path = f"{general_gene_images_path}/{parameter_stable_id}"
+            os.makedirs(parameter_images_path, exist_ok=True)
+            # writing experimental images
+            file_to_be_generated_path = f"{parameter_images_path}/mutant.json"
+            with open(file_to_be_generated_path, "w") as gene_file:
+                gene_file.write(generate_valid_json_from_file(file_path))
+            # copying control images to gene dir
+            control_images_file_path = f"{control_images_path}/{parameter_stable_id}.json"
+            if os.path.exists(control_images_file_path):
+                shutil.copy(control_images_file_path, f"{parameter_images_path}/control.json")
+        shutil.rmtree(input_path)
+        shutil.rmtree(control_images_path)
+        task_logger.info("Finished")
+
+    chain(process_images_parquet(), process_control_folder(), process_experimental_folder())
+impc_spa_images_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_phenotype_genotypehits_mapper.py b/impc_etl/jobs/load/impc_spa/impc_phenotype_genotypehits_mapper.py
new file mode 100644
index 00000000..90bcdeb9
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_phenotype_genotypehits_mapper.py
@@ -0,0 +1,65 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, dag, chain, Asset
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+phenotype_genotype_hits_service_json_asset = create_input_asset("output/impc_web_api/phenotype_genotype_hits_service_json")
+phenotype_genotypehits_asset = AssetAlias("impc_spa_phenotype_genotypehits")
+
+@dag(
+    schedule=[phenotype_genotype_hits_service_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_phenotype_genotypehits_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA phenotype genotypehits mapper DAG."""
+    ),
+    tags=["impc_spa", "phenotype", "genotypehits"],
+)
+def impc_spa_phenotype_genotypehits_mapper():
+    @with_spark_session
+    @task
+    def process_phenotype_genotypehits_parquet():
+        from pyspark.sql import SparkSession
+        from pyspark.sql.functions import regexp_replace
+
+        spark = SparkSession.builder.getOrCreate()
+
+        phenotype_genotype_hits_service_json_path = phenotype_genotype_hits_service_json_asset.uri
+        phenotype_genotypehits_df = spark.read.json(phenotype_genotype_hits_service_json_path)
+        result_df = phenotype_genotypehits_df.withColumn("phenotypeId", regexp_replace("phenotypeId", ":", "_"))
+        (result_df
+            .repartition("phenotypeId")
+            .write
+            .mode("overwrite")
+            .partitionBy("phenotypeId")
+            .json(f"{get_data_release_work_dir()}/output/impc_spa/phenotype_genotypehits_temp_json")
+        )
+
+    @task(outlets=[phenotype_genotypehits_asset])
+    def process_temp_folder(*, outlet_events):
+        import os
+        import shutil
+        from glob import iglob
+        from impc_etl.utils.impc_spa import generate_valid_json_from_file
+
+        input_path = f"{get_data_release_work_dir()}/output/impc_spa/phenotype_genotypehits_temp_json"
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa/phenotypes"
+        for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
+            filepath_parts = file_path.split("/")
+            filepath_parts.pop()
+            parent_dir = filepath_parts.pop()
+            phenotype_id = parent_dir.split("=")[1]
+
+            phenotype_dir_path = f"{output_path}/{phenotype_id}"
+            os.makedirs(phenotype_dir_path, exist_ok=True)
+            phenotype_genotypehits_path = f"{phenotype_dir_path}/genotype-hits.json"
+
+            with open(phenotype_genotypehits_path, "w") as phenotype_file:
+                phenotype_file.write(generate_valid_json_from_file(file_path))
+                outlet_events[phenotype_genotypehits_asset].add(Asset(f"file://{phenotype_genotypehits_path}"))
+        shutil.rmtree(input_path)
+    chain(process_phenotype_genotypehits_parquet(), process_temp_folder())
+impc_spa_phenotype_genotypehits_mapper()
diff --git a/impc_etl/jobs/load/impc_spa/impc_phenotype_summaries_mapper.py b/impc_etl/jobs/load/impc_spa/impc_phenotype_summaries_mapper.py
new file mode 100644
index 00000000..8edeadf9
--- /dev/null
+++ b/impc_etl/jobs/load/impc_spa/impc_phenotype_summaries_mapper.py
@@ -0,0 +1,45 @@
+import json
+import logging
+import textwrap
+from airflow.sdk import Variable, task, AssetAlias, Asset, dag
+from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+phenotype_summary_service_json_asset = create_input_asset("output/impc_web_api/phenotype_summary_service_json")
+phenotype_summaries_asset = AssetAlias("impc_spa_phenotype_summaries")
+
+@dag(
+    schedule=[phenotype_summary_service_json_asset],
+    dag_id=f"{dr_tag}_impc_spa_phenotype_summaries_mapper",
+    description=textwrap.dedent(
+        """IMPC SPA phenotype summaries mapper DAG."""
+    ),
+    tags=["impc_spa", "phenotype", "summary"],
+)
+def impc_spa_phenotype_summaries_mapper():
+    @with_spark_session
+    @task(outlets=[phenotype_summaries_asset])
+    def process_phenotype_summaries(*, outlet_events):
+        import os
+        from pyspark.sql import SparkSession
+
+        spark = SparkSession.builder.getOrCreate()
+
+        phenotype_summary_service_json_path = phenotype_summary_service_json_asset.uri
+        phenotype_summary_df = spark.read.json(phenotype_summary_service_json_path)
+        phenotype_list = map(lambda row: row.asDict(), phenotype_summary_df.collect())
+        output_path = f"{get_data_release_work_dir()}/output/impc_spa/phenotypes"
+        os.makedirs(output_path, exist_ok=True)
+        for phenotype in phenotype_list:
+            phenotype_id = phenotype["phenotypeId"].replace(":", "_")
+            phenotype_dir_path = f"{output_path}/{phenotype_id}"
+            os.makedirs(phenotype_dir_path, exist_ok=True)
+            phenotype_summary_path = f"{phenotype_dir_path}/summary.json"
+            with open(phenotype_summary_path, "w") as phenotype_file:
+                phenotype_file.write(json.dumps(phenotype))
+            outlet_events[phenotype_summaries_asset].add(Asset(f"file://{phenotype_summary_path}"))
+    process_phenotype_summaries()
+impc_spa_phenotype_summaries_mapper()
diff --git a/impc_etl/utils/impc_spa.py b/impc_etl/utils/impc_spa.py
new file mode 100644
index 00000000..b41a4c55
--- /dev/null
+++ b/impc_etl/utils/impc_spa.py
@@ -0,0 +1,84 @@
+from typing import Callable
+from pyspark.sql.functions import col, regexp_replace
+from glob import iglob
+from urllib.parse import unquote, urlparse
+from airflow.sdk import Asset
+from impc_etl.utils.airflow import get_data_release_work_dir
+import os
+import json
+import shutil
+
+
+def write_partitioned_data(
+    spark,
+    chromosome_map_path: str,
+    parquet_path: str,
+    col_to_filter: str,
+    temp_folder_path: str,
+    filtered_dataframe_fn: Callable = lambda df: df,
+):
+    chromosome_map_json_path = unquote(urlparse(chromosome_map_path).path)
+    chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+    chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
+    # despite the parameter name, the input is the line-delimited JSON written upstream
+    parquet_df = spark.read.json(parquet_path)
+    result_df = filtered_dataframe_fn(parquet_df)
+    result_df = chromosome_map_df.join(result_df, "mgiGeneAccessionId", "left_outer")
+    result_df = result_df.filter(col(col_to_filter).isNotNull())
+    result_df = result_df.drop("chromosome")
+    result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
+    (result_df
+        .repartition("mgiGeneAccessionId")
+        .write
+        .mode("overwrite")
+        .partitionBy("mgiGeneAccessionId")
+        .json(f"{get_data_release_work_dir()}/output/impc_spa/{temp_folder_path}")
+    )
+
+
+def generate_valid_json_from_file(file_path):
+    file_data = open(file_path, 'r')
+    lines = file_data.readlines()
+    return f"[{','.join(lines)}]"
+
+def process_temp_folder_into_files(
+    chromosome_map_path: str,
+    temp_folder_path: str,
+    file_name: str,
+    asset_alias,
+    outlet_events
+):
+    chromosome_map_json_path = unquote(urlparse(chromosome_map_path).path)
+    chromosome_map_json = json.loads(open(chromosome_map_json_path).read())
+
+    input_path = f"{get_data_release_work_dir()}/output/impc_spa/{temp_folder_path}"
+    output_path = f"{get_data_release_work_dir()}/output/impc_spa"
+    for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
+        filepath_parts = file_path.split("/")
+        filepath_parts.pop()
+        parent_dir = filepath_parts.pop()
+        mgi_gene_accession_id = parent_dir.split("=")[1]
+        original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
+
+        chromosome = chromosome_map_json[original_mgi_gene_accession_id]
+        chromosome_folder = f"{output_path}/{chromosome}"
+        os.makedirs(chromosome_folder, exist_ok=True)
+
+        gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
+        os.makedirs(gene_dir_path, exist_ok=True)
+        file_to_be_generated_path = f"{gene_dir_path}/{file_name}"
+        with open(file_to_be_generated_path, "w") as gene_file:
+            gene_file.write(generate_valid_json_from_file(file_path))
+        # outlet_events[asset_alias].add(Asset(f"file://{file_to_be_generated_path}"))
+    shutil.rmtree(input_path)
+
+def create_gentar_crispr_report_df(
+    spark,
+    asset_path: str,
+):
+    product_report_path = unquote(urlparse(asset_path).path)
+    file_data = open(product_report_path, 'r')
+    lines = filter(lambda line: line.strip(), file_data.readlines())
+    json_str = "[" + ",".join(lines) + "]"
+    product_report_json = json.loads(json_str)
+    return spark.createDataFrame(product_report_json)
\ No newline at end of file
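
Note on the chromosome map consumed by write_partitioned_data: it is a flat JSON object keyed by MGI accession, and createDataFrame over dict.items() turns it into the two-column DataFrame used for the join. A minimal sketch with made-up entries (the real file is output/impc_spa/chromosome-map.json):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative map contents; keys are MGI accessions, values chromosome names.
chromosome_map_json = {"MGI:1929293": "4", "MGI:97490": "2"}
chromosome_map_df = spark.createDataFrame(
    chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"]
)
chromosome_map_df.show()  # two rows, one per gene, ready to join on mgiGeneAccessionId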
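
Note on the JSON handling: Spark's DataFrameWriter.json() emits line-delimited JSON (one object per line in each part-*.json file), while the SPA expects each generated file to hold a single JSON array, and generate_valid_json_from_file bridges the two by joining the lines with commas inside brackets. A minimal sketch of that conversion, with an illustrative file name and made-up records:

import json

# Two line-delimited JSON records, as Spark would write them (sample data only).
ndjson = (
    '{"mgiGeneAccessionId": "MGI:1929293", "chrName": "4"}\n'
    '{"mgiGeneAccessionId": "MGI:97490", "chrName": "2"}\n'
)
with open("part-00000-sample.json", "w") as f:
    f.write(ndjson)

# Same joining logic as generate_valid_json_from_file.
with open("part-00000-sample.json") as f:
    lines = f.readlines()
array_str = f"[{','.join(lines)}]"

records = json.loads(array_str)  # parses as a plain JSON array
assert records[1]["chrName"] == "2"

The trailing newlines survive the join, but JSON tolerates whitespace between values, so the result still parses.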
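
Note on the partition-directory parsing shared by the process_temp_folder tasks: partitionBy("mgiGeneAccessionId") makes Spark encode the key into the directory name (mgiGeneAccessionId=MGI_1929293/part-*.json), and the tasks recover it by popping path components and splitting on "=". A minimal sketch with a hypothetical path:

# Hypothetical part-file path as produced by partitionBy("mgiGeneAccessionId").
file_path = "/work/output/impc_spa/stats_results_temp_json/mgiGeneAccessionId=MGI_1929293/part-00000.json"

filepath_parts = file_path.split("/")
filepath_parts.pop()                         # drop the part file name
parent_dir = filepath_parts.pop()            # "mgiGeneAccessionId=MGI_1929293"
mgi_gene_accession_id = parent_dir.split("=")[1]
original_id = mgi_gene_accession_id.replace("_", ":")  # undo the ":" -> "_" rewrite

assert mgi_gene_accession_id == "MGI_1929293"
assert original_id == "MGI:1929293"

The ":"-to-"_" rewrite is needed because partition values become directory names; the replace() restores the original MGI accession for the chromosome-map lookup.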