Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/api/routers/map.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""""Provide mapping router"""
import logging
from pathlib import Path

from cool_seq_tool.schemas import AnnotationLayer
Expand Down Expand Up @@ -45,6 +46,8 @@
prefix="/api/v1", tags=["mappings"], responses={404: {"description": "Not found"}}
)

_logger = logging.getLogger(__name__)


@router.post(path="/map/{urn}", status_code=200, response_model=ScoresetMapping)
@with_mavedb_score_set
Expand All @@ -57,7 +60,7 @@ async def map_scoreset(urn: str, store_path: Path | None = None) -> JSONResponse
try:
metadata = get_scoreset_metadata(urn, store_path)
records = get_scoreset_records(metadata, True, store_path)
metadata = patch_target_sequence_type(metadata, records)
metadata = patch_target_sequence_type(metadata, records, force=False)
except ScoresetNotSupportedError as e:
return JSONResponse(
content=ScoresetMapping(
Expand Down
13 changes: 13 additions & 0 deletions src/api/server_main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
"""FastAPI server file"""
import logging

import uvicorn
from fastapi import FastAPI

from api.routers import map
from dcd_mapping import dcd_mapping_version

logging.basicConfig(
format="%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
force=True,
)
_logger = logging.getLogger(__name__)

app = FastAPI()

app.include_router(map.router)

msg = f"Starting DCD Mapping server v{dcd_mapping_version})"
_logger.info(msg)


# If the application is not already being run within a uvicorn server, start uvicorn here.
if __name__ == "__main__":
Expand Down
30 changes: 27 additions & 3 deletions src/dcd_mapping/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import subprocess
import tempfile
from collections.abc import Mapping
from pathlib import Path
from urllib.parse import urlparse

Expand All @@ -19,7 +20,7 @@
ScoresetNotSupportedError,
)
from dcd_mapping.lookup import get_chromosome_identifier, get_gene_location
from dcd_mapping.mavedb_data import LOCAL_STORE_PATH
from dcd_mapping.mavedb_data import LOCAL_STORE_PATH, patch_target_sequence_type
from dcd_mapping.resource_utils import http_download
from dcd_mapping.schemas import (
AlignmentResult,
Expand Down Expand Up @@ -441,7 +442,7 @@ def parse_cdot_mapping(cdot_mapping: dict, silent: bool) -> AlignmentResult:

def build_alignment_result(
metadata: ScoresetMetadata, silent: bool
) -> dict[str, AlignmentResult | None]:
) -> Mapping[str, AlignmentResult | None]:
# NOTE: Score set must contain all accession-based target genes or all sequence-based target genes
# This decision was made because it is most efficient to run BLAT all together, so the alignment function
# works on an entire score set rather than per target gene.
Expand All @@ -462,7 +463,30 @@ def build_alignment_result(
score_set_type = "sequence"

if score_set_type == "sequence":
alignment_result = align(metadata, silent)
try:
alignment_result = align(metadata, silent)
except AlignmentError as e:
failed_at_nucleotide_level = any(
target_gene.target_sequence_type == TargetSequenceType.DNA
for target_gene in metadata.target_genes.values()
)

if failed_at_nucleotide_level:
msg = f"BLAT alignment failed for {metadata.urn} at the nucleotide level. This alignment will be retried at the protein level."
_logger.warning(msg)
else:
raise AlignmentError from e

# So long as force=True, the content of the records dict is irrelevant.
try:
alignment_result = align(
patch_target_sequence_type(metadata, {}, force=True), silent
)
except AlignmentError as e2:
msg = f"BLAT alignment failed for {metadata.urn} at the protein level after failing at the nucleotide level."
_logger.error(msg)
raise AlignmentError(msg) from e2

else:
alignment_result = fetch_alignment(metadata, silent)

Expand Down
2 changes: 1 addition & 1 deletion src/dcd_mapping/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ async def map_scoreset_urn(
try:
metadata = get_scoreset_metadata(urn, store_path)
records = get_scoreset_records(metadata, silent, store_path)
metadata = patch_target_sequence_type(metadata, records)
metadata = patch_target_sequence_type(metadata, records, force=False)
except ScoresetNotSupportedError as e:
_emit_info(f"Score set not supported: {e}", silent, logging.ERROR)
final_output = write_scoreset_mapping_to_json(
Expand Down
36 changes: 28 additions & 8 deletions src/dcd_mapping/mavedb_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,21 +326,41 @@ def get_scoreset_records(


def patch_target_sequence_type(
metadata: ScoresetMetadata, records: dict
metadata: ScoresetMetadata, records: dict, force: bool = False
) -> ScoresetMetadata:
"""If target sequence type is DNA but no nucleotide variants are defined, treat the target as if
it were a protein level target.
This avoids BLAT errors in cases where the target sequence was codon-optimized
for a non-human organism
"""
patch_sequence_type = force or any(
target.target_sequence_type == TargetSequenceType.DNA
and not any(record.hgvs_nt for record in records.get(target_label, []))
for target_label, target in metadata.target_genes.items()
)

if not patch_sequence_type:
msg = f"Not patching target sequence type for {metadata.urn}. Either force=True (was {force}), or at least one target has nucleotide-level variants."
_logger.debug(msg)
return metadata

for target_label, target in metadata.target_genes.items():
if target.target_sequence_type == TargetSequenceType.DNA and not any(
record.hgvs_nt for record in records.get(target_label, [])
):
msg = f"Changing target sequence type for {metadata.urn} target {target_label} from DNA to protein because target only has protein-level variants"
_logger.info(msg)
target.target_sequence = _get_protein_sequence(target.target_sequence)
target.target_sequence_type = TargetSequenceType.PROTEIN
if not target.target_sequence:
msg = f"Cannot patch target sequence type for {metadata.urn} target {target_label} because no target sequence is available."
_logger.debug(msg)
continue

if target.target_gene_category != "protein_coding":
msg = f"Cannot patch target sequence type for {metadata.urn} target {target_label} because target gene category is {target.target_gene_category}, not protein_coding."
_logger.debug(msg)
continue

msg = f"Changing target sequence type for {metadata.urn} target {target_label} from DNA to protein. (force was {force})."
_logger.info(msg)

target.target_sequence = _get_protein_sequence(target.target_sequence)
target.target_sequence_type = TargetSequenceType.PROTEIN

return metadata


Expand Down
3 changes: 2 additions & 1 deletion src/dcd_mapping/transcripts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Select best reference sequence."""
import logging
import re
from collections.abc import Mapping

from Bio.Data.CodonTable import IUPACData
from Bio.Seq import Seq
Expand Down Expand Up @@ -345,7 +346,7 @@ async def select_transcript(
async def select_transcripts(
scoreset_metadata: ScoresetMetadata,
records: dict[str, list[ScoreRow]],
align_results: dict[str, AlignmentResult | None],
align_results: Mapping[str, AlignmentResult | None],
) -> dict[str, TxSelectResult | Exception | None]:
"""Select appropriate human reference sequence for each target in a score set.
:param scoreset_metadata: Metadata for score set from MaveDB API
Expand Down
1 change: 1 addition & 0 deletions src/dcd_mapping/vrs_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ def _construct_vrs_allele(
) -> Allele | Haplotype:
alleles: list[Allele] = []
for hgvs_string in hgvs_strings:
_logger.info("Processing HGVS string: %s", hgvs_string)
allele = translate_hgvs_to_vrs(hgvs_string)

if pre_map:
Expand Down