Spaces:
Running
Running
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os

from .base_channel import BaseChannel
# Directory, relative to the current working directory, that holds the
# command files exchanged through FileChannel.
command_path = "./commands"
# Prefix of the file FileChannel appends outgoing messages to; the node id is
# added as a suffix when one is set.
runner_commands_file_name_prefix = "runner_commands"
# Name of the file inside command_path that FileChannel reads incoming data from.
manager_commands_file_name = "manager_commands.txt"
class FileChannel(BaseChannel):
    """Channel that exchanges messages through files under ``command_path``.

    Outgoing messages are appended, one per line, to a runner commands file
    (suffixed with the node id when one is set). Incoming data is read
    incrementally from the manager commands file, resuming at the byte offset
    reached by the previous read.
    """

    def __init__(self, args):
        """Set up file handles and read state, then initialise the base class.

        Args:
            args: Object exposing ``node_id``; it is also forwarded unchanged
                to the base class initialiser.
        """
        # State is assigned before the base initialiser runs, matching the
        # original statement order.
        self.node_id = args.node_id
        self.out_file = None  # opened lazily by the first _inner_send
        self.in_file = None  # opened lazily by _open_manager_command
        self.in_offset = 0  # bytes of the manager file consumed so far
        self.in_cache = b""  # bytes read but not yet returned as messages
        super(FileChannel, self).__init__(args)

    def _inner_open(self):
        """Do nothing; both files are opened lazily on first use."""
        pass

    def _inner_close(self):
        """Close whichever of the two file handles are open and forget them."""
        if self.out_file is not None:
            self.out_file.close()
            self.out_file = None
        if self.in_file is not None:
            self.in_file.close()
            self.in_file = None

    def _inner_send(self, message):
        """Append ``message`` plus a newline to the runner commands file.

        The file (and ``command_path`` itself) is created on the first call.

        Args:
            message: Bytes-like payload written in binary append mode.
        """
        if self.out_file is None:
            # exist_ok=True already tolerates an existing directory, so no
            # separate existence check is needed (and none can race).
            os.makedirs(command_path, exist_ok=True)
            if self.node_id is None:
                base_name = "%s.txt" % runner_commands_file_name_prefix
            else:
                base_name = "%s_%s.txt" % (
                    runner_commands_file_name_prefix, self.node_id)
            self.out_file = open(os.path.join(command_path, base_name), "ab")

        self.out_file.write(message)
        self.out_file.write(b'\n')
        # Flush so a process reading the file sees the message immediately.
        self.out_file.flush()

    def _open_manager_command(self):
        """Open the manager commands file for reading if it exists.

        A handle that has been closed is discarded first. A newly opened
        handle is positioned at ``self.in_offset`` so reading resumes where
        it previously stopped. ``self.in_file`` stays ``None`` when the file
        does not exist.
        """
        full_name = os.path.join(command_path, manager_commands_file_name)

        if self.in_file is not None and self.in_file.closed:
            self.in_file = None

        if self.in_file is None and os.path.exists(full_name):
            self.in_file = open(full_name, "rb")
            self.in_file.seek(self.in_offset)

    def _inner_receive(self):
        """Read bytes appended to the manager commands file since the last call.

        Returns:
            The messages produced by ``self._fetch_message`` from the cached
            bytes, or an empty list when the file is missing or has no new
            data. ``_fetch_message`` is defined outside this class.
        """
        messages = []

        # Also reopen when the handle was closed, so the recovery logic in
        # _open_manager_command is reachable from here.
        if self.in_file is None or self.in_file.closed:
            self._open_manager_command()
        if self.in_file is None:
            return messages

        # Measure the current file size, then return to the resume point.
        self.in_file.seek(0, os.SEEK_END)
        end_offset = self.in_file.tell()
        self.in_file.seek(self.in_offset, os.SEEK_SET)

        pending = end_offset - self.in_offset
        if pending > 0:
            data = self.in_file.read(pending)
            self.in_cache += data
            # Advance by what was actually read, so a short read never skips
            # bytes on the next call.
            self.in_offset += len(data)
            messages, self.in_cache = self._fetch_message(self.in_cache, True)
        return messages