from __future__ import annotations
import contextlib
import tempfile
from pathlib import Path
from threading import Thread
from typing import TYPE_CHECKING, Union
from pxblat.extc import ClientOption, pygfClient
from pxblat.parser import read
from .basic import wait_server_ready
if TYPE_CHECKING:
from .server import ServerOption
INSEQ = Union[str, Path]
INSEQS = Union[list[INSEQ], list[str], list[Path]]
[docs]
def copy_client_option(option: ClientOption) -> ClientOption:
"""Copies the ClientOption object."""
new_option = ClientOption()
new_option.hostName = option.hostName
new_option.portName = option.portName
new_option.tType = option.tType
new_option.qType = option.qType
new_option.dots = option.dots
new_option.nohead = option.nohead
new_option.minScore = option.minScore
new_option.minIdentity = option.minIdentity
new_option.outputFormat = option.outputFormat
new_option.maxIntron = option.maxIntron
new_option.genome = option.genome
new_option.genomeDataDir = option.genomeDataDir
new_option.isDynamic = option.isDynamic
new_option.SeqDir = option.SeqDir
new_option.inName = option.inName
new_option.outName = option.outName
new_option.inSeq = option.inSeq
return new_option
[docs]
def create_client_option():
"""Creates a new ClientOption object with default values.
Return:
ClientOption object
See Also:
:class:`.ClientOption`
Examples:
>>> option = create_client_option().build()
>>> option
ClientOption(hostName=, portName=, tType=dna, qType=dna, dots=0, nohead=false, minScore=30, minIdentity=90,
outputFormat=psl, maxIntron=750000, genome=, genomeDataDir=, isDynamic=false,
tSeqDir=, inName=, outName=)
>>> option = create_client_option().withPort("66666").build()
>>> option
ClientOption(hostName=, portName=66666, tType=dna, qType=dna, dots=0, nohead=false, minScore=30, minIdentity=90,
outputFormat=psl, maxIntron=750000, genome=, genomeDataDir=, isDynamic=false,
tSeqDir=, inName=, outName=)
"""
return ClientOption()
def _resolve_host_port(
client_option: ClientOption,
host: str | None,
port: int | None,
):
"""Resolves the host and port for the client option.
Args:
client_option: ClientOption
host: Optional[str]
port: Optional[int]
"""
if host is not None:
client_option.hostName = host
if port is not None:
client_option.portName = str(port)
if not client_option.hostName and not client_option.portName:
msg = "host and port are both empty"
raise ValueError(msg)
def query_server_by_file(
option: ClientOption,
host: str | None = None,
port: int | None = None,
*,
parse: bool = True,
):
"""Sends a query to the server and returns the result.
Args:
option: ClientOption
host: Optional[str]
port: Optional[int]
seqname: Optional[str]
parse: bool
Returns:
str or bytes: The result of the query.
"""
_resolve_host_port(option, host, port)
ret = pygfClient(option)
try:
ret_decode = ret.decode().rsplit(",\n", 1)[0] # type: ignore
except UnicodeDecodeError:
ret_decode = ret.decode("latin-1").rsplit(",\n", 1)[0] # type: ignore
if parse and ret_decode:
try:
ret = read(ret_decode, "psl")
except ValueError as e:
if "No query results" in str(e):
return None
else:
return ret
return ret_decode
[docs]
def query_server(
option: ClientOption,
host: str | None = None,
port: int | None = None,
seqname: str | None = None,
*,
parse: bool = True,
):
"""Sends a query to the server and returns the result.
Args:
option: ClientOption
host: Optional[str]
port: Optional[int]
seqname: Optional[str]
parse: bool
Returns:
str or bytes: The result of the query.
"""
_resolve_host_port(option, host, port)
fafile = None
if not option.inName and not option.inSeq:
msg = "inName and inSeq are both empty"
raise ValueError(msg)
seqid = None
temp_file_path = None
prev_in_name = option.inName # Store original to restore later
try:
if option.inSeq:
with tempfile.NamedTemporaryFile(mode="w", delete=False) as fafile:
temp_file_path = fafile.name
seqname = fafile.name if seqname is None else seqname
seqid = f"{option.inSeq[:5]}_{len(option.inSeq)}"
fafile.write(f">{seqid}\n")
fafile.write(option.inSeq)
option.inName = temp_file_path
ret = pygfClient(option)
try:
ret_decode = ret.decode().rsplit(",\n", 1)[0] # type: ignore
except UnicodeDecodeError:
ret_decode = ret.decode("latin-1").rsplit(",\n", 1)[0] # type: ignore
if not parse:
return ret_decode
try:
res = read(ret_decode, "psl", seqid=seqid)
except ValueError as e:
if "No query results" in str(e):
return None
raise
else:
return res
finally:
if temp_file_path is not None:
with contextlib.suppress(FileNotFoundError):
Path(temp_file_path).unlink()
# Restore original inName to avoid side effects
option.inName = prev_in_name
[docs]
class ClientThread(Thread):
"""A class for managing client connections to a server in a separate thread.
This class can be used to query a gfServer in a separate thread, and can optionally wait until the server is ready before
sending a query. It can also parse the result of the query.
Attributes:
option (ClientOption): Client options for the connection.
host (str, optional): The hostname or IP address of the server. Defaults to None.
port (int, optional): The port number of the server. Defaults to None.
wait_ready (bool, optional): Whether to wait until the server is ready before sending a query. Defaults to False.
wait_timeout (int, optional): The number of seconds to wait for the server to be ready. Defaults to 60.
server_option (ServerOption, optional): The server options to use if a server is not provided. Defaults to None.
seqname (str, optional): The sequence name to use for the query. Defaults to None.
parse (bool, optional): Whether to parse the result of the query. Defaults to True.
daemon (bool, optional): Whether to run the client as a daemon process. Defaults to True.
result: The result of the query, or None if the query has not yet been sent or the result has not yet been received.
Order:
-10
"""
[docs]
def __init__(
self,
option: ClientOption,
host: str | None = None,
port: int | None = None,
*,
wait_timeout: int = 60,
server_option: ServerOption | None = None,
seqname: str | None = None,
wait_ready: bool = False,
parse: bool = True,
daemon: bool = True,
) -> None:
"""A class for querying a gfServer using a separate thread.
Args:
option: ClientOption
host: Optional[str]
port: Optional[int]
wait_ready: bool
wait_timeout: int
server_option: Optional[ServerOption]
seqname: Optional[str]
parse: bool
daemon: bool
Attributes:
result: The result of the query.
"""
super().__init__(daemon=daemon)
self.option = option
self._host = host
self._port = port
self._resolve_host_port()
self._wait_ready = wait_ready
self._wait_timeout = wait_timeout
self._server_option = server_option
self._seqname = seqname
self._parse = parse
self.result = None
[docs]
def run(self):
"""Runs the query in a separate thread."""
if self._wait_ready:
wait_server_ready(
self.host,
self.port,
timeout=self._wait_timeout,
server_option=self._server_option,
)
ret = query_server(self.option, seqname=self._seqname, parse=self._parse)
self.result = ret
[docs]
def get(self):
"""Sends a query to the server and returns the result."""
self.join()
return self.result
@property
def host(self):
"""The hostname or IP address of the server."""
if self._host is None:
return self.option.hostName
return self._host
@host.setter
def host(self, value: str):
"""Sets the hostname or IP address of the server."""
self._host = value
@property
def port(self):
"""The port number of the server."""
if self._port is None:
return int(self.option.portName)
return self._port
@port.setter
def port(self, value: int):
"""Sets the port number of the server."""
self._port = value
[docs]
@classmethod
def create_option(cls):
"""Creates a new ClientOption object with default values.
Return:
ClientOption object
"""
return create_client_option()
def _resolve_host_port(self):
_resolve_host_port(self.option, self._host, self._port)
[docs]
class Client:
"""A class for managing client connections to a server in a separate thread.
This class can be used to query a gfServer in a separate thread, and can optionally wait until the server is ready before
sending a query. It can also parse the result of the query.
Attributes:
option (ClientOption): Client options for the connection.
host (str, optional): The hostname or IP address of the server. Defaults to None.
port (int, optional): The port number of the server. Defaults to None.
wait_ready (bool, optional): Whether to wait until the server is ready before sending a query. Defaults to False.
wait_timeout (int, optional): The number of seconds to wait for the server to be ready. Defaults to 60.
server_option (ServerOption, optional): The server options to use if a server is not provided. Defaults to None.
seqname (str, optional): The sequence name to use for the query. Defaults to None.
parse (bool, optional): Whether to parse the result of the query. Defaults to True.
daemon (bool, optional): Whether to run the client as a daemon process. Defaults to True.
result: The result of the query, or None if the query has not yet been sent or the result has not yet been received.
Order:
-10
"""
[docs]
def __init__(
self,
host: str,
port: int,
seq_dir: str | Path,
*,
ttype: str = "dna",
qtype: str = "dna",
dots: int = 0,
nohead: bool = False,
min_score: int = 30,
min_identity: float = 90.0,
output_format: str = "psl",
max_intron: int = 750000,
is_dynamic: bool = False,
genome: str | None = None,
genome_data_dir: str | None = None,
server_option: ServerOption | None = None,
wait_ready: bool = False,
wait_timeout: int = 60,
parse: bool = True,
) -> None:
"""A class for querying a gfServer using a separate thread.
Args:
host (str): The hostname or IP address of the server.
port (int): The port number of the server.
seq_dir (Union[str, Path]): The directory where sequence data is stored.
ttype (str, optional): Database type. One of 'dna', 'prot', 'dnax'. Default is 'dna'.
qtype (str, optional): Query type. One of 'dna', 'rna', 'prot', 'dnax', 'rnax'. Default is 'dna'.
dots (int, optional): Output a dot every N query sequences. Default is 0.
nohead (bool, optional): If True, suppresses 5-line psl header. Default is False.
min_score (int, optional): Sets minimum score. Default is 30.
min_identity (float, optional): Sets minimum sequence identity (in percent). Default is 90.
output_format (str, optional): Controls output file format. One of 'psl', 'pslx', 'axt', 'maf', 'sim4', 'wublast', 'blast', 'blast8', 'blast9'. Default is 'psl'.
max_intron (int, optional): Sets maximum intron size. Default is 750000.
is_dynamic (bool, optional): If True, the client is expected to interact with a dynamic gfServer. Default is False.
genome (Optional[str], optional): The genome name when using a dynamic gfServer. Defaults to None.
genome_data_dir (Optional[str], optional): The root directory containing the genome data files for a dynamic gfServer. Defaults to None.
server_option (Optional[ServerOption], optional): The server options to use if a server is not provided. Defaults to None.
wait_ready (bool, optional): If True, wait until the server is ready before sending a query. Default is False.
wait_timeout (int, optional): The number of seconds to wait for the server to be ready. Default is 60.
parse (bool, optional): If True, parse the result of the query. Default is True.
Raises:
ValueError: If any of the input values are invalid.
Examples:
>>> from pxblat import Client
>>> host = "localhost"
>>> port = 65000
>>> seq_dir = "."
>>> two_bit = "./test_ref.2bit"
>>> client = Client(
... host=host,
... port=port,
... seq_dir=seq_dir,
... min_score=20,
... min_identity=90,
... )
"""
self._basic_option = (
ClientOption()
.withHost(host)
.withPort(str(port))
.withMinScore(min_score)
.withMinIdentity(min_identity)
.withTType(ttype)
.withQType(qtype)
.withDots(dots)
.withNohead(nohead)
.withMaxIntron(max_intron)
.withOutputFormat(output_format)
.withIsDynamic(is_dynamic)
)
if genome is not None:
self._basic_option.withGenome(genome)
if genome_data_dir is not None:
self._basic_option.withGenomeDataDir(genome_data_dir)
self._basic_option.withSeqDir(str(seq_dir))
self._wait_ready = wait_ready
self._wait_timeout = wait_timeout
self._server_option = server_option
self._parse = parse
# fmt: off
@property
def seq_dir(self):
"""The directory containing the sequence files."""
return self._basic_option.SeqDir
@seq_dir.setter
def seq_dir(self, value: str | Path): self._basic_option.withSeqDir(str(value))
@property
def ttype(self):
"""The type of the target sequence."""
return self._basic_option.tType
@ttype.setter
def ttype(self, value: str): self._basic_option.withTType(value)
@property
def qtype(self):
"""The type of the query sequence."""
return self._basic_option.qType
@qtype.setter
def qtype(self, value: str): self._basic_option.withQType(value)
@property
def min_score(self):
"""The minimum score for the alignment."""
return int(self._basic_option.minScore)
@min_score.setter
def min_score(self, value: int): self._basic_option.withMinScore(value)
@property
def min_identity(self):
"""The minimum identity for the alignment."""
return self._basic_option.minIdentity
@min_identity.setter
def min_identity(self, value: float): self._basic_option.withMinIdentity(value)
@property
def host(self):
"""The hostname or IP address of the server."""
return self._basic_option.hostName
@host.setter
def host(self, value: str): self._basic_option.withHost(value)
@property
def port(self):
"""The port number of the server."""
return int(self._basic_option.portName)
@port.setter
def port(self, value: int): self._basic_option.withPort(str(value))
@property
def output_format(self):
"""The output format of the alignment."""
return self._basic_option.outputFormat
@output_format.setter
def output_format(self, value: str): self._basic_option.withOutputFormat(value)
@property
def max_intron(self):
"""The maximum intron size for the alignment."""
return self._basic_option.maxIntron
@max_intron.setter
def max_intron(self, value: int): self._basic_option.withMaxIntron(value)
@property
def is_dynamic(self):
"""Whether the server is dynamic."""
return self._basic_option.isDynamic
@is_dynamic.setter
def is_dynamic(self, value: bool): self._basic_option.withIsDynamic(value)
@property
def genome(self):
"""The genome name of the server."""
return self._basic_option.genome
@genome.setter
def genome(self, value: str): self._basic_option.withGenome(value)
@property
def genome_data_dir(self):
"""The genome data directory of the server."""
return self._basic_option.genomeDataDir
@genome_data_dir.setter
def genome_data_dir(self, value: str): self._basic_option.withGenomeDataDir(value)
# fmt: on
@staticmethod
def _verify_input(in_seqs: list[str | Path] | list[str] | list[Path]):
for item in in_seqs:
if isinstance(item, Path) and not item.exists():
msg = f"File {item} does not exist"
raise FileNotFoundError(msg)
if isinstance(item, str) and ("." in item or "/" in item):
new_item = Path(item)
if not new_item.exists():
msg = f"File {item} does not exist"
raise FileNotFoundError(msg)
yield new_item
yield item
def _query(self, in_seq: str | Path):
basic_option = copy_client_option(self._basic_option)
if isinstance(in_seq, Path):
basic_option.withInName(str(in_seq)).withInSeq("").build()
else:
basic_option.withInSeq(str(in_seq)).withInName("").build()
return query_server(basic_option, parse=self._parse)
[docs]
def query(self, in_seqs: INSEQS | list[str] | list[Path] | INSEQ):
"""Query the server with the specified sequences.
Args:
in_seqs: The sequences to query.
Returns:
The query results: `Bio.SearchIO.QueryResult`
Examples:
>>> from pxblat import Client, Server
>>> host = "localhost"
>>> port = 65000
>>> seq_dir = "."
>>> two_bit = "./test_ref.2bit"
>>> client = Client(
... host=host,
... port=port,
... seq_dir=seq_dir,
... min_score=20,
... min_identity=90,
... )
>>> with Server(host, port, two_bit, can_stop=True, step_size=5) as server:
... # work() assume work() is your own function that takes time to prepare something
... server.wait_ready()
... result1 = client.query("ATCG")
... result2 = client.query("AtcG")
... result3 = client.query("test_case1.fa")
... result4 = client.query(["ATCG", "ATCG"])
... result5 = client.query(["test_case1.fa"])
... result6 = client.query(["cgTA", "test_case1.fa"])
... print(result3[0]) # print result
"""
if isinstance(in_seqs, (str, Path)):
in_seqs = [in_seqs]
in_seqs = list(self._verify_input(in_seqs))
if self._wait_ready:
wait_server_ready(
self.host,
self.port,
timeout=self._wait_timeout,
server_option=self._server_option,
)
results = []
for in_seq in in_seqs:
results.append(self._query(in_seq))
return results