Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| '''A native client simulating the plugin to use for testing the server''' | |
| import asyncio | |
| import itertools | |
| import struct | |
| import json | |
| import time | |
| import sys | |
| import csv | |
| from pathlib import Path | |
| from pprint import pprint | |
| from tqdm import tqdm | |
| from flask import Flask, request, jsonify | |
| import asyncio | |
| from hypercorn.asyncio import serve | |
| from hypercorn.config import Config | |
| app = Flask(__name__) | |
class Timer:
    """Small helper that times awaited calls and exports the timings as CSV.

    Every recorded row has the shape ``[elapsed_seconds, *details]``.
    """

    def __init__(self):
        # One row per completed measurement, in completion order.
        self.measurements = []

    async def measure(self, coro, *details):
        """Await ``coro``, record how long that took, and return its result.

        ``details`` are appended verbatim to the recorded row. If awaiting
        ``coro`` raises, nothing is recorded and the exception propagates.
        """
        started_at = time.perf_counter()
        outcome = await coro
        elapsed = time.perf_counter() - started_at
        self.measurements.append([elapsed, *details])
        return outcome

    def dump(self, fh):
        """Write all recorded rows as CSV to the file-like object ``fh``."""
        # TODO stats? For now I just export to Excel or something
        csv.writer(fh).writerows(self.measurements)
class Client:
    """asyncio based native messaging client.

    Use it as an async context manager: entering spawns the subprocess given
    by the constructor arguments and starts a background task that reads its
    stdout. The main interface is calling `request()` with the right
    parameters and awaiting the future it returns.

    Messages in both directions are framed as a native-endian unsigned 32-bit
    length followed by that many bytes of JSON.
    """

    def __init__(self, *args):
        # Message ids start at 1 and are unique per client instance.
        self.serial = itertools.count(1)
        # message id -> (future, update callback) for requests in flight.
        self.futures = {}
        # Program and arguments passed to create_subprocess_exec().
        self.args = args

    async def __aenter__(self):
        self.proc = await asyncio.create_subprocess_exec(
            *self.args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        self.read_task = asyncio.create_task(self.reader())
        return self

    async def __aexit__(self, *args):
        # Closing stdin signals the subprocess that no more requests follow.
        self.proc.stdin.close()
        await self.proc.wait()
        # Wait for the reader to drain whatever is still buffered on stdout,
        # so every response that was written has been dispatched (and every
        # unanswered request has been failed) before the context is left.
        await self.read_task

    def request(self, command, data, *, update=lambda data: None):
        """Send a request and return a future for its final response.

        Args:
            command: value of the message's "command" field.
            data: JSON-serialisable value of the message's "data" field.
            update: callback invoked with the "data" of every intermediate
                "update" message that carries this request's id.

        Returns:
            A future that resolves to the response's "data" on success, or
            raises ``Exception(error)`` when the response reports failure.
        """
        message_id = next(self.serial)
        message = json.dumps({"command": command, "id": message_id, "data": data}).encode()
        future = asyncio.get_running_loop().create_future()
        self.futures[message_id] = future, update
        self.proc.stdin.write(struct.pack("@I", len(message)))
        self.proc.stdin.write(message)
        return future

    async def reader(self):
        """Read framed messages from the subprocess until EOF or cancellation.

        When the loop ends for any reason, requests that never received a
        final response are failed so that nobody awaits them forever.
        """
        stdout = self.proc.stdout
        try:
            while True:
                try:
                    raw_length = await stdout.readexactly(4)
                    (length,) = struct.unpack("@I", raw_length)
                    raw_message = await stdout.readexactly(length)
                except asyncio.IncompleteReadError:
                    break  # Stop read loop if EOF is reached
                except asyncio.CancelledError:
                    break  # Also stop reading if we're cancelled
                self._dispatch(json.loads(raw_message))
        finally:
            self._fail_pending(ConnectionError(
                "Connection to subprocess closed before a response was received"
            ))

    def _dispatch(self, message):
        """Route one decoded message to the request it belongs to."""
        # Not cool if there is no response message "id" here
        if not isinstance(message, dict) or "id" not in message:
            return
        entry = self.futures.get(message["id"])
        if entry is None:
            # Unknown or already completed id: ignore it instead of letting a
            # KeyError kill the reader and strand every other request.
            return
        future, update = entry
        if "success" in message:
            del self.futures[message["id"]]
            if future.done():
                return  # e.g. cancelled by whoever was awaiting it
            if message["success"]:
                future.set_result(message["data"])
            else:
                future.set_exception(Exception(message["error"]))
        elif "update" in message:
            update(message["data"])

    def _fail_pending(self, error):
        """Fail every request that is still waiting for its final response."""
        pending, self.futures = self.futures, {}
        for future, _update in pending.values():
            if not future.done():
                future.set_exception(error)
class InvalidArgumentException(ValueError):
    """Raised when `TranslateLocally.translate()` gets an invalid argument combination."""


class TranslateLocally(Client):
    """TranslateLocally wrapper around Client that translates
    our defined messages into functions with arguments.
    """

    async def list_models(self, *, include_remote=False):
        """Send a "ListModels" request and return the response data."""
        return await self.request("ListModels", {"includeRemote": bool(include_remote)})

    async def translate(self, text, src=None, trg=None, *, model=None, pivot=None, html=False):
        """Send a "Translate" request and return the translated text.

        Specify either a language pair (``src`` + ``trg``) or a ``model``
        (optionally with a ``pivot``), but not both.

        Raises:
            InvalidArgumentException: when both kinds of specification are
                combined, or when neither is complete.
        """
        if src and trg:
            if model or pivot:
                raise InvalidArgumentException("Cannot combine src + trg and model + pivot arguments")
            spec = {"src": str(src), "trg": str(trg)}
        elif model:
            spec = {"model": str(model)}
            if pivot:
                spec["pivot"] = str(pivot)
        else:
            raise InvalidArgumentException("Missing src + trg or model argument")

        result = await self.request("Translate", {**spec, "text": str(text), "html": bool(html)})
        return result["target"]["text"]

    async def download_model(self, model_id, *, update=lambda data: None):
        """Send a "DownloadModel" request and return the response data.

        ``update`` is called with the data of every intermediate update
        message received for this request.
        """
        return await self.request("DownloadModel", {"modelID": str(model_id)}, update=update)
def first(iterable, *default):
    """Return the first item of ``iterable``.

    With no ``default`` an empty iterable raises StopIteration; with one
    ``default`` that value is returned instead.
    """
    iterator = iter(iterable)
    # The default is forwarded as a rest argument so that omitting it keeps
    # next()'s own "raise StopIteration when exhausted" behaviour.
    return next(iterator, *default)
def get_build():
    """Create a TranslateLocally client for a locally built binary.

    Looks for ``translateLocally`` in the current working directory first and
    then in ``../build`` relative to this file. The first candidate that
    exists is launched with the ``-p --debug`` arguments.

    Raises:
        RuntimeError: if none of the candidate paths exists.
    """
    script_dir = Path(__file__).resolve().parent
    candidates = (
        Path("./translateLocally"),
        script_dir / ".." / "build" / "translateLocally",
    )
    binary = next((candidate for candidate in candidates if candidate.exists()), None)
    if binary is None:
        raise RuntimeError("Could not find translateLocally binary")
    return TranslateLocally(binary.resolve(), "-p", "--debug")
async def download_with_progress(tl, model, position):
    """tl.download_model but with a tqdm powered progress bar.

    ``model`` must provide the "modelName" (bar label) and "id" keys;
    ``position`` is the row tqdm draws this bar on.
    """
    with tqdm(position=position, desc=model["modelName"], unit="b", unit_scale=True, leave=False) as progress:
        def on_update(data):
            assert data["read"] <= data["size"]
            progress.total = data["size"]
            # tqdm wants increments, the update messages carry absolute
            # byte counts, so advance by the difference.
            progress.update(data["read"] - progress.n)

        return await tl.download_model(model["id"], update=on_update)
async def test():
    """Test TranslateLocally functionality.

    Lists models, downloads one model per required language pair, checks
    that they are reported as local afterwards, runs a batch of translations
    against fixed expected output and finally checks the error reported for
    an unknown target language.
    """
    async with get_build() as tl:
        models = await tl.list_models(include_remote=True)
        pprint(models)

        # Models necessary for tests, both direct & pivot
        necessary_models = {("en", "de"), ("en", "es"), ("es", "en")}

        # From all models available, pick one for every necessary language pair
        # (preferring tiny ones) so we can make sure these are downloaded.
        selected_models = {
            (src, trg): first(sorted(
                (
                    model
                    for model in models
                    if src in model["srcTags"] and trg == model["trgTag"]
                ),
                key=lambda model: 0 if model["type"] == "tiny" else 1
            ))
            for src, trg in necessary_models
        }

        pprint(selected_models)

        # Download them. Even if they're already model['local'] == True, to test
        # that in that case this is a no-op.
        await asyncio.gather(*(
            download_with_progress(tl, model, position)
            for position, model in enumerate(selected_models.values())
        ))
        print()  # tqdm messes a lot with the print position, this makes it less bad

        # Test whether the model list has been updated to reflect that the
        # downloaded models are now local.
        models = await tl.list_models(include_remote=True)
        assert all(
            model["local"]
            for selected_model in selected_models.values()
            for model in models
            if model["id"] == selected_model["id"]
        )

        # Perform some translations, switching between the models
        translations = await asyncio.gather(
            tl.translate("Hello world!", "en", "de"),
            tl.translate("Let's translate another sentence to German.", "en", "de"),
            tl.translate("Sticks and stones may break my bones but words WILL NEVER HURT ME!", "en", "es"),
            tl.translate("I <i>like</i> to drive my car. But I don't have one.", "en", "de", html=True),
            tl.translate("¿Por qué no funciona bien?", "es", "de"),
            tl.translate("This will be the last sentence of the day.", "en", "de"),
        )

        pprint(translations)

        assert translations == [
            "Hallo Welt!",
            "Übersetzen wir einen weiteren Satz mit Deutsch.",
            "Palos y piedras pueden romper mis huesos, pero las palabras NUNCA HURT ME.",
            "Ich <i>fahre gerne</i> mein Auto. Aber ich habe keine.",  # <i>fahre</i>???
            "Warum funktioniert es nicht gut?",
            "Dies wird der letzte Satz des Tages sein.",
        ]

        # Test bad input. The "should have failed" check lives in the else
        # branch so its AssertionError is not swallowed by the except clause
        # and replaced with a misleading message.
        try:
            await tl.translate("This is impossible to translate", "en", "xx")
        except Exception as e:
            assert "Could not find the necessary translation models" in str(e)
        else:
            raise AssertionError("How are we able to translate to 'xx'???")

    print("Fin")
async def test_third_party():
    """Test whether TranslateLocally can switch between different types of
    models. This test assumes you have the OPUS repository in your list:
    https://object.pouta.csc.fi/OPUS-MT-models/app/models.json
    """
    models_to_try = [
        'en-de-tiny',
        'en-de-base',
        'eng-fin-tiny',  # model has broken model_info.json so won't work anyway :(
        'eng-ukr-tiny',
    ]

    async with get_build() as tl:
        models = await tl.list_models(include_remote=True)

        # Pick the first listed model for each wanted shortname; shortnames
        # without any matching model are simply left out.
        selected_models = {}
        for shortname in models_to_try:
            candidate = first(
                (entry for entry in models if entry["shortname"] == shortname),
                None,
            )
            if candidate:
                selected_models[shortname] = candidate

        downloads = [
            download_with_progress(tl, model, position)
            for position, model in enumerate(selected_models.values())
        ]
        await asyncio.gather(*downloads)

        # TODO: Temporary filter to figure out 'failed' downloads. eng-fin-tiny
        # has a broken JSON file so it will download correctly, but still not
        # be available or show up in this list. We should probably make the
        # download fail in that scenario.
        models = await tl.list_models(include_remote=False)
        for shortname in list(selected_models):
            if all(entry["shortname"] != shortname for entry in models):
                print(f"Skipping {shortname} because it didn't show up in model list after downloading", file=sys.stderr)
                del selected_models[shortname]

        pending_translations = [
            tl.translate("This is a very simple test sentence", model=model["id"])
            for model in selected_models.values()
        ]
        translations = await asyncio.gather(*pending_translations)

        pprint(list(zip(selected_models.keys(), translations)))
async def test_latency():
    """Measure translation latency for growing batch sizes.

    Lines are read lazily from stdin and translated en -> de in concurrent
    batches. Each timing row is tagged with the epoch, the batch size and the
    space-separated word count of the line. The rows are written to stdout as
    CSV at the end.
    """
    timer = Timer()
    batch_sizes = [1, 5, 10, 20, 50, 100]

    # Our line generator: just read Crime & Punishment from stdin :D
    lines = (raw.strip() for raw in sys.stdin)

    async with get_build() as tl:
        for epoch in range(100):
            print(f"Epoch {epoch}...", file=sys.stderr)
            for batch_size in batch_sizes:
                # islice takes at most batch_size lines and never consumes
                # one it does not hand out.
                batch = [
                    timer.measure(
                        tl.translate(line, "en", "de"),
                        epoch,
                        batch_size,
                        len(line.split(' ')),
                    )
                    for line in itertools.islice(lines, batch_size)
                ]
                await asyncio.gather(*batch)

    timer.dump(sys.stdout)
async def test_concurrency():
    """Issue three ListModels requests at once and wait for all of them."""
    async with get_build() as tl:
        await asyncio.gather(
            tl.list_models(include_remote=True),
            tl.list_models(include_remote=False),
            tl.list_models(include_remote=True),
        )
async def test_shutdown():
    """Queue ten translations, leave the client context straight away, and
    only afterwards await and print the responses.
    """
    tasks = []

    async with get_build() as tl:
        for n in range(10):
            print(f"Requesting translation {n}")
            payload = {
                "src": "en",
                "trg": "de",
                "text": f"This is simple sentence number {n}!",
                "html": False
            }
            tasks.append(tl.request("Translate", payload))

        print("Shutting down")
    print("Shutdown complete")

    # The futures are collected only after the client context has exited.
    for translation in asyncio.as_completed(tasks):
        print(await translation)

    print("Fin.")
async def test_concurrent_download():
    """Test parallel downloads: fetch up to three non-local models at once."""
    async with get_build() as tl:
        models = await tl.list_models(include_remote=True)
        remote = [model for model in models if not model["local"]]
        downloads = [tl.download_model(model["id"]) for model in remote[:3]]
        await asyncio.gather(*downloads)
def run_async_function(func):
    """Run the awaitable ``func`` to completion on a fresh event loop.

    A new event loop is created and installed as the current loop for the
    duration of the call. Afterwards the loop is always cleaned up and
    closed, so repeated calls (e.g. one per HTTP request) do not leak event
    loops and their file descriptors.

    Args:
        func: a coroutine object or other awaitable (despite the name, not a
            function).

    Returns:
        Whatever ``func`` evaluates to; exceptions it raises propagate.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func)
    finally:
        try:
            # Tasks that are still pending can never make progress once this
            # loop stops running, so cancel them and let them unwind.
            leftovers = asyncio.all_tasks(loop)
            for task in leftovers:
                task.cancel()
            if leftovers:
                loop.run_until_complete(
                    asyncio.gather(*leftovers, return_exceptions=True)
                )
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop installed as the current one.
            asyncio.set_event_loop(None)
            loop.close()
def list_models():
    """HTTP handler: return the model list as JSON.

    The ``include_remote`` query parameter (case-insensitive "true") decides
    whether remote models are included; it defaults to false.
    """
    # NOTE(review): no route decorator is visible on this handler here;
    # confirm it is registered with `app` somewhere.
    wants_remote = request.args.get('include_remote', 'false').lower() == 'true'

    async def fetch_models():
        async with get_build() as tl:
            return await tl.list_models(include_remote=wants_remote)

    return jsonify(run_async_function(fetch_models()))
def translate():
    """HTTP handler: translate the text in the JSON request body.

    The body must be a JSON object with a "text" field. The optional "src",
    "trg", "model", "pivot" and "html" fields are forwarded to
    `TranslateLocally.translate()`. Responds with the translated text as
    JSON, or with a 400 and an "error" field when "text" is missing.
    """
    # NOTE(review): no route decorator is visible on this handler here;
    # confirm it is registered with `app` somewhere.
    data = request.get_json()
    # The body is untrusted input: reject it explicitly rather than letting
    # a KeyError/TypeError surface as a 500.
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({"error": "Request body must be a JSON object with a 'text' field"}), 400

    async def async_func():
        async with get_build() as tl:
            return await tl.translate(
                data['text'],
                src=data.get('src'),
                trg=data.get('trg'),
                model=data.get('model'),
                pivot=data.get('pivot'),
                html=data.get('html', False)
            )

    result = run_async_function(async_func())
    return jsonify(result)
| # Define more routes for other operations like download_model, etc. | |
if __name__ == "__main__":
    # Serve the Flask app with hypercorn on all interfaces, port 7860.
    config = Config()
    config.bind = ["0.0.0.0:7860"]  # You can specify the host and port here
    server = serve(app, config)
    asyncio.run(server)