Source code for ReFreSH.MobileSuit.Core.Services.IOHub

import sys
from abc import ABC
from datetime import datetime
from typing import TextIO, List
from pyasn1.type import char

from ....ConsoleColor import ConsoleColor
from ..ColorSetting import IColorSetting
from .PromptFormatter import *
from ...IIOHub import IIOHub, IIOHubConfigurator, IOOptions
from ...OutputType import OutputType
from ...PrintUnit import PrintUnit

[docs] class IOHub(IIOHub, ABC): """An entity, which serves the input/output of a mobile suit.""" def __init__(self, promptFormatter: PromptFormatter, configurator: IIOHubConfigurator): """Initialize a IOServer.""" self._Options: IOOptions = IOOptions.NoFlag self.ColorSetting = IColorSetting.DefaultColorSetting() self.Input = sys.stdin self.Output = sys.stdout self.ErrorStream = sys.stderr self.FormatPrompt = promptFormatter if configurator is not None: configurator(self) self.Prefix = [] @property def ColorSetting(self) -> IColorSetting: return self._ColorSetting @ColorSetting.setter def ColorSetting(self, value: IColorSetting) -> None: self._ColorSetting = value @property def FormatPrompt(self) -> PromptFormatter: return self._FormatPrompt @FormatPrompt.setter def FormatPrompt(self, value: PromptFormatter) -> None: self._FormatPrompt = value @property def Input(self) -> TextIO: return self._Input @Input.setter def Input(self, value: TextIO) -> None: self._Input = value @property def IsInputRedirected(self) -> bool: return sys.stdin is not self.Input
[docs] def ResetInput(self) -> None: self.Input = sys.stdin
[docs] def ReadLinePrimary(self) -> str: return self.Input.readline()
[docs] def Peek(self) -> char: next_char = - 1) return next_char
[docs] def Read(self) -> char: return
[docs] def ReadToEnd(self) -> str: return '\n'.join(self.Input.readlines())
@property def Options(self) -> IOOptions: return self._Options @Options.setter def Options(self, value: IOOptions) -> None: self._Options = value @property def IsErrorRedirected(self) -> bool: return sys.stderr is not self.ErrorStream @property def IsOutputRedirected(self) -> bool: return sys.stdout is not self.Output @property def ErrorStream(self) -> TextIO: return self._ErrorStream @ErrorStream.setter def ErrorStream(self, value: TextIO) -> None: self._ErrorStream = value @property def Output(self) -> TextIO: return self._Output @Output.setter def Output(self, value: TextIO) -> None: self._Output = value
[docs] def ResetError(self) -> None: self.ErrorStream = sys.stderr
[docs] def ResetOutput(self) -> None: self.Output = sys.stdout
[docs] def AppendWriteLinePrefixPrimary(self, prefix: Iterable) -> None: self.AppendWriteLinePrefixInternal(PrintUnit.FromIterable(prefix))
[docs] def AppendWriteLinePrefixInternal(self, prefix: PrintUnit) -> None: self.Prefix.append(prefix)
[docs] def SubtractWriteLinePrefix(self) -> None: self.Prefix.pop()
[docs] def ClearWriteLinePrefix(self) -> None: self.Prefix.clear()
[docs] def WritePrimary(self, content: Iterable) -> None: self.WriteInternal(PrintUnit.FromIterable(content))
[docs] def WriteInternal(self, content: PrintUnit) -> None: if content.Foreground is not None: (R, G, B) = content.Foreground.rgb self.Output.write(f"\u001b[38;2;{R};{G};{B}m") if content.Background is not None: (R, G, B) = content.Background self.Output.write(f"\u001b[48;2;{R};{G};{B}m") self.Output.write(content.Text) if content.Foreground is not None or content.Background is not None: self.Output.write("\u001b[0m")
[docs] def GetLinePrefix(self, otype: OutputType) -> List[PrintUnit]: if IOOptions.DisableLinePrefix not in self.Options: return self.Prefix elif IOOptions.DisableTag in self.Options: return [] else: sb = [IIOHub.GetLabel(otype)] # AppendTimeStamp(sb) return [PrintUnit(''.join(sb), None)]
[docs] @staticmethod def AppendTimeStamp(sb: List[str]) -> None: sb.append('[') sb.append( sb.append(']')
[docs] class PureTextIOHub(IOHub): """IO hub with pure text output.""" def __init__(self, promptFormatter: PromptFormatter, configurator: IIOHubConfigurator): """Initialize a IOhub.""" super().__init__(promptFormatter, configurator)
[docs] def WritePrimary(self, content: Iterable) -> None: """Write content to the output stream.""" self.Output.write(PrintUnit.FromIterable(content).Text)
[docs] class IOHub4Bit(IOHub): """IO hub using 4-bit color output.""" def __init__(self, promptFormatter: PromptFormatter, configurator: IIOHubConfigurator): """Initialize a IOhub.""" super().__init__(promptFormatter, configurator)
[docs] @staticmethod def BackgroundCodeOf(c: Color) -> int: return 10 + IOHub4Bit.ForegroundCodeOf(c)
[docs] @staticmethod def ForegroundCodeOf(c: Color) -> int: return { ConsoleColor.Black: 30, ConsoleColor.DarkBlue: 34, ConsoleColor.DarkGreen: 32, ConsoleColor.DarkCyan: 36, ConsoleColor.DarkRed: 31, ConsoleColor.DarkMagenta: 35, ConsoleColor.DarkYellow: 33, ConsoleColor.Gray: 90, ConsoleColor.DarkGray: 37, ConsoleColor.Blue: 94, ConsoleColor.Green: 92, ConsoleColor.Cyan: 96, ConsoleColor.Red: 91, ConsoleColor.Magenta: 95, ConsoleColor.Yellow: 93, ConsoleColor.White: 97 }[IOHub4Bit.ConsoleColorOf(c)]
[docs] @staticmethod def ConsoleColorOf(color: Color) -> ConsoleColor: (r, g, b) = color.rgb delta = float("inf") re = None for cc in ConsoleColor: c = PrintUnit.ConsoleColorCast(cc) (cr,cg,cb) =c.rgb t = (cr - r) ** 2 + (cg - g) ** 2 + (cb - b) ** 2 if t == 0: return cc if t < delta: delta = t re = cc return re
[docs] def WriteInternal(self, content: PrintUnit) -> None: """Write content to the output stream.""" if content.Foreground is not None: f = content.Foreground self.Output.write(f"\u001b[{self.ForegroundCodeOf(f)}m") if content.Background is not None: b = content.Background self.Output.write(f"\u001b[{self.BackgroundCodeOf(b)}m") self.Output.write(content.Text) if content.Foreground is not None or content.Background is not None: self.Output.write("\u001b[0m")