from __future__ import annotations
import typing as t
from contextlib import ContextDecorator
from multiprocessing import Process
from pathlib import Path
from pxblat.extc import ServerOption, UsageStats, pystartServer
from .basic import (
check_port_in_use,
files,
find_free_port,
server_query,
status_server,
stop_server,
wait_server_ready,
)
if t.TYPE_CHECKING:
from .status import Status
def _pystartServer(
hostName: str,
portName: str,
seqFiles: list[str],
options: ServerOption,
stats: UsageStats,
):
pystartServer(hostName, portName, len(seqFiles), seqFiles, options, stats)
[docs]
def create_server_option() -> ServerOption:
"""Creates a new ServerOption object with default values.
Returns:
ServerOption: A new ServerOption object with default values.
See Also:
:class:`.ServerOption`
"""
return ServerOption()
[docs]
class Server(ContextDecorator):
"""A context manager and decorator for managing a server process.
This class can be used as a context manager or decorator to manage a server process. It starts the server with the given
options, and can run it as a daemon process or block until it is ready.
Attributes:
host (str): The hostname or IP address to bind the server to.
port (int): The port number to bind the server to.
two_bit (Path | str): The path to the 2bit file or the URL of the 2bit file.
option (ServerOption): The options to use when starting the server.
daemon (bool, optional): Whether to run the server as a daemon process. Defaults to True.
use_others (bool, optional): Whether to allow other users to access the server. Defaults to False.
timeout (int, optional): The number of seconds to wait for the server to start. Defaults to 60.
block (bool, optional): Whether to block until the server is ready. Defaults to False.
Raises:
ValueError: If the given two_bit file or URL is invalid.
OSError: If there is an error starting the server process.
Order:
-10
"""
[docs]
def __init__(
self,
host: str,
port: int,
two_bit: Path | str,
*,
can_stop: bool = True,
mask: bool = False,
tile_size: int = 11,
step_size: int = 11,
max_aa_size: int = 8000,
max_dna_hits: int = 100,
max_gap: int = 2,
max_nt_size: int = 40000,
max_trans_hits: int = 200,
min_match: int = 2,
rep_match: int = 0,
seq_log: bool = False,
ip_log: bool = False,
debug_log: bool = False,
trans: bool = False,
syslog: bool = False,
no_simp_rep_mask: bool = False,
log: str | Path | None = None,
log_facility: str | None = None,
per_seq_max: str | Path | None = None,
index_file: str | Path | None = None,
daemon=True,
use_others: bool = False,
timeout: int = 60,
block: bool = False,
) -> None:
"""Initializes a gfServer object with the given parameters.
Args:
host (str): The hostname or IP address to bind the server to.
port (int): The port number to bind the server to.
two_bit (Path | str): The path to the 2bit file or the URL of the 2bit file.
can_stop (bool, optional): Whether to allow the server to be stopped. Defaults to True.
mask (bool, optional): Whether to use masking from the 2bit file. Defaults to False.
tile_size (int, optional): The size of n-mers to index. Defaults to 11 for nucleotides, 4 for proteins (or translated nucleotides).
step_size (int, optional): The spacing between tiles. Defaults to tileSize.
max_aa_size (int, optional): The maximum size of protein or translated DNA queries. Defaults to 8000.
max_dna_hits (int, optional): The maximum number of hits for a DNA query that are sent from the server. Defaults to 100.
max_gap (int, optional): The number of insertions or deletions allowed between n-mers. Defaults to 2 for nucleotides, 0 for proteins.
max_nt_size (int, optional): The maximum size of untranslated DNA query sequence. Defaults to 40000.
max_trans_hits (int, optional): The maximum number of hits for a translated query that are sent from the server. Defaults to 200.
min_match (int, optional): The number of n-mer matches that trigger detailed alignment. Defaults to 2 for nucleotides, 3 for proteins.
rep_match (int, optional): The number of occurrences of a tile (n-mer) that triggers repeat masking the tile. Defaults to 0.
seq_log (bool, optional): Whether to include sequences in the log file (not logged with syslog). Defaults to False.
ip_log (bool, optional): Whether to include user's IP in the log file (not logged with syslog). Defaults to False.
debug_log (bool, optional): Whether to include debugging info in the log file. Defaults to False.
trans (bool, optional): Whether to translate database to protein in 6 frames, and it is best to run this on RepeatMasked data. Defaults to False.
syslog (bool, optional): Whether to log to syslog. Defaults to False.
no_simp_rep_mask (bool, optional): Whether to suppress simple repeat masking. Defaults to False.
log (str | Path | None, optional): The path to the log file that records server requests. Defaults to None.
log_facility (str | None, optional): The syslog facility to log to. Defaults to None.
per_seq_max (str | Path | None, optional): The path to a file that contains one seq filename (possibly with ':seq' suffix) per line. Defaults to None.
index_file (str | Path | None, optional): The path to the index file created by `gfServer index`.
Saving index can speed up `gfServer` startup by two orders of magnitude. Defaults to None.
daemon (bool, optional): Whether to run the server as a daemon process. Defaults to True.
use_others (bool, optional): Whether to allow other users to access the server. Defaults to False.
timeout (int, optional): The number of seconds to wait for the server to start. Defaults to 60.
block (bool, optional): Whether to block until the server is ready. Defaults to False.
Raises:
ValueError: If the given two_bit file or URL is invalid.
OSError: If there is an error starting the server process.
Returns:
None
Examples:
Create a server object with options.
>>> from pxblat import Server
>>> server = Server("localhost", 65000, "tests/data/test_ref.2bit", can_stop=True, step_size=5)
>>> server.start()
>>> server.wait_ready()
>>> server.stop()
>>> server.can_stop
True
>>> server.step_size = 10
>>> server.step_size
10
"""
self._host = host
self._port = port
self.two_bit = two_bit
log = "" if log is None else str(log)
log_facility = "" if log_facility is None else str(log_facility)
per_seq_max = "" if per_seq_max is None else str(per_seq_max)
index_file = "" if index_file is None else str(index_file)
self.option = (
create_server_option()
.withCanStop(can_stop)
.withLog(log)
.withLogFacility(log_facility)
.withMask(mask)
.withMaxAaSize(max_aa_size)
.withMaxDnaHits(max_dna_hits)
.withMaxGap(max_gap)
.withMaxNtSize(max_nt_size)
.withMaxTransHits(max_trans_hits)
.withMinMatch(min_match)
.withRepMatch(rep_match)
.withSeqLog(seq_log)
.withIpLog(ip_log)
.withDebugLog(debug_log)
.withTileSize(tile_size)
.withStepSize(step_size)
.withTrans(trans)
.withSyslog(syslog)
.withPerSeqMax(per_seq_max)
.withNoSimpRepMask(no_simp_rep_mask)
.withIndexFile(index_file)
)
self.stat = UsageStats()
self.use_others = use_others
self.timeout = timeout
self.daemon = daemon
self._block = block
self._is_ready = False
self._is_open = True
self._process = None
@property
def host(self):
"""The hostname or IP address to bind the server to."""
return self._host
@host.setter
def host(self, value: str):
"""Sets the hostname or IP address to bind the server to."""
self._host = value
@property
def port(self) -> int:
"""The port number to bind the server to."""
return self._port
@port.setter
def port(self, value: int):
"""Sets the port number to bind the server to."""
self._port = value
def _start_b(self):
"""Start server in blocking mode."""
two_bit_file = self.two_bit if isinstance(self.two_bit, str) else self.two_bit.as_posix()
try:
if check_port_in_use(self.host, self.port):
if self.use_others:
self._is_open = False
# WARN: Use server that is already open. However, the server may be not opened by gfServer <05-16-23>
# Hence, the `wait_server_ready` may be timeout.
else:
self._is_open = True
new_port = find_free_port(self.host, start=self.port + 1)
self.port = new_port
pystartServer(
self.host,
str(self.port),
1,
[two_bit_file],
self.option,
self.stat,
)
else:
pystartServer(
self.host,
str(self.port),
1,
[two_bit_file],
self.option,
self.stat,
)
except Exception as e:
raise e
def _start_nb(self):
two_bit_file = self.two_bit if isinstance(self.two_bit, str) else self.two_bit.as_posix()
try:
if check_port_in_use(self.host, self.port):
if self.use_others:
self._is_open = False
else:
self._is_open = True
new_port = find_free_port(self._host, start=self.port + 1)
self.port = new_port
host = self.host
port = self.port
self._process = Process(
target=_pystartServer,
args=(
host,
str(port),
[two_bit_file],
self.option,
self.stat,
),
daemon=self.daemon,
)
else:
self._is_open = True
self._process = Process(
target=_pystartServer,
args=(
self.host,
str(self.port),
[two_bit_file],
self.option,
self.stat,
),
daemon=self.daemon,
)
except Exception as e:
raise e
else:
if self._process is not None:
self._process.start()
def _check(self):
if not Path(self.two_bit).exists():
msg = f"Invalid two_bit file: {self.two_bit}"
raise FileNotFoundError(msg)
[docs]
def start(self):
"""Starts the gfServer instance in either blocking or non-blocking mode.
If the server is set to non-blocking mode, it will start the server in a separate process.
If the server is set to blocking mode, it will start the server in the current process.
Raises:
ValueError: If the given two_bit file or URL is invalid.
"""
self.option.build()
if not self._block:
self._start_nb()
else:
self._start_b()
[docs]
def stop(self):
"""Stops the gfServer instance if it is running.
This method sends a stop signal to the server process, causing it to terminate gracefully.
See Also:
:func:`stop_server` is a free function to stop a server.
"""
if self._is_open:
stop_server(self.host, self.port)
if self._process is not None:
self._process.terminate()
self._is_open = False
self._is_ready = False
[docs]
def status(self, *, instance=False) -> dict[str, str] | Status:
"""Retrieves the status of the gfServer instance.
Args:
instance (bool, optional): If True, returns a Status object. If False, returns a dictionary with status information. Defaults to False.
Returns:
t.Union[t.Dict[str, str], Status]: The status of the gfServer instance, either as a dictionary or a Status object.
See Also:
:func:`status_server` is a free function to query server status.
"""
return status_server(self.host, self.port, self.option, instance=instance)
[docs]
def files(self) -> list[str]:
"""Retrieves the list of files served by the gfServer instance.
Returns:
list[str]: A list of file names served by the gfServer instance.
See Also:
:func:`files` is a free function to query file status for server.
"""
return files(self.host, self.port)
[docs]
def query(self, intype: str, faName: str, *, isComplex: bool, isProt: bool) -> str:
"""Queries the gfServer instance with the given parameters.
Args:
intype (str): The type of input sequence. Must be one of "dna", "rna", or "protein".
faName (str): The name of the input sequence.
isComplex (bool): Whether the input sequence is complex.
isProt (bool): Whether the input sequence is a protein sequence.
Returns:
str: The result of the query as a string.
"""
return server_query(
intype,
self.host,
self.port,
faName,
isComplex=isComplex,
isProt=isProt,
)
[docs]
def is_ready(self) -> bool:
"""Returns True if the server is ready to accept queries, False otherwise.
Returns:
bool: True if the server is ready to accept queries, False otherwise.
"""
return self._is_ready
[docs]
def is_open(self) -> bool:
"""Returns True if the server is open, False otherwise.
Returns:
bool: True if the server is open, False otherwise.
"""
return self._is_open
[docs]
def wait_ready(self, *, restart: bool = False):
"""Wait server ready in block mode.
Args:
timeout: Timeout for wait server ready.
restart: If timeout, restart server and wait again.
Raises:
RuntimeError: If server is not opened by gfServer or the server takes longer time to be ready, the `wait_server_ready` may be timeout.
If `restart` is True, the server will be restarted and wait again.
"""
if not self._is_ready:
try:
wait_server_ready(self.host, self.port, self.timeout, self.option)
except RuntimeError as e:
if restart and self.use_others:
self.use_others = False
self.start()
self.wait_ready(restart=restart)
else:
msg = f"Timeout for Waiting for {self.host} {self.port} server ready due to server is not opened by gfServer or need longer time to wait"
raise RuntimeError(
msg,
) from e
else:
self._is_ready = True
[docs]
def __str__(self) -> str:
"""Return server option as a string."""
return f"Server({self.host}, {self.port}, ready: {self.is_ready()} open: {self.is_open()}\n{self.option})"
__repr__ = __str__
[docs]
def __enter__(self):
"""Starts the gfServer instance in blocking mode when used as a context manager."""
self.start()
return self
[docs]
def __exit__(self, *exc):
"""Stops the gfServer."""
self.stop()
# fmt: off
@property
def can_stop(self) -> bool: return self.option.canStop
@can_stop.setter
def can_stop(self, value: bool): self.option.canStop = value
@property
def log(self) -> str: return self.option.log
@log.setter
def log(self, value: str): self.option.log = value
@property
def log_facility(self) -> str: return self.option.logFacility
@log_facility.setter
def log_facility(self, value: str): self.option.logFacility = value
@property
def mask(self) -> bool: return self.option.mask
@mask.setter
def mask(self, value: bool): self.option.mask = value
@property
def max_aa_size(self) -> int: return self.option.maxAaSize
@max_aa_size.setter
def max_aa_size(self, value: int): self.option.maxAaSize = value
@property
def max_dna_hits(self) -> int: return self.option.maxDnaHits
@max_dna_hits.setter
def max_dna_hits(self, value: int): self.option.maxDnaHits = value
@property
def max_gap(self) -> int: return self.option.maxGap
@max_gap.setter
def max_gap(self, value: int): self.option.maxGap = value
@property
def max_nt_size(self) -> int: return self.option.maxNtSize
@max_nt_size.setter
def max_nt_size(self, value: int): self.option.maxNtSize = value
@property
def max_trans_hits(self) -> int: return self.option.maxTransHits
@max_trans_hits.setter
def max_trans_hits(self, value: int): self.option.maxTransHits = value
@property
def min_match(self) -> int: return self.option.minMatch
@min_match.setter
def min_match(self, value: int): self.option.minMatch = value
@property
def rep_match(self) -> int: return self.option.repMatch
@rep_match.setter
def rep_match(self, value: int): self.option.repMatch = value
@property
def seq_log(self) -> bool: return self.option.seqLog
@seq_log.setter
def seq_log(self, value: bool): self.option.seqLog = value
@property
def ip_log(self) -> bool: return self.option.ipLog
@ip_log.setter
def ip_log(self, value: bool): self.option.ipLog = value
@property
def debug_log(self) -> bool: return self.option.debugLog
@debug_log.setter
def debug_log(self, value: bool): self.option.debugLog = value
@property
def trans(self) -> bool: return self.option.trans
@trans.setter
def trans(self, value: bool): self.option.trans = value
@property
def syslog(self) -> bool: return self.option.syslog
@syslog.setter
def syslog(self, value: bool): self.option.syslog= value
@property
def no_simp_rep_mask(self) -> bool: return self.option.noSimpRepMask
@no_simp_rep_mask.setter
def no_simp_rep_mask(self, value: bool): self.option.noSimpRepMask= value
@property
def per_seq_max(self) -> str: return self.option.perSeqMax
@per_seq_max.setter
def per_seq_max(self, value: str): self.option.perSeqMax = value
@property
def index_file(self) -> str: return self.option.indexFile
@index_file.setter
def index_file(self, value: str): self.option.indexFile= value
# fmt: on