11import os
22import re
3- import typer
43import logging
4+ import typer
55import numpy as np
6+ from pathlib import Path
67from datetime import datetime
78from typing import List , Optional , Dict , Tuple , Any , Final , Literal
8- from operator import itemgetter , attrgetter
9-
109import Bio .SeqIO
10+ from enum import Enum
11+
1112
1213from .models import (
1314 HLAStandard ,
1718 HLAResultRow ,
1819)
1920
21+
class HLAType(str, Enum):
    """Closed set of the HLA type letters handled by this module.

    The ``str`` mix-in makes every member compare equal to its plain-string
    value (``HLAType.A == "A"``), so members can be passed wherever a bare
    letter is expected.
    """

    # Each member's value is identical to its name.
    A = "A"
    B = "B"
    C = "C"
26+
27+
# strftime layout used when rendering timestamps in run output:
# weekday, month, day, HH:MM:SS, timezone name, year.
DATE_FORMAT = "%a %b %d %H:%M:%S %Z %Y"

# Type alias for the accepted HLA type letters, used to annotate ``letter``
# parameters.  ``Final`` is a qualifier that is only meaningful as the
# outermost part of a variable annotation; wrapping the alias in it
# (``Final[Literal[...]]``) yields something that is not a valid parameter
# type, so the alias is the bare ``Literal``.
HLA_TYPES = Literal["A", "B", "C"]
@@ -86,12 +94,13 @@ class EasyHLA:
8694
8795 COLUMN_IDS : Final [Dict [str , int ]] = {"A" : 0 , "B" : 2 , "C" : 4 }
8896
def __init__(self, letter: HLA_TYPES, logger: Optional[logging.Logger] = None):
    """
    Prepare an interpreter for one HLA type.

    :param letter: HLA type letter, "A", "B" or "C" (case-insensitive).
    :param logger: Logger that receives this instance's messages.  When
        omitted, the module logger ``logging.getLogger(__name__)`` is used,
        so verbosity and handlers follow the application's logging
        configuration.
    :raises ValueError: If ``letter`` is not one of the supported letters.
    """
    locus = letter.upper()
    if locus not in ("A", "B", "C"):
        raise ValueError("Invalid HLA Type!")
    self.letter: str = locus

    # Bind the logger before loading reference data so that anything invoked
    # below can already report through ``self.log``.
    # Loggers must be obtained from ``logging.getLogger``: an object created
    # by calling ``logging.Logger(...)`` directly is not registered with the
    # logging manager and has no parent, so handlers and levels configured
    # by the application never apply to it.
    self.log: logging.Logger = (
        logger if logger is not None else logging.getLogger(__name__)
    )

    self.hla_stds: List[HLAStandard] = self.load_hla_stds(letter=self.letter)
    self.hla_freqs: Dict[str, int] = self.load_hla_frequencies(letter=self.letter)
95104
96105 def check_length (self , letter : HLA_TYPES , seq : str , name : str ) -> bool :
97106 error_condition : bool = False
@@ -121,6 +130,16 @@ def check_length(self, letter: HLA_TYPES, seq: str, name: str) -> bool:
121130 )
122131 return True
123132
def print(
    self,
    message: Any,
    log_level: int = logging.INFO,
    to_stdout: Optional[bool] = None,
) -> None:
    """
    Emit ``message`` through this instance's logger and, on request, stdout.

    :param message: Object to report; handed to the logger as the log message
        and, when echoing, to the builtin ``print``.
    :param log_level: ``logging`` level used for the log record
        (``logging.INFO`` unless overridden).
    :param to_stdout: When truthy, the message is also written to standard
        output.  ``None`` and ``False`` both mean "log only".
    """
    # The message is always logged, regardless of the stdout setting.
    self.log.log(log_level, message)

    if not to_stdout:
        return

    # Within a method body the bare name ``print`` is looked up in the
    # module/builtin scope, not on the class, so this is the builtin and not
    # a recursive call.
    print(message)
124143 def check_bases (self , seq : str , name : str ) -> bool :
125144 if not re .match (r"^[ATGCRYKMSWNBDHV]+$" , seq ):
126145 raise ValueError (f"Sequence { name } has invalid characters" )
@@ -316,6 +335,7 @@ def interpret(
316335 entry : Bio .SeqIO .SeqRecord ,
317336 unmatched : List [List [Bio .SeqIO .SeqRecord ]],
318337 threshold : Optional [int ] = None ,
338+ to_stdout : Optional [bool ] = None ,
319339 ) -> Optional [HLAResult ]:
320340 samp = entry .description
321341
@@ -382,8 +402,16 @@ def interpret(
382402
383403 matching_stds = self .get_matching_stds (seq , self .hla_stds )
384404 if len (matching_stds ) == 0 :
385- print (f"Sequence { samp } did not match any known alleles." )
386- print ("Please check the locus and the orientation." )
405+ self .print (
406+ f"Sequence { samp } did not match any known alleles." ,
407+ log_level = logging .WARN ,
408+ to_stdout = to_stdout ,
409+ )
410+ self .print (
411+ "Please check the locus and the orientation." ,
412+ log_level = logging .WARN ,
413+ to_stdout = to_stdout ,
414+ )
387415 return None
388416
389417 # Now, combine all the stds (pick up that can citizen!)
@@ -396,13 +424,24 @@ def interpret(
396424 for i , combos in all_combos_sorted :
397425 if i > threshold :
398426 if i == 0 :
399- print ("No matches found below specified threshold." )
400- print ("Please heck the locus, orientation, and/or increase" )
401- print ("number of mismatches." )
427+ self .print (
428+ "No matches found below specified threshold." ,
429+ log_level = logging .WARN ,
430+ to_stdout = to_stdout ,
431+ )
432+ self .print (
433+ "Please heck the locus, orientation, and/or increase" ,
434+ log_level = logging .WARN ,
435+ to_stdout = to_stdout ,
436+ )
437+ self .print (
438+ "number of mismatches." ,
439+ log_level = logging .WARN ,
440+ to_stdout = to_stdout ,
441+ )
402442 break
403443 for cons in combos :
404444 for pair in cons .discrete_allele_names :
405- # print(" - ".join(pair))
406445 misstrings = []
407446 _seq = [int (nuc ) for nuc in cons .standard .split ("-" )]
408447 for n in range (len (_seq )):
@@ -414,8 +453,16 @@ def interpret(
414453 else :
415454 dex = n + 1
416455 misstrings .append (f"{ dex } :{ base } ->{ correct_base } " )
417- # print(";".join(misstrings) + ",")
418- # print(f"{exon2},{intron},{exon3}")
456+ self .print (
457+ ";" .join (misstrings ) + "," ,
458+ log_level = logging .INFO ,
459+ to_stdout = to_stdout ,
460+ )
461+ self .print (
462+ f"{ exon2 } ,{ intron } ,{ exon3 } " ,
463+ log_level = logging .INFO ,
464+ to_stdout = to_stdout ,
465+ )
419466
420467 best_matches = all_combos_sorted [0 ][1 ]
421468 mismatch_count = all_combos_sorted [0 ][0 ]
@@ -488,24 +535,38 @@ def interpret(
488535 return HLAResult (result = row , num_pats = 1 , num_seqs = nseqs )
489536
def report_unmatched_sequences(
    self,
    unmatched: List[List[Bio.SeqIO.SeqRecord]],
    to_stdout: Optional[bool] = None,
) -> None:
    """
    Report every record that was left without its partner exon.

    :param unmatched: Two lists of records.  Entries of ``unmatched[0]`` are
        reported as having no matching exon3; entries of ``unmatched[1]`` as
        having no matching exon2.
    :param to_stdout: Forwarded to ``self.print``; when truthy each message
        is echoed to standard output in addition to being logged.
    """
    for exon in (2, 3):
        # exon 2 selects bucket 0, exon 3 selects bucket 1.
        leftovers = unmatched[exon % 2]
        for entry in leftovers:
            # The exon named in the message is the *other* one (3 for exon 2,
            # 2 for exon 3), i.e. the partner that was never found.
            message = f"No matching exon{3 - exon % 2} for {entry.description}"
            self.print(message, to_stdout=to_stdout)
496548
497549 def run (
498550 self ,
499551 letter : HLA_TYPES ,
500552 filename : str ,
501553 output_filename : str ,
502554 threshold : Optional [int ] = None ,
555+ to_stdout : Optional [bool ] = None ,
503556 ):
504557 rows = []
505558 npats = 0
506559 nseqs = 0
507560 time_start = datetime .now ()
508561 unmatched : List [List [Bio .SeqIO .SeqRecord ]] = [[], []]
562+ self .print (
563+ f"Run commencing { time_start .strftime (DATE_FORMAT )} . Allele definitions last updated { self .load_allele_definitions_last_modified_time ().strftime (DATE_FORMAT )} ." ,
564+ to_stdout = to_stdout ,
565+ )
566+ self .print (
567+ "ENUM,ALLELES_CLEAN,ALLELES,AMBIGUOUS,HOMOZYGOUS,MISMATCH_COUNT,MISMATCHES,EXON2,INTRON,EXON3" ,
568+ to_stdout = to_stdout ,
569+ )
509570 with open (filename , "r" , encoding = "utf-8" ) as f :
510571 fasta = Bio .SeqIO .parse (f , "fasta" )
511572 for i , entry in enumerate (fasta ):
@@ -519,10 +580,16 @@ def run(
519580 continue
520581 else :
521582 rows .append (result .result .get_result_as_str ())
583+ self .print (result .result .get_result_as_str (), to_stdout = to_stdout )
522584 npats += result .num_pats
523585 nseqs += result .num_seqs
524586
525- self .report_unmatched_sequences (unmatched )
587+ self .report_unmatched_sequences (unmatched , to_stdout = to_stdout )
588+ self .print (
589+ f"{ npats } patients, { nseqs } sequences processed." , to_stdout = to_stdout
590+ )
591+
592+ self .log .info (f"% patients, % sequences processed." , npats , nseqs )
526593
527594 with open (output_filename , "w" , encoding = "utf-8" ) as f :
528595 f .write (
@@ -564,8 +631,6 @@ def get_clean_alleles(self, all_alleles: List[List[str]]) -> str:
564631 [a [0 ].strip ().split (":" ), a [1 ].strip ().split (":" )] for a in all_alleles
565632 ]
566633
567- print (all_alleles )
568-
569634 clean_allele : List [str ] = []
570635 for n in [0 , 1 ]:
571636 for i in [4 , 3 , 2 , 1 ]:
@@ -576,7 +641,6 @@ def get_clean_alleles(self, all_alleles: List[List[str]]) -> str:
576641 break
577642
578643 clean_allele_str : str = " - " .join (clean_allele )
579- print (clean_allele_str )
580644 return clean_allele_str
581645
582646 def get_alleles (
@@ -660,17 +724,3 @@ def sort_allele(item: Tuple[str, int]):
660724 alleles .remove (a )
661725
662726 return ambig , alleles
663-
664-
665- if __name__ == "__main__" :
666- input_file = "tests/input/test.fasta"
667- output_file = "tests/output/test.csv"
668-
669- easyhla = EasyHLA ("A" )
670-
671- easyhla .run (
672- easyhla .letter ,
673- input_file ,
674- output_file ,
675- 0 ,
676- )
0 commit comments