"""leveldb log datastore. Format is described at: https://github.com/google/leveldb/blob/master/doc/log_format.md block := record* trailer? record := checksum: uint32 // crc32c of type and data[] ; little-endian length: uint16 // little-endian type: uint8 // One of FULL, FIRST, MIDDLE, LAST data: uint8[length] header := ident: char[4] magic: uint16 version: uint8 """ # TODO: possibly restructure code by porting the C++ or go implementation import logging import os import struct import zlib from typing import TYPE_CHECKING, Optional, Tuple import wandb if TYPE_CHECKING: from typing import IO, Any from wandb.proto.wandb_internal_pb2 import Record logger = logging.getLogger(__name__) LEVELDBLOG_HEADER_LEN = 7 LEVELDBLOG_BLOCK_LEN = 32768 LEVELDBLOG_DATA_LEN = LEVELDBLOG_BLOCK_LEN - LEVELDBLOG_HEADER_LEN LEVELDBLOG_FULL = 1 LEVELDBLOG_FIRST = 2 LEVELDBLOG_MIDDLE = 3 LEVELDBLOG_LAST = 4 LEVELDBLOG_HEADER_IDENT = ":W&B" LEVELDBLOG_HEADER_MAGIC = ( 0xBEE1 # zlib.crc32(bytes("Weights & Biases", 'iso8859-1')) & 0xffff ) LEVELDBLOG_HEADER_VERSION = 0 try: bytes("", "ascii") def strtobytes(x): """Strtobytes.""" return bytes(x, "iso8859-1") # def bytestostr(x): # return str(x, 'iso8859-1') except Exception: strtobytes = str # bytestostr = str class DataStore: _index: int _flush_offset: int def __init__(self) -> None: self._opened_for_scan = False self._fp: Optional[IO[Any]] = None self._index = 0 self._flush_offset = 0 self._size_bytes = 0 self._crc = [0] * (LEVELDBLOG_LAST + 1) for x in range(1, LEVELDBLOG_LAST + 1): self._crc[x] = zlib.crc32(strtobytes(chr(x))) & 0xFFFFFFFF assert ( wandb._assert_is_internal_process # type: ignore ), "DataStore can only be used in the internal process" def open_for_write(self, fname: str) -> None: self._fname = fname logger.info("open: %s", fname) open_flags = "xb" self._fp = open(fname, open_flags) self._write_header() def open_for_append(self, fname): # TODO: implement self._fname = fname logger.info("open: %s", fname) self._fp = open(fname, "wb") # do something with _index def open_for_scan(self, fname): self._fname = fname logger.info("open for scan: %s", fname) self._fp = open(fname, "r+b") self._index = 0 self._size_bytes = os.stat(fname).st_size self._opened_for_scan = True self._read_header() def seek(self, offset: int) -> None: self._fp.seek(offset) # type: ignore self._index = offset def get_offset(self) -> int: offset = self._fp.tell() # type: ignore return offset def in_last_block(self): """Determine if we're in the last block to handle in-progress writes.""" return self._index > self._size_bytes - LEVELDBLOG_DATA_LEN def scan_record(self): assert self._opened_for_scan, "file not open for scanning" # TODO(jhr): handle some assertions as file corruption issues # assume we have enough room to read header, checked by caller? 
class DataStore:
    _index: int
    _flush_offset: int

    def __init__(self) -> None:
        self._opened_for_scan = False
        self._fp: Optional["IO[Any]"] = None
        self._index = 0
        self._flush_offset = 0
        self._size_bytes = 0

        # crc of each record type byte, used as the seed when checksumming data
        self._crc = [0] * (LEVELDBLOG_LAST + 1)
        for x in range(1, LEVELDBLOG_LAST + 1):
            self._crc[x] = zlib.crc32(strtobytes(chr(x))) & 0xFFFFFFFF

        assert (
            wandb._assert_is_internal_process  # type: ignore
        ), "DataStore can only be used in the internal process"

    def open_for_write(self, fname: str) -> None:
        self._fname = fname
        logger.info("open: %s", fname)
        open_flags = "xb"
        self._fp = open(fname, open_flags)
        self._write_header()

    def open_for_append(self, fname):
        # TODO: implement
        self._fname = fname
        logger.info("open: %s", fname)
        self._fp = open(fname, "wb")
        # do something with _index

    def open_for_scan(self, fname):
        self._fname = fname
        logger.info("open for scan: %s", fname)
        self._fp = open(fname, "r+b")
        self._index = 0
        self._size_bytes = os.stat(fname).st_size
        self._opened_for_scan = True
        self._read_header()

    def seek(self, offset: int) -> None:
        self._fp.seek(offset)  # type: ignore
        self._index = offset

    def get_offset(self) -> int:
        offset = self._fp.tell()  # type: ignore
        return offset

    def in_last_block(self):
        """Determine if we're in the last block to handle in-progress writes."""
        return self._index > self._size_bytes - LEVELDBLOG_DATA_LEN

    def scan_record(self):
        assert self._opened_for_scan, "file not open for scanning"
        # TODO(jhr): handle some assertions as file corruption issues
        # assume we have enough room to read header, checked by caller?
        header = self._fp.read(LEVELDBLOG_HEADER_LEN)
        if len(header) == 0:
            return None
        assert (
            len(header) == LEVELDBLOG_HEADER_LEN
        ), f"record header is {len(header)} bytes instead of the expected {LEVELDBLOG_HEADER_LEN}"
        checksum, dlength, dtype = struct.unpack("<IHB", header)
        self._index += LEVELDBLOG_HEADER_LEN
        data = self._fp.read(dlength)
        # the stored checksum covers the type byte followed by the data,
        # so seed the crc with the precomputed crc of the type byte
        checksum_computed = zlib.crc32(data, self._crc[dtype]) & 0xFFFFFFFF
        assert (
            checksum == checksum_computed
        ), "record checksum is invalid, data may be corrupt"
        self._index += dlength
        return dtype, data

    def scan_data(self):
        """Scan the next data record, reassembling fragments across blocks.

        Returns:
            None at end of file, otherwise the record data as bytes.
        """
        # if the rest of the block cannot hold a record header, it is
        # zero-filled trailer padding: skip it and verify it is zero
        offset = self._index % LEVELDBLOG_BLOCK_LEN
        space_left = LEVELDBLOG_BLOCK_LEN - offset
        if space_left < LEVELDBLOG_HEADER_LEN:
            pad = self._fp.read(space_left)
            assert pad == strtobytes("\x00" * space_left), "invalid padding"
            self._index += space_left

        record = self.scan_record()
        if record is None:  # eof
            return None
        dtype, data = record
        if dtype == LEVELDBLOG_FULL:
            return data
        assert (
            dtype == LEVELDBLOG_FIRST
        ), f"expected record to be type {LEVELDBLOG_FIRST} but found {dtype}"

        while True:
            record = self.scan_record()
            if record is None:  # eof in the middle of a fragmented record
                return None
            dtype, new_data = record
            if dtype == LEVELDBLOG_LAST:
                data += new_data
                break
            assert (
                dtype == LEVELDBLOG_MIDDLE
            ), f"expected record to be type {LEVELDBLOG_MIDDLE} but found {dtype}"
            data += new_data
        return data

    def _write_header(self) -> None:
        data = struct.pack(
            "<4sHB",
            strtobytes(LEVELDBLOG_HEADER_IDENT),
            LEVELDBLOG_HEADER_MAGIC,
            LEVELDBLOG_HEADER_VERSION,
        )
        assert (
            len(data) == LEVELDBLOG_HEADER_LEN
        ), f"header is {len(data)} bytes instead of the expected {LEVELDBLOG_HEADER_LEN}"
        self._fp.write(data)  # type: ignore
        self._index += len(data)

    def _read_header(self):
        header = self._fp.read(LEVELDBLOG_HEADER_LEN)
        assert (
            len(header) == LEVELDBLOG_HEADER_LEN
        ), f"header is {len(header)} bytes instead of the expected {LEVELDBLOG_HEADER_LEN}"
        ident, magic, version = struct.unpack("<4sHB", header)
        if ident != strtobytes(LEVELDBLOG_HEADER_IDENT):
            raise Exception("Invalid header")
        if magic != LEVELDBLOG_HEADER_MAGIC:
            raise Exception("Invalid header")
        if version != LEVELDBLOG_HEADER_VERSION:
            raise Exception("Invalid header")
        self._index += len(header)

    def _write_record(self, s: bytes, dtype: Optional[int] = None) -> None:
        assert len(s) + LEVELDBLOG_HEADER_LEN <= (
            LEVELDBLOG_BLOCK_LEN - self._index % LEVELDBLOG_BLOCK_LEN
        ), "not enough space to write new records"
        dlength = len(s)
        dtype = dtype or LEVELDBLOG_FULL
        checksum = zlib.crc32(s, self._crc[dtype]) & 0xFFFFFFFF
        self._fp.write(struct.pack("<IHB", checksum, dlength, dtype))  # type: ignore
        if dlength:
            self._fp.write(s)  # type: ignore
        self._index += LEVELDBLOG_HEADER_LEN + dlength

    def _write_data(self, s: bytes) -> Tuple[int, int, int]:
        start_offset = self._index
        offset = self._index % LEVELDBLOG_BLOCK_LEN
        space_left = LEVELDBLOG_BLOCK_LEN - offset
        data_used = 0
        data_left = len(s)
        if data_left + LEVELDBLOG_HEADER_LEN <= space_left:
            # everything fits in the current block: write a single FULL record
            self._write_record(s)
        else:
            # write first record, filling the remainder of the current block
            data_room = space_left - LEVELDBLOG_HEADER_LEN
            self._write_record(s[:data_room], LEVELDBLOG_FIRST)
            data_used += data_room
            data_left -= data_room
            # write middle records (if any)
            while data_left > LEVELDBLOG_DATA_LEN:
                self._write_record(
                    s[data_used : data_used + LEVELDBLOG_DATA_LEN],
                    LEVELDBLOG_MIDDLE,
                )
                data_used += LEVELDBLOG_DATA_LEN
                data_left -= LEVELDBLOG_DATA_LEN
            # write last record
            self._write_record(s[data_used:], LEVELDBLOG_LAST)
        # flush the entire block to disk
        self._fp.flush()  # type: ignore
        os.fsync(self._fp.fileno())  # type: ignore
        self._flush_offset = self._index
        return start_offset, self._index, self._flush_offset

    def ensure_flushed(self, off: int) -> None:
        # flush everything; `off` marks how far the caller needs persisted
        self._fp.flush()  # type: ignore

    def write(self, obj: "Record") -> Tuple[int, int, int]:
        """Write a protocol buffer.

        Args:
            obj: Protocol buffer to write.

        Returns:
            (start_offset, end_offset, flush_offset) if successful, None otherwise
        """
        raw_size = obj.ByteSize()
        s = obj.SerializeToString()
        assert len(s) == raw_size, "invalid serialization"
        ret = self._write_data(s)
        return ret

    def close(self) -> None:
        if self._fp is not None:
            logger.info("close: %s", self._fname)
            self._fp.close()
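

if __name__ == "__main__":
    # Minimal sketch of the record framing used above (uses only this
    # module's constants; no DataStore is constructed, since that asserts
    # it is running in the wandb-internal process).
    payload = strtobytes("hello world")
    type_crc = zlib.crc32(strtobytes(chr(LEVELDBLOG_FULL))) & 0xFFFFFFFF
    checksum = zlib.crc32(payload, type_crc) & 0xFFFFFFFF
    record = struct.pack("<IHB", checksum, len(payload), LEVELDBLOG_FULL) + payload

    # scan side: split header from data and verify the seeded crc32
    got_checksum, got_length, got_type = struct.unpack(
        "<IHB", record[:LEVELDBLOG_HEADER_LEN]
    )
    got_data = record[LEVELDBLOG_HEADER_LEN:]
    assert got_type == LEVELDBLOG_FULL
    assert got_length == len(got_data)
    assert got_checksum == zlib.crc32(got_data, type_crc) & 0xFFFFFFFF
    print("record framing round-trip ok")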