#!/opt/venv/bin/python3
# 파일명 : yy_cmd.py
# 위치   : /var/www/html/y/yy_cmd.py
# 버전   : 2.1
# 용도   : yyy_cmd.php 의 xterm.js 와 연결되는 PTY WebSocket 서버
# 실행   : python3 /var/www/html/y/yy_cmd.py --host 127.0.0.1 --port 8765

import argparse
import asyncio
import codecs
import json
import os
import pty
import re
import select
import shlex
import signal
import struct
import sys
import termios
import time
from contextlib import suppress
from pathlib import Path
from urllib.parse import parse_qs, urlsplit

# Three candidate import paths for ``serve`` are tried in order; the first
# one that imports successfully is used for the rest of the module.
# NOTE(review): presumably ordered from the newest websockets package layout
# to the oldest — confirm against the versions this script must support.
try:
    from websockets.asyncio.server import serve
except Exception:
    try:
        from websockets import serve
    except Exception:
        try:
            from websockets.legacy.server import serve
        except Exception:
            # No candidate worked: print an install hint, then re-raise so
            # the script stops with the last import error.
            print('websockets 모듈이 필요합니다.')
            print('설치: python3 -m pip install websockets')
            raise


# Shell used when --shell is not given: $SHELL if set and non-empty,
# otherwise /bin/bash.
DEFAULT_SHELL = os.environ.get('SHELL') or '/bin/bash'
# Working directory used when --cwd is not given: the current user's home.
DEFAULT_CWD = str(Path.home())
# Session name used when the client sends no tmux name or an invalid one.
TMUX_DEFAULT_SESSION_NAME = 'yy'
# Accepted session names: 'yy' alone, or 'yy' followed by a positive integer
# without leading zeros (yy1, yy2, yy10, ...).
TMUX_SESSION_NAME_RE = re.compile(r'^yy(?:[1-9][0-9]*)?$')


def ws_request_path(websocket, path=None) -> str:
    """Return the request path of a WebSocket connection, or '' if unknown.

    Candidates are checked in this order and the first non-empty ``str``
    wins: the explicit ``path`` argument, ``websocket.path``, then
    ``websocket.request.path``. Missing attributes are treated as absent.
    """
    def _usable(candidate) -> bool:
        # Only a non-empty str counts; bytes, None and '' are rejected.
        return isinstance(candidate, str) and bool(candidate)

    if _usable(path):
        return path

    direct = getattr(websocket, 'path', None)
    if _usable(direct):
        return direct

    request = getattr(websocket, 'request', None)
    nested = getattr(request, 'path', None)
    return nested if _usable(nested) else ''


def normalize_tmux_session_name(value: str) -> str:
    """Return a validated tmux session name.

    The input is converted to ``str`` and stripped of surrounding
    whitespace. If the result is empty or does not match
    ``TMUX_SESSION_NAME_RE``, ``TMUX_DEFAULT_SESSION_NAME`` is returned
    instead.
    """
    candidate = str(value or '').strip()
    is_valid = bool(candidate) and TMUX_SESSION_NAME_RE.match(candidate) is not None
    return candidate if is_valid else TMUX_DEFAULT_SESSION_NAME


def ws_tmux_info(websocket, path=None) -> tuple[bool, str]:
    """Read the tmux options from the connection's query string.

    Returns ``(use_tmux, tmux_name)``. ``use_tmux`` is true when the
    ``tmux`` query parameter is one of 1/true/yes/on/y (case-insensitive);
    ``tmux_name`` is the ``tmux_name`` parameter after validation by
    ``normalize_tmux_session_name``. When the request path is unavailable
    or cannot be parsed, ``(False, TMUX_DEFAULT_SESSION_NAME)`` is returned.
    """
    fallback = (False, TMUX_DEFAULT_SESSION_NAME)

    request_path = ws_request_path(websocket, path)
    if not request_path:
        return fallback

    try:
        params = parse_qs(urlsplit(request_path).query)
    except Exception:
        return fallback

    # Only the first occurrence of each parameter is considered.
    flag_values = params.get('tmux') or ['0']
    flag = str(flag_values[0]).strip().lower()
    enabled = flag in ('1', 'true', 'yes', 'on', 'y')

    name_values = params.get('tmux_name') or [TMUX_DEFAULT_SESSION_NAME]
    session = normalize_tmux_session_name(name_values[0])

    return enabled, session


def ws_use_tmux(websocket, path=None) -> bool:
    """Return only the ``use_tmux`` flag computed by ``ws_tmux_info``."""
    return ws_tmux_info(websocket, path)[0]


def build_startup_cmd(use_tmux: bool, tmux_name: str = TMUX_DEFAULT_SESSION_NAME) -> str:
    """Build the shell command line that attaches the terminal to tmux.

    Args:
        use_tmux: When false, no startup command is needed and '' is
            returned.
        tmux_name: Requested session name; it is validated with
            ``normalize_tmux_session_name`` before use.

    Returns:
        A shell snippet that creates the session if it does not exist and
        then replaces the shell with ``tmux attach-session``, or '' when
        ``use_tmux`` is false.
    """
    if not use_tmux:
        return ''

    sess = shlex.quote(normalize_tmux_session_name(tmux_name))
    # A bare ``-t yy`` falls back to prefix/pattern matching, so asking for
    # ``yy`` while only ``yy1`` exists would attach to ``yy1`` and never
    # create ``yy``. A leading ``=`` makes tmux accept an exact match only.
    # The ``=`` is written as a quoted ``'='`` so shells that treat a leading
    # unquoted ``=`` specially (zsh) pass it through unchanged.
    target = f"'='{sess}"
    return (
        f'tmux has-session -t {target} 2>/dev/null || '
        f'tmux new-session -d -s {sess}; '
        f'exec tmux attach-session -t {target}'
    )


def set_winsize(fd: int, rows: int, cols: int) -> None:
    """Set the terminal window size of ``fd`` with the TIOCSWINSZ ioctl.

    Args:
        fd: File descriptor of the terminal to resize.
        rows: Requested row count; a falsy value means 24.
        cols: Requested column count; a falsy value means 80.

    Rows are clamped to 5..65535 and columns to 20..65535. The upper bound
    exists because the size is packed into unsigned 16-bit fields: without
    it an oversized value would raise ``struct.error`` instead of resizing.

    Raises:
        ValueError / TypeError: if ``rows`` or ``cols`` cannot be converted
            with ``int()``.
        OSError: if the ioctl fails.
    """
    max_dim = 0xFFFF  # largest value an unsigned short ('H') can hold
    rows = min(max_dim, max(5, int(rows or 24)))
    cols = min(max_dim, max(20, int(cols or 80)))
    # Field order: rows, cols, x pixels, y pixels (pixel sizes left at 0).
    packed = struct.pack('HHHH', rows, cols, 0, 0)
    fcntl_ioctl(fd, termios.TIOCSWINSZ, packed)


def fcntl_ioctl(fd: int, cmd: int, arg) -> None:
    """Call ``fcntl.ioctl(fd, cmd, arg)``; ``fcntl`` is imported on first use."""
    import fcntl
    fcntl.ioctl(fd, cmd, arg)


class PtyBridge:
    """Run one shell on a pseudo-terminal and expose its master side.

    ``spawn`` forks the shell with ``pty.fork``; afterwards ``read_chunk``,
    ``write`` and ``resize`` operate on the master file descriptor, and
    ``close`` releases the descriptor and reaps the child.

    ``shell_path``, ``cwd`` and ``startup_cmd`` hold the constructor
    arguments. ``pid`` and ``fd`` are ``None`` until ``spawn`` succeeds and
    are reset to ``None`` by ``close``.
    """

    # Seconds ``write`` waits for a full PTY to accept more input before it
    # gives up and re-raises ``BlockingIOError``.
    WRITE_WAIT_SECONDS = 1.0
    # Seconds ``close`` waits for the child to exit after SIGHUP before it
    # escalates to SIGKILL.
    CLOSE_GRACE_SECONDS = 1.0

    def __init__(self, shell_path: str, cwd: str, startup_cmd: str = '') -> None:
        """Store the launch parameters; nothing is started until ``spawn``.

        Args:
            shell_path: Shell executable; a falsy value means DEFAULT_SHELL.
            cwd: Directory the child tries to change into before exec.
            startup_cmd: Command run via ``-c``/``-lc`` instead of an
                interactive shell; '' or ``None`` means interactive.
        """
        self.shell_path = shell_path
        self.cwd = cwd
        self.startup_cmd = startup_cmd or ''
        self.pid = None
        self.fd = None

    def spawn(self, rows: int = 30, cols: int = 120) -> None:
        """Fork the shell on a new PTY and prepare the master descriptor.

        The master is given the requested window size and switched to
        non-blocking mode. If that setup fails the child and descriptor are
        released again before the exception propagates.
        """
        pid, fd = pty.fork()
        if pid == 0:
            self._exec_shell()  # child process: never returns

        self.pid = pid
        self.fd = fd
        try:
            set_winsize(fd, rows, cols)
            os.set_blocking(fd, False)
        except Exception:
            # Do not leave an orphaned shell and open fd behind.
            self.close()
            raise

    def _exec_shell(self) -> None:
        """Child side of ``spawn``: replace this process with the shell."""
        try:
            # A bad working directory is not fatal; the shell simply starts
            # in whatever directory the server process was in.
            with suppress(Exception):
                os.chdir(self.cwd)

            env = os.environ.copy()
            env.setdefault('TERM', 'xterm-256color')
            env.setdefault('COLORTERM', 'truecolor')
            env.setdefault('LANG', 'C.UTF-8')
            env.setdefault('LC_ALL', 'C.UTF-8')

            shell = self.shell_path or DEFAULT_SHELL
            # bash and zsh additionally get the login flag (-l).
            login_capable = shell.endswith(('bash', 'zsh'))
            if self.startup_cmd:
                argv = [shell, '-lc' if login_capable else '-c', self.startup_cmd]
            else:
                argv = [shell, '-il' if login_capable else '-i']

            os.execvpe(shell, argv, env)
        except Exception as exc:
            print(f'shell exec failed: {exc}', file=sys.stderr, flush=True)
        # Reached only when exec did not happen. Exit immediately so the
        # forked child can never continue running the server's own code.
        os._exit(1)

    async def read_chunk(self) -> bytes:
        """Wait for PTY output and return up to 4096 bytes of it.

        Returns ``b''`` when the bridge has no descriptor or when reading
        fails with ``OSError`` (treated as end of stream by the caller).
        """
        fd = self.fd
        if fd is None:
            return b''

        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def _reader() -> None:
            # Skip the read once the waiter is finished or cancelled so no
            # output is consumed and then thrown away.
            if fut.done():
                return
            try:
                data = os.read(fd, 4096)
            except BlockingIOError:
                return  # spurious wake-up; keep waiting
            except OSError:
                data = b''
            fut.set_result(data)

        loop.add_reader(fd, _reader)
        try:
            return await fut
        finally:
            # Use the captured descriptor: ``self.fd`` may already have been
            # reset by ``close`` when this runs.
            with suppress(Exception):
                loop.remove_reader(fd)

    def write(self, data: bytes) -> None:
        """Write all of ``data`` to the PTY.

        The descriptor is non-blocking, so one ``os.write`` may accept only
        part of a large input (for example a big paste). The remainder is
        retried until everything has been written.

        Raises:
            BlockingIOError: if the PTY accepts no input for
                ``WRITE_WAIT_SECONDS``.
            OSError: for any other write failure.
        """
        fd = self.fd
        if fd is None or not data:
            return

        pending = memoryview(data)
        while len(pending):
            try:
                written = os.write(fd, pending)
            except BlockingIOError:
                # PTY input buffer is full: wait a bounded time for room.
                _, writable, _ = select.select([], [fd], [], self.WRITE_WAIT_SECONDS)
                if not writable:
                    raise
                continue
            pending = pending[written:]

    def resize(self, rows: int, cols: int) -> None:
        """Apply a new window size to the PTY; no-op when not spawned."""
        if self.fd is None:
            return
        set_winsize(self.fd, rows, cols)

    def close(self) -> None:
        """Close the PTY and reap the child; safe to call more than once.

        The child is sent SIGHUP and given ``CLOSE_GRACE_SECONDS`` to exit.
        If it is still running after that it is sent SIGKILL, so this method
        cannot block indefinitely on a child that ignores SIGHUP.
        """
        if self.fd is not None:
            with suppress(OSError):
                os.close(self.fd)
            self.fd = None

        pid, self.pid = self.pid, None
        if not pid:
            return

        with suppress(OSError):
            os.kill(pid, signal.SIGHUP)
        if self._reap(pid, self.CLOSE_GRACE_SECONDS):
            return

        with suppress(OSError):
            os.kill(pid, signal.SIGKILL)
        with suppress(OSError):
            os.waitpid(pid, 0)

    @staticmethod
    def _reap(pid: int, timeout: float) -> bool:
        """Poll for the child's exit; return True once it has been reaped."""
        deadline = time.monotonic() + timeout
        while True:
            try:
                reaped, _status = os.waitpid(pid, os.WNOHANG)
            except OSError:
                # No such child (already reaped elsewhere): nothing to wait for.
                return True
            if reaped == pid:
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.02)


async def pump_pty_to_ws(websocket, bridge: 'PtyBridge') -> None:
    """Forward PTY output to the WebSocket as text until the PTY ends.

    Output is read in chunks with ``bridge.read_chunk()`` and sent with
    ``websocket.send()``. The loop stops when a read returns no data.

    A stateful incremental decoder is used because a multi-byte UTF-8
    character can be split across two reads; decoding each chunk on its own
    would silently drop such characters. Invalid bytes are still ignored.
    """
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    while True:
        data = await bridge.read_chunk()
        if not data:
            break
        text = decoder.decode(data)
        # A chunk holding only the first bytes of a character decodes to ''
        # for now; the character is emitted with the next chunk.
        if text:
            await websocket.send(text)


async def pump_ws_to_pty(websocket, bridge: 'PtyBridge') -> None:
    """Forward WebSocket messages to the PTY until the socket closes.

    Message handling:
      * ``bytes``: written to the PTY unchanged.
      * text holding a JSON object with ``type == 'input'``: its ``data``
        field is written to the PTY.
      * text holding a JSON object with ``type == 'resize'``: ``rows`` and
        ``cols`` are applied to the PTY (defaults 24 and 80); a resize with
        unusable values is ignored.
      * JSON objects of any other type: ignored.
      * any other non-empty text: treated as raw keyboard input and written
        exactly as received. This includes text that is valid JSON but not
        an object (for example ``1``) and whitespace such as ``\\r``, which
        is significant to a terminal.
    """
    max_dim = 0xFFFF  # window sizes are stored in unsigned 16-bit fields

    async for message in websocket:
        if isinstance(message, bytes):
            bridge.write(message)
            continue

        if not message:
            continue

        try:
            payload = json.loads(message)
        except (ValueError, RecursionError):
            payload = None

        # Only a JSON object is a control message; everything else is input.
        if not isinstance(payload, dict):
            bridge.write(message.encode('utf-8', errors='ignore'))
            continue

        msg_type = str(payload.get('type') or '').strip().lower()

        if msg_type == 'input':
            data = str(payload.get('data') or '')
            bridge.write(data.encode('utf-8', errors='ignore'))
            continue

        if msg_type == 'resize':
            try:
                cols = int(payload.get('cols') or 80)
                rows = int(payload.get('rows') or 24)
            except (TypeError, ValueError, OverflowError):
                # A malformed resize must not tear down the whole session.
                continue
            bridge.resize(min(rows, max_dim), min(cols, max_dim))
            continue


async def ws_handler(websocket, path=None) -> None:
    """Serve one WebSocket connection by bridging it to a fresh PTY shell.

    The tmux options are read from the request's query string, a shell is
    spawned on a new PTY, and two tasks copy data in each direction. When
    either direction finishes, the other is cancelled and the PTY is closed.

    Cleanup runs in ``finally`` blocks so the PTY descriptor and the child
    process are released on every exit path, including cancellation of this
    handler and a failed spawn.

    Args:
        websocket: The connection object passed in by the websockets server.
        path: Request path, for server versions that pass it separately.

    Raises:
        Exception: whatever ``PtyBridge.spawn`` raised, after logging it.
    """
    raw_path = ws_request_path(websocket, path)
    use_tmux, tmux_name = ws_tmux_info(websocket, path)
    startup_cmd = build_startup_cmd(use_tmux, tmux_name)

    print(
        f'[yy_cmd] raw_path={raw_path!r} use_tmux={use_tmux} tmux_name={tmux_name!r} startup_cmd={startup_cmd!r}',
        file=sys.stderr,
        flush=True,
    )

    bridge = PtyBridge(
        shell_path=ARGS.shell,
        cwd=ARGS.cwd,
        startup_cmd=startup_cmd,
    )

    try:
        try:
            bridge.spawn(rows=30, cols=120)
        except Exception as exc:
            print(f'[yy_cmd] bridge.spawn failed: {exc}', file=sys.stderr, flush=True)
            raise

        tasks = {
            asyncio.create_task(pump_pty_to_ws(websocket, bridge)),
            asyncio.create_task(pump_ws_to_pty(websocket, bridge)),
        }
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancelling an already finished task is a no-op.
            for task in tasks:
                task.cancel()
            # CancelledError derives from BaseException, so it is collected
            # here as a result instead of being awaited under
            # ``suppress(Exception)``, which would let it escape and skip
            # the cleanup below. This also retrieves any exception from the
            # task that finished first.
            await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        bridge.close()


async def main() -> None:
    """Announce the listen address, start the server and run until stopped.

    The server is bound to ``ARGS.host``/``ARGS.port`` with ``ws_handler``
    serving each connection. The coroutine then waits on a future that is
    never completed, so it only ends when it is cancelled or interrupted.
    """
    print(f'yy_cmd.py listen: ws://{ARGS.host}:{ARGS.port}')

    server_options = {
        'max_size': None,
        'ping_interval': 20,
        'ping_timeout': 20,
    }
    async with serve(ws_handler, ARGS.host, ARGS.port, **server_options):
        run_forever = asyncio.get_running_loop().create_future()
        await run_forever


def parse_args() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    Options: ``--host`` (default 127.0.0.1), ``--port`` (int, default 8765),
    ``--shell`` (default DEFAULT_SHELL) and ``--cwd`` (default DEFAULT_CWD).
    """
    option_table = (
        ('--host', {'default': '127.0.0.1'}),
        ('--port', {'type': int, 'default': 8765}),
        ('--shell', {'default': DEFAULT_SHELL}),
        ('--cwd', {'default': DEFAULT_CWD}),
    )

    cli = argparse.ArgumentParser(description='yyy_cmd xterm PTY WebSocket server')
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()


# Parsed once at module level (so also when the module is imported, not only
# when it is run as a script). ws_handler and main read their settings from
# this namespace.
ARGS = parse_args()


# Script entry point: run the server until interrupted; Ctrl+C prints a
# short farewell instead of a traceback.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('\nbye')