#!/usr/bin/env python
"""REST Translation server."""
from __future__ import print_function
import codecs
import sys
import os
import time
import json
import threading
import re
import traceback
import torch
import onmt.opts
from onmt.utils.logging import init_logger
from onmt.utils.misc import set_random_seed
from onmt.utils.parse import ArgumentParser
from onmt.translate.translator import build_translator
def critical(func):
"""Decorator for critical section (mutually exclusive code)"""
def wrapper(server_model, *args, **kwargs):
if sys.version_info[0] == 3:
if not server_model.running_lock.acquire(True, 120):
raise ServerModelError("Model %d running lock timeout"
% server_model.model_id)
else:
# semaphore doesn't have a timeout arg in Python 2.7
server_model.running_lock.acquire(True)
        try:
            o = func(server_model, *args, **kwargs)
        finally:
            # release the lock whether the call succeeded or raised
            server_model.running_lock.release()
        return o
return wrapper
class Timer:
def __init__(self, start=False):
self.stime = -1
self.prev = -1
self.times = {}
if start:
self.start()
def start(self):
self.stime = time.time()
self.prev = self.stime
self.times = {}
def tick(self, name=None, tot=False):
t = time.time()
if not tot:
elapsed = t - self.prev
else:
elapsed = t - self.stime
self.prev = t
if name is not None:
self.times[name] = elapsed
return elapsed
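# Illustrative Timer usage (a sketch mirroring how load() and run() use it;
# the stage names are examples, not part of the API):
#
#     timer = Timer(start=True)
#     ...                                 # e.g. load a checkpoint
#     timer.tick(name="model_loading")    # seconds since the previous tick
#     ...                                 # e.g. translate a batch
#     timer.tick(name="translation")
#     total = timer.tick(tot=True)        # seconds since start()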
class ServerModelError(Exception):
pass
class TranslationServer(object):
def __init__(self):
self.models = {}
self.next_id = 0
    def start(self, config_file):
"""Read the config file and pre-/load the models."""
self.config_file = config_file
with open(self.config_file) as f:
self.confs = json.load(f)
self.models_root = self.confs.get('models_root', './available_models')
for i, conf in enumerate(self.confs["models"]):
if "models" not in conf:
if "model" in conf:
# backwards compatibility for confs
conf["models"] = [conf["model"]]
else:
raise ValueError("""Incorrect config file: missing 'models'
parameter for model #%d""" % i)
kwargs = {'timeout': conf.get('timeout', None),
'load': conf.get('load', None),
'tokenizer_opt': conf.get('tokenizer', None),
'on_timeout': conf.get('on_timeout', None),
'model_root': conf.get('model_root', self.models_root)
}
kwargs = {k: v for (k, v) in kwargs.items() if v is not None}
model_id = conf.get("id", None)
opt = conf["opt"]
opt["models"] = conf["models"]
self.preload_model(opt, model_id=model_id, **kwargs)
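    # Illustrative configuration consumed by start(); the paths, id and
    # option values below are assumptions, not defaults:
    #
    #     {
    #         "models_root": "./available_models",
    #         "models": [
    #             {
    #                 "id": 100,
    #                 "models": ["wmt14.en-de.pt"],
    #                 "timeout": 600,
    #                 "on_timeout": "to_cpu",
    #                 "load": true,
    #                 "opt": {"gpu": 0, "beam_size": 5}
    #             }
    #         ]
    #     }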
    def clone_model(self, model_id, opt, timeout=-1):
"""Clone a model `model_id`.
Different options may be passed. If `opt` is None, it will use the
same set of options
"""
if model_id in self.models:
if opt is None:
opt = self.models[model_id].user_opt
opt["models"] = self.models[model_id].opt.models
return self.load_model(opt, timeout)
else:
raise ServerModelError("No such model '%s'" % str(model_id))
    def load_model(self, opt, model_id=None, **model_kwargs):
"""Load a model given a set of options
"""
model_id = self.preload_model(opt, model_id=model_id, **model_kwargs)
load_time = self.models[model_id].load_time
return model_id, load_time
    def preload_model(self, opt, model_id=None, **model_kwargs):
"""Preloading the model: updating internal datastructure
It will effectively load the model if `load` is set
"""
if model_id is not None:
            if model_id in self.models:
raise ValueError("Model ID %d already exists" % model_id)
else:
model_id = self.next_id
            while model_id in self.models:
model_id += 1
self.next_id = model_id + 1
print("Pre-loading model %d" % model_id)
model = ServerModel(opt, model_id, **model_kwargs)
self.models[model_id] = model
return model_id
    def run(self, inputs):
"""Translate `inputs`
We keep the same format as the Lua version i.e.
``[{"id": model_id, "src": "sequence to translate"},{ ...}]``
We use inputs[0]["id"] as the model id
"""
model_id = inputs[0].get("id", 0)
if model_id in self.models and self.models[model_id] is not None:
return self.models[model_id].run(inputs)
else:
print("Error No such model '%s'" % str(model_id))
raise ServerModelError("No such model '%s'" % str(model_id))
    def unload_model(self, model_id):
"""Manually unload a model.
It will free the memory and cancel the timer
"""
if model_id in self.models and self.models[model_id] is not None:
self.models[model_id].unload()
else:
raise ServerModelError("No such model '%s'" % str(model_id))
    def list_models(self):
"""Return the list of available models
"""
        return [model.to_dict() for model in self.models.values()]
class ServerModel(object):
"""Wrap a model with server functionality.
Args:
opt (dict): Options for the Translator
model_id (int): Model ID
tokenizer_opt (dict): Options for the tokenizer or None
load (bool): whether to load the model during :func:`__init__()`
        timeout (int): Seconds before running :func:`do_timeout()`;
            negative values mean no timeout.
on_timeout (str): Options are ["to_cpu", "unload"]. Set what to do on
timeout (see :func:`do_timeout()`.)
        model_root (str): Path to the model directory;
            it must contain the model and tokenizer files.
"""
def __init__(self, opt, model_id, tokenizer_opt=None, load=False,
timeout=-1, on_timeout="to_cpu", model_root="./"):
self.model_root = model_root
self.opt = self.parse_opt(opt)
if self.opt.n_best > 1:
raise ValueError("Values of n_best > 1 are not supported")
self.model_id = model_id
self.tokenizer_opt = tokenizer_opt
self.timeout = timeout
self.on_timeout = on_timeout
self.unload_timer = None
self.user_opt = opt
self.tokenizer = None
if len(self.opt.log_file) > 0:
log_file = os.path.join(model_root, self.opt.log_file)
else:
log_file = None
self.logger = init_logger(log_file=log_file,
log_file_level=self.opt.log_file_level)
self.loading_lock = threading.Event()
self.loading_lock.set()
self.running_lock = threading.Semaphore(value=1)
set_random_seed(self.opt.seed, self.opt.cuda)
if load:
self.load()
    def parse_opt(self, opt):
"""Parse the option set passed by the user using `onmt.opts`
Args:
opt (dict): Options passed by the user
Returns:
opt (argparse.Namespace): full set of options for the Translator
"""
prec_argv = sys.argv
sys.argv = sys.argv[:1]
parser = ArgumentParser()
onmt.opts.translate_opts(parser)
models = opt['models']
if not isinstance(models, (list, tuple)):
models = [models]
opt['models'] = [os.path.join(self.model_root, model)
for model in models]
opt['src'] = "dummy_src"
for (k, v) in opt.items():
if k == 'models':
sys.argv += ['-model']
sys.argv += [str(model) for model in v]
            elif isinstance(v, bool):
                # a bool maps to a store_true flag; pass the flag only when
                # the value is True, since the bare flag always enables it
                if v:
                    sys.argv += ['-%s' % k]
else:
sys.argv += ['-%s' % k, str(v)]
opt = parser.parse_args()
ArgumentParser.validate_translate_opts(opt)
opt.cuda = opt.gpu > -1
sys.argv = prec_argv
return opt
@property
def loaded(self):
return hasattr(self, 'translator')
def load(self):
self.loading_lock.clear()
timer = Timer()
self.logger.info("Loading model %d" % self.model_id)
timer.start()
try:
self.translator = build_translator(self.opt,
report_score=False,
out_file=codecs.open(
os.devnull, "w", "utf-8"))
except RuntimeError as e:
raise ServerModelError("Runtime Error: %s" % str(e))
timer.tick("model_loading")
if self.tokenizer_opt is not None:
self.logger.info("Loading tokenizer")
if "type" not in self.tokenizer_opt:
raise ValueError(
"Missing mandatory tokenizer option 'type'")
if self.tokenizer_opt['type'] == 'sentencepiece':
if "model" not in self.tokenizer_opt:
raise ValueError(
"Missing mandatory tokenizer option 'model'")
import sentencepiece as spm
sp = spm.SentencePieceProcessor()
model_path = os.path.join(self.model_root,
self.tokenizer_opt['model'])
sp.Load(model_path)
self.tokenizer = sp
elif self.tokenizer_opt['type'] == 'pyonmttok':
if "params" not in self.tokenizer_opt:
raise ValueError(
"Missing mandatory tokenizer option 'params'")
import pyonmttok
if self.tokenizer_opt["mode"] is not None:
mode = self.tokenizer_opt["mode"]
else:
mode = None
# load can be called multiple times: modify copy
tokenizer_params = dict(self.tokenizer_opt["params"])
for key, value in self.tokenizer_opt["params"].items():
if key.endswith("path"):
tokenizer_params[key] = os.path.join(
self.model_root, value)
tokenizer = pyonmttok.Tokenizer(mode,
**tokenizer_params)
self.tokenizer = tokenizer
else:
raise ValueError("Invalid value for tokenizer type")
self.load_time = timer.tick()
self.reset_unload_timer()
self.loading_lock.set()
@critical
def run(self, inputs):
"""Translate `inputs` using this model
Args:
inputs (List[dict[str, str]]): [{"src": "..."},{"src": ...}]
Returns:
result (list): translations
times (dict): containing times
"""
self.stop_unload_timer()
timer = Timer()
timer.start()
self.logger.info("Running translation using %d" % self.model_id)
if not self.loading_lock.is_set():
self.logger.info(
"Model #%d is being loaded by another thread, waiting"
% self.model_id)
if not self.loading_lock.wait(timeout=30):
raise ServerModelError("Model %d loading timeout"
% self.model_id)
else:
if not self.loaded:
self.load()
timer.tick(name="load")
elif self.opt.cuda:
self.to_gpu()
timer.tick(name="to_gpu")
texts = []
head_spaces = []
tail_spaces = []
sslength = []
for i, inp in enumerate(inputs):
src = inp['src']
if src.strip() == "":
head_spaces.append(src)
texts.append("")
tail_spaces.append("")
else:
whitespaces_before, whitespaces_after = "", ""
match_before = re.search(r'^\s+', src)
match_after = re.search(r'\s+$', src)
if match_before is not None:
whitespaces_before = match_before.group(0)
if match_after is not None:
whitespaces_after = match_after.group(0)
head_spaces.append(whitespaces_before)
tok = self.maybe_tokenize(src.strip())
texts.append(tok)
sslength.append(len(tok.split()))
tail_spaces.append(whitespaces_after)
empty_indices = [i for i, x in enumerate(texts) if x == ""]
texts_to_translate = [x for x in texts if x != ""]
scores = []
predictions = []
if len(texts_to_translate) > 0:
try:
scores, predictions = self.translator.translate(
texts_to_translate,
batch_size=self.opt.batch_size)
            except Exception as e:
err = "Error: %s" % str(e)
self.logger.error(err)
self.logger.error("repr(text_to_translate): "
+ repr(texts_to_translate))
self.logger.error("model: #%s" % self.model_id)
self.logger.error("model opt: " + str(self.opt.__dict__))
self.logger.error(traceback.format_exc())
raise ServerModelError(err)
timer.tick(name="translation")
self.logger.info("""Using model #%d\t%d inputs
\ttranslation time: %f""" % (self.model_id, len(texts),
timer.times['translation']))
self.reset_unload_timer()
# NOTE: translator returns lists of `n_best` list
# we can ignore that (i.e. flatten lists) only because
# we restrict `n_best=1`
        def flatten_list(_list):
            return sum(_list, [])
results = flatten_list(predictions)
scores = [score_tensor.item()
for score_tensor in flatten_list(scores)]
results = [self.maybe_detokenize(item)
for item in results]
# build back results with empty texts
for i in empty_indices:
results.insert(i, "")
scores.insert(i, 0)
results = ["".join(items)
for items in zip(head_spaces, results, tail_spaces)]
self.logger.info("Translation Results: %d", len(results))
return results, scores, self.opt.n_best, timer.times
    def do_timeout(self):
        """Timeout function that frees GPU memory.

        Moves the model to CPU or unloads it, depending on
        the value of :attr:`self.on_timeout`.
        """
if self.on_timeout == "unload":
self.logger.info("Timeout: unloading model %d" % self.model_id)
self.unload()
if self.on_timeout == "to_cpu":
self.logger.info("Timeout: sending model %d to CPU"
% self.model_id)
self.to_cpu()
@critical
def unload(self):
self.logger.info("Unloading model %d" % self.model_id)
del self.translator
if self.opt.cuda:
torch.cuda.empty_cache()
self.unload_timer = None
def stop_unload_timer(self):
if self.unload_timer is not None:
self.unload_timer.cancel()
def reset_unload_timer(self):
if self.timeout < 0:
return
self.stop_unload_timer()
self.unload_timer = threading.Timer(self.timeout, self.do_timeout)
self.unload_timer.start()
def to_dict(self):
hide_opt = ["models", "src"]
d = {"model_id": self.model_id,
"opt": {k: self.user_opt[k] for k in self.user_opt.keys()
if k not in hide_opt},
"models": self.user_opt["models"],
"loaded": self.loaded,
"timeout": self.timeout,
}
if self.tokenizer_opt is not None:
d["tokenizer"] = self.tokenizer_opt
return d
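    # Illustrative to_dict() output (values assumed):
    #     {"model_id": 100,
    #      "opt": {"gpu": 0, "beam_size": 5},
    #      "models": ["wmt14.en-de.pt"],
    #      "loaded": True,
    #      "timeout": 600,
    #      "tokenizer": {"type": "sentencepiece", "model": "wmtenfr.model"}}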
@critical
def to_cpu(self):
"""Move the model to CPU and clear CUDA cache."""
self.translator.model.cpu()
if self.opt.cuda:
torch.cuda.empty_cache()
    def to_gpu(self):
"""Move the model to GPU."""
torch.cuda.set_device(self.opt.gpu)
self.translator.model.cuda()
    def maybe_tokenize(self, sequence):
        """Tokenize the sequence (or not).

        Same args/returns as :func:`tokenize()`.
        """
if self.tokenizer_opt is not None:
return self.tokenize(sequence)
return sequence
    def tokenize(self, sequence):
"""Tokenize a single sequence.
Args:
sequence (str): The sequence to tokenize.
Returns:
tok (str): The tokenized sequence.
"""
if self.tokenizer is None:
raise ValueError("No tokenizer loaded")
if self.tokenizer_opt["type"] == "sentencepiece":
tok = self.tokenizer.EncodeAsPieces(sequence)
tok = " ".join(tok)
elif self.tokenizer_opt["type"] == "pyonmttok":
tok, _ = self.tokenizer.tokenize(sequence)
tok = " ".join(tok)
return tok
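    # Illustrative round trip (a sketch; the exact pieces depend on the
    # loaded tokenizer model, so the output below is an assumption):
    #     tok = model.tokenize("Hello world.")    # e.g. "▁Hello ▁world ."
    #     model.detokenize(tok)                   # -> "Hello world."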
    def maybe_detokenize(self, sequence):
"""De-tokenize the sequence (or not)
Same args/returns as :func:`tokenize()`
"""
if self.tokenizer_opt is not None and ''.join(sequence.split()) != '':
return self.detokenize(sequence)
return sequence
    def detokenize(self, sequence):
"""Detokenize a single sequence
Same args/returns as :func:`tokenize()`
"""
if self.tokenizer is None:
raise ValueError("No tokenizer loaded")
if self.tokenizer_opt["type"] == "sentencepiece":
detok = self.tokenizer.DecodePieces(sequence.split())
elif self.tokenizer_opt["type"] == "pyonmttok":
detok = self.tokenizer.detokenize(sequence.split())
return detok