Source code for fitlog.fastlog.log_read

import os
import re
import json
from typing import List
import threading
import time
import traceback
from collections import defaultdict
from .utils import flatten_dict

[docs]class LogReader: """ 用于读取日志的类, 用于配合Table使用 """ def __init__(self): """ self._line_counter里面的内容:{save_log_dir: {filename: (line_count, last_change_time)}} """ self._log_dir = None self._ignore_null_loss_or_metric = True # 如果loss和metric都是null的话,则忽略 self._line_counter = defaultdict(lambda: None) # 记住每个log读取到的line的数量以及修改时间
[docs] def set_log_dir(self, log_dir: str): """ 设置 log 的存放位置 """ if not os.path.isdir(log_dir): raise RuntimeError("`{}` is not a valid directory.".format(log_dir)) empty = True for _dir in os.listdir(log_dir): if is_dirname_log_record(os.path.join(log_dir, _dir)): empty = False if empty: raise RuntimeError("`{}` has no valid logs.".format(log_dir)) self._log_dir = log_dir self._line_counter.clear() # 删除记录,使得重新读取
[docs] def read_logs(self, ignore_log_names: dict = None) -> List[dict]: """ 从日志存放路径读取日志. 只会读取有更新的log :param ignore_log_names: 如果包含在这个里面,就不会读取该log :return: 如果有内容或者有更新的内容,则返回一个 list,里面每个元素都是nested的dict. [{ 'id': 'metric': {nested dict}, 'meta': {}, ... },{ }] """ assert self._log_dir is not None, "You have to set log_dir first." if ignore_log_names is None: ignore_log_names = {} dirs = os.listdir(self._log_dir) logs = [] for _dir in dirs: if _dir in ignore_log_names: continue dir_path = os.path.join(self._log_dir, _dir) if is_dirname_log_record(dir_path): _dict, file_stats = _read_save_log(dir_path, self._ignore_null_loss_or_metric, self._line_counter[_dir]) if len(_dict) != 0: logs.append({'id': _dir, **_dict}) self._line_counter[_dir] = file_stats return logs
[docs] def read_certain_logs(self, log_dir_names): """ 给定log的名称,只读取对应的log :param log_dir_names: list[str] :return: [{}, {}], nested的log """ assert self._log_dir is not None, "You have to set log_dir first." logs = [] for _dir in log_dir_names: dir_path = os.path.join(self._log_dir, _dir) if is_dirname_log_record(dir_path): _dict, file_stats = _read_save_log(dir_path, self._ignore_null_loss_or_metric, self._line_counter[_dir]) if len(_dict) != 0: logs.append({'id': _dir, **_dict}) return logs
def _read_save_log(_save_log_dir: str, ignore_null_loss_or_metric: bool = True, file_stats: dict = None): """ 给定一个包含metric.log, hyper.log, meta.log以及other.log的文件夹,返回一个包含数据的dict. 如果为null则返回空字典 不读取loss.log, 因为里面的内容对table无意义。 :param _save_log_dir: 日志存放的目录, 已经最后一级了,即该目录下应该包含metric.log等了 :param ignore_null_loss_or_metric: 是否忽略metric和loss都为空的文件 :param file_stats:: { 'meta.log': [current_line, last_modified_time], 'hyper.log':[], 'metric.log':[], 'other.log':[] } :return: _dict: {'metric': {nested dict}, 'loss': {} } file_stats: {'meta.log': [current_line, last_modified_time], 'metric.log': [, ]} # 只包含有更新的文件的内容 """ try: filenames = ['meta.log', 'hyper.log', 'best_metric.log', 'other.log'] if file_stats is None: file_stats = {} for filename in filenames: if filename not in file_stats: file_stats[filename] = [-1, -1] _dict = {} def _is_file_empty(fn): empty = True fp = os.path.join(_save_log_dir, fn) if os.path.exists(fp): with open(fp, 'r', encoding='utf-8') as f: for line in f: if len(line.strip()) != 0: empty = False break return empty if os.path.exists(os.path.join(_save_log_dir, 'metric.log')) and \ not os.path.exists(os.path.join(_save_log_dir, 'best_metric.log')): # 可能是之前的版本生成的, 适配一下 with open(os.path.join(_save_log_dir, 'metric.log'), 'r', encoding='utf-8') as f, \ open(os.path.join(_save_log_dir, 'best_metric.log'), 'w', encoding='utf-8') as f2: for line in f: if not line.startswith('S'): # 是best_metric best_line = line f2.write(best_line) empty = _is_file_empty('best_metric.log') and _is_file_empty('loss.log') and _is_file_empty('metric.log') if empty and ignore_null_loss_or_metric: return _dict, file_stats for filename in filenames: filepath = os.path.join(_save_log_dir, filename) last_modified_time = os.path.getmtime(filepath) if file_stats[filename][1] == last_modified_time: continue file_stats[filename][1] = last_modified_time start_line = file_stats[filename][0] __dict, end_line = _read_nonstep_log_file(filepath, start_line) file_stats[filename][0] = end_line _dict = merge(_dict, __dict, use_b=False) # 在这里,需要以文件指定顺序,保留靠前的内容的值 except Exception as e: print("Exception raised when read {}".format(os.path.abspath(_save_log_dir))) print(repr(e)) raise e return _dict, file_stats
[docs]def is_log_dir_has_step(_save_log_dir: str, check_files=('metric.log', 'loss.log')) -> bool: """ 给定log_dir, 判断是否有step数据 :param _save_log_dir 日志存放的目录 :param check_files: 检查file是否含有step :return: 是否有step数据 """ if not is_dirname_log_record(_save_log_dir): return False try: filenames = check_files for filename in filenames: filepath = os.path.join(_save_log_dir, filename) if not os.path.exists(filepath): continue with open(filepath, 'r', encoding='utf-8') as f: for line in f: if line.startswith('S'): return True except Exception as e: traceback.print_exc() print("Exception raised when read {}".format(os.path.abspath(filepath))) return False
def _read_nonstep_log_file(filepath: str, start_line: int = 0) -> (dict, int): """ 给定一个filepath, 读取里面非Step: 开头的line,每一行为json,使用后面的内容覆盖前面的内容 :param filepath: 读取文件的路径 :param start_line: 从哪一行开始读取 :return: 返回一个字典(没有内容为空)和最后读取到的行号 """ a = {} with open(filepath, 'r', encoding='utf-8') as f: index = -1 for index, line in enumerate(f): if index < start_line: continue if not line.startswith('S'): # 读取非step的内容 line = line.strip() try: b = json.loads(line) # TODO 如果含有非法字符(例如“!"#$%&'()*+,./:;<=>?@[\]^`{|}|~ ”),导致前端无法显示怎么办? except: print("Corrupted json format in {}, line:{}".format(filepath, line)) continue a = merge(a, b, use_b=True) return a, index + 1
[docs]def merge(a: dict, b: dict, use_b: bool = True) -> dict: """ 将两个dict recursive合并到a中,有相同key的,根据use_b判断使用哪个值 :param a: 字典 a :param b: 字典 b :param use_b: 是否使用字典 b 的值 :return: 返回字典 a """ for key in b: if key in a: if isinstance(a[key], dict) and isinstance(b[key], dict): merge(a[key], b[key], use_b) elif use_b: a[key] = b[key] else: a[key] = b[key] return a
[docs]def is_dirname_log_record(dir_path: str) -> bool: """ 检查dir_path是否是一个合法的log目录。合法的log目录里必须包含meta.log。 :param dir_path: 被检测的路径 :return: 是否合法 """ if not os.path.isdir(dir_path): return False if len(re.findall(r'log_\d{8}_\d{6}$', dir_path)) != 0: filenames = ['meta.log'] # 至少要有meta.log表明这个是合法的log for filename in filenames: if not os.path.exists(os.path.join(dir_path, filename)): return False return True else: return False
[docs]def is_log_record_finish(save_log_dir: str) -> bool: """ 检测日志的记录是否已经结束 :param save_log_dir: 日志存放的目录 :return: """ if is_dirname_log_record(save_log_dir): with open(os.path.join(save_log_dir, 'meta.log'), 'r', encoding='utf-8') as f: line = '' for line in f: pass if len(line.strip()) != 0: try: _d = json.loads(line) except: return False if 'state' in _d['meta'] and _d['meta']['state'] in ('finish', 'error'): return True return False
[docs]class StandbyStepLogReader(threading.Thread): """ 用于多线程读取日志的类. 配合画图使用的。 :param save_log_dir: 日志存放的目录 :param uuid: 用于唯一识别 Reader 的 uuid :param wait_seconds: 在文件关闭后再等待{wait_seconds}秒结束进程 :param max_no_updates: 在{max_no_updates}次都没有更新时结束进程 """ def __init__(self, save_log_dir: str, uuid: str, wait_seconds: int = 60, max_no_updates: int = 30): super().__init__() self.save_log_dir = save_log_dir self._file_handlers = {} self.uuid = uuid self._last_access_time = time.time() # 如果这么长时间没有读取到新的数据,就认为是不需要再读取的了 # 如果这么长时间没有再次调用,就关掉文件 self._wait_seconds = wait_seconds self.unfinish_lines = {} # 防止读写冲突, key: line self._stop_flag = False self._quit = False self._no_update_count = 0 self.max_no_update = max_no_updates self._last_meta_md_time = None self._meta_path = os.path.join(self.save_log_dir, 'meta.log') self._total_steps = None def _create_file_handler(self, filenames=('metric.log', 'loss.log')): """ 检查是否有未加入的handler,有则加入进来 :return: """ for filename in filenames: handler_name = filename.split('.')[0] if handler_name in self._file_handlers: continue filepath = os.path.join(self.save_log_dir, filename) handler = open(filepath, 'r', encoding='utf-8') self._file_handlers[handler_name] = handler def _is_finish_in_meta(self) -> bool: """ 检查是否已经在meta中写明了finish的状态了 :return: bool """ last_meta_md_time = os.path.getmtime(self._meta_path) if self._last_meta_md_time is None or self._last_meta_md_time != last_meta_md_time: with open(self._meta_path, 'r', encoding='utf-8') as f: line = '' for line in f: pass line = line.strip() if len(line) != 0: try: _dict = json.loads(line)['meta'] if 'state' in _dict and _dict['state'] in ('finish', 'error'): return True except: pass self._last_meta_md_time = last_meta_md_time return False
[docs] @staticmethod def read_update_single_log(filepaths: List[str], ranges: dict) -> dict: """ 调用这个函数,获取filepaths中满足range_min, range_max的log :param filepaths: 完整的path路径 :param ranges: {'metric':[min, max] } :return: 返回值的结构如下。loss这个list是进行了step排序的 { loss: [dict('step':x, epoch:value, 'loss':{'loss1':xx})], metric:[dict('step':x, epoch:value, 'metric':{'SpanFMetric':{'f':xx}})] } """ updates = defaultdict(list) for filepath in filepaths: filename = os.path.basename(filepath).split('.')[0] range_min = int(ranges[filename][0]) range_max = int(ranges[filename][1]) with open(filepath, 'r', encoding='utf-8') as f: for line in f: if not line.endswith('\n'): # 结尾不是回车,说明没有读完 pass else: if line.startswith('S'): step = int(line[line.index(':') + 1:line.index('\t')]) if range_min <= step <= range_max: line = line[line.index('\t') + 1:].strip() try: _dict = json.loads(line) updates[filename].append(_dict) except: pass if filename in updates and len(updates[filename]) != 0: # 对step排序,保证不要出现混乱 updates[filename].sort(key=lambda x: x['step']) return updates
[docs] def read_update(self, only_once: bool = False, handler_names=('metric', 'loss')) -> dict: """ 调用这个函数,获取新的更新。如果第一次调用则是读取到当前所有的记录。 :param only_once: 是否只读取内容一次。是的话就不会保持读取到的行数,之后直接退出了 :param handler_names: 只check包含在handler_name的内容 :return: 返回值的结构如下 { loss: [dict('step':x, epoch:value, 'loss':{}), ...], # 或[dict('step':x, epoch:value, 'loss':value), ...] metric:[dict('step':x, epoch:value, 'metric':{'SpanFMetric':xxx})], finish:bool(not every time), total_steps:int(only the first access) } """ updates = {} if not self._quit: flag = False self._create_file_handler([fn+'.log' for fn in handler_names]) updates = defaultdict(list) if self._last_access_time is None: filepath = os.path.join(self.save_log_dir, 'progress.log') if os.path.exists(filepath): with open(filepath, 'r', encoding='utf-8') as f: line = f.readline() try: _dict = json.loads(line.strip()) if 'total_steps' in _dict: self._total_steps = _dict['total_steps'] updates['total_steps'] = _dict['total_steps'] except: pass flag = True self._last_access_time = time.time() for filename, handler in self._file_handlers.items(): if filename not in handler_names: continue for line in handler.readlines(): if filename in self.unfinish_lines: line = self.unfinish_lines.pop(filename) + line if not line.endswith('\n'): # 结尾不是回车,说明没有读完 self.unfinish_lines[filename] = line else: if line.startswith('S'): line = line[line.index('\t') + 1:].strip() try: _dict = json.loads(line) updates[filename].append(_dict) except: pass if filename in updates and len(updates[filename]) != 0: # 对step排序,保证不要出现混乱 updates[filename].sort(key=lambda x: x['step']) if not only_once: if len(updates) == 0: self._no_update_count += 1 else: self._no_update_count = 0 if flag: self.start() else: # 如果确定只读一次,则直接关闭。应该是finish了 self._close_file_handler() updates['finish'] = True if self._quit or self._no_update_count > self.max_no_update: updates = {'finish': True} if self._is_finish_in_meta(): updates['finish'] = True if 'finish' in updates: self._quit = True self.stop() return updates
def _close_file_handler(self): for key in list(self._file_handlers.keys()): handler = self._file_handlers[key] handler.close() self._file_handlers.clear()
[docs] def stop(self): """ 如果手动停止某个任务 :return: """ self._stop_flag = True self._close_file_handler() count = 0 while not self._quit: time.sleep(1) if count > 3: raise RuntimeError("Multi-thread bug here. It should not run twice.") count += 1
[docs] def run(self): """ 重载了多线程的运行函数 :return: """ while time.time() - self._last_access_time < self._wait_seconds and not self._stop_flag and \ self._no_update_count < self.max_no_update: time.sleep(0.5) print(f"Reader:{self.uuid} for log {os.path.basename(self.save_log_dir)} will quit now.") self._quit = True self._close_file_handler()
[docs]class MultiStandbyStepLogReader(threading.Thread): """ 用于multi_chart读取多个log的数据时使用 """ def __init__(self, root_log_dir, logs, uuid, wait_seconds: int = 60, max_no_updates: int = 30): """ :param str root_log_dir: root的loader :param list[str] logs: 具体的log的名称 :param str uuid: 一个独特的uuid :param int wait_seconds: :param int max_no_updates: """ super().__init__() self.log_readers = {} for log_id in logs: self.log_readers[log_id] = StandbyStepLogReader(save_log_dir=os.path.join(root_log_dir, log_id), uuid=uuid, wait_seconds = wait_seconds, max_no_updates = max_no_updates) self._stop_flag = False
[docs] def read_update(self, handler_names=('metric', 'loss')) -> dict: """ 调用这个函数,获取新的更新。如果第一次调用则是读取到当前所有的记录。 :param handler_names: 只check包含在handler_name的内容 :return: 返回值的结构如下 { metric-1: { log_1: [ [value, step, epoch], [] ] } ... } """ results = defaultdict(dict) """ { loss: [dict('step':x, epoch:value, 'loss':{})], metric:[dict('step':x, epoch:value, 'metric':{'SpanFMetric':{}}})], finish:bool(not every time), total_steps:int(only the first access) } """ results['finish_logs'] = [] for log_id in list(self.log_readers.keys()): reader = self.log_readers.get(log_id, None) if reader is None: results['finish_logs'].append(log_id) continue log_res = reader.read_update(only_once=False, handler_names=handler_names) if 'finish' in log_res: results['finish_logs'].append(log_id) for handler_name in handler_names: if handler_name in log_res: res_lst = log_res[handler_name] for _dict in res_lst: sub_dict = _dict[handler_name] step = _dict.get('step') epoch = _dict.get('epoch', -1) # 可能会没有这个值 flat_dict = flatten_dict(handler_name, sub_dict, connector='-') for key, value in flat_dict.items(): if key not in ('metric-epoch', 'metric-step'): if log_id not in results[key]: results[key][log_id] = [] results[key][log_id].append([value, step, epoch]) return results
def run(self) -> None: for key in list(self.log_readers.keys()): self.log_readers[key].start() # 将这个线程开始起来 while len(self.log_readers)>0 and not self._stop_flag: for key in list(self.log_readers.keys()): reader = self.log_readers[key] if reader._quit: self.log_readers.pop(key) time.sleep(0.5) @property def _quit(self): if len(self.log_readers)==0: return True return False def stop(self): for key in list(self.log_readers.keys()): reader = self.log_readers[key] reader.stop() self._stop_flag = True