Browse Source

Tyrell v0.1-dev now under testing

All actions need to be tested, I need to rework it (minor change) to
show_help on startup (so you don't need to ask it first)

I've developed _match_key to unify key and modifier checks

I've developed _key_output (I think I'll rename it to _key_write) which
presses the modifiers then keys, and releases them)

> It will be upto the user to catch custom binds that trigger certain
events in their use cases (like Minecraft '/' or 't' which auto-opens
command or chat)
apollo 4 days ago
parent
commit
b992abf24c
1 changed files with 291 additions and 36 deletions
  1. 291 36
      tyrell.py

+ 291 - 36
tyrell.py

@@ -1,13 +1,11 @@
 
-from sys import argv as _argv
-
 from os import name as _OS_NAME, environ as _env
 from os import listdir as _listdir, mkdir as _mkdir, chown as _chown
 from os.path import exists as _exists, join as _join, isdir as _isdir
-
+from copy import copy as _copy
 from time import sleep as _sleep
 
-from typing import Dict as _Dict, Tuple as _Tuple, List as _List, Optional as _Option, Any as _Any
+from typing import Dict as _Dict, List as _List, Optional as _Option, Any as _Any
 
 try:
     from click import echo as _echo, style as _style
@@ -24,13 +22,16 @@ def _print_warn(msg: str):
 def _print_ok(msg: str):
     _echo(_style(" OK  ", fg="bright_green") + " " + msg)
 
+def _print_on(msg: str):
+    _echo(_style(" ON  ", fg="bright_green") + " " + msg)
+
+def _print_off(msg: str):
+    _echo(_style(" OFF ", fg="bright_red") + " " + msg)
+
 def _print_info(msg: str):
     print("      " + msg)
 
-from pyautogui import LEFT as _LEFT, RIGHT as _RIGHT, MIDDLE as _MIDDLE
-from keyboard import press as _key_down, release as _key_up, KeyboardEvent as _KeyboardEvent, add_hotkey as _add_hotkey, hook as _key_hook, write as _key_write
-
-_BUTTON_MAP: _Dict[str | int, int] = {_LEFT: 1, _MIDDLE:3, _RIGHT: 3, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7}
+from keyboard import press as _key_down, release as _key_up, KeyboardEvent as _KeyboardEvent, hook as _key_hook, unhook as _key_unhook
 
 platform: str = ""
 os_name = _OS_NAME.lower()
@@ -51,39 +52,25 @@ if platform == 'LINUX':
     _display = _Display(_env['DISPLAY'])
     from Xlib.X import ButtonPress as _ButtonPress, ButtonRelease as _ButtonRelease
     from Xlib.ext.xtest import fake_input as _fake_input
-    def _mouse_down(button: str | int=_LEFT):
-        assert button in _BUTTON_MAP.keys(), "button argument not in ('left', 'middle', 'right', 1, 2, 3, 4, 5, 6, 7)"
+    from pyautogui import LEFT as _LEFT, RIGHT as _RIGHT, MIDDLE as _MIDDLE
+    _BUTTON_MAP: _Dict[str, int] = {_LEFT: 1, _MIDDLE:3, _RIGHT: 3}
+    def _mouse_down(button: str=_LEFT):
+        assert button in _BUTTON_MAP.keys(), "button argument not in ('left', 'middle', 'right')"
         btn = _BUTTON_MAP[button]
         _fake_input(_display, _ButtonPress, btn)
         _display.sync()
-    def _mouse_up(button: str | int=_LEFT):
-        assert button in _BUTTON_MAP.keys(), "button argument not in ('left', 'middle', 'right', 1, 2, 3, 4, 5, 6, 7)"
+    def _mouse_up(button: str=_LEFT):
+        assert button in _BUTTON_MAP.keys(), "button argument not in ('left', 'middle', 'right')"
         btn = _BUTTON_MAP[button]
         _fake_input(_display, _ButtonRelease, btn)
         _display.sync()
-    def _click(button: str|int=_LEFT):
-        assert button in _BUTTON_MAP.keys(), "button argument not in ('left', 'middle', 'right', 1, 2, 3, 4, 5, 6, 7)"
-        _mouse_down(button)
-        _mouse_up(button)
 else:
     # We can simply use the mouse module under WINDOWS and MACOS
-    from mouse import press as _mouse_down, release as _mouse_up, click as _click
+    from mouse import press as _mouse_down, release as _mouse_up
 
 from toml import load as _toml_load, TomlDecodeError as _TomlDecodeError
 
-from asyncio import run as _run, sleep as _asleep
-from asyncio.tasks import gather as _task_group
-
-def _keyboard_output(msg: str, delay: int=50, hold: int=20):
-    _key_write(msg, delay=hold)
-    _sleep(delay/1000)
-
-def _mouse_output(button: str, delay: int=50, hold: int=20):
-    _mouse_down(button=button)
-    _sleep(hold / 1000)
-    _mouse_up(button=button)
-    _sleep(hold / 1000)
-    _sleep(delay / 1000)
+from asyncio import run as _run, sleep as _asleep, TaskGroup as _TaskGroup
 
 def _profile_pre_check(profile_name: str):
     if profile_name.endswith(".toml"):
@@ -258,6 +245,74 @@ def _write_example_mirror(profile_name: str):
     if "SUDO_USER" in _env:
         _chown(_join(profile_name, "keys", "example_mirror.toml"), int(_env['SUDO_UID']), int(_env['SUDO_GID']))
 
+def _match_key(key: str, event: _KeyboardEvent) -> bool:
+    # Process modifiers
+    key_mods: _List[str] = []
+    k = _copy(key)
+    if 'shift' in k:
+        k = k.replace('shift+', '')
+        key_mods.append('shift')
+    elif 'ctrl' in k:
+        k = k.replace('ctrl+', '')
+        key_mods.append('ctrl')
+    elif 'alt' in k:
+        k = k.replace('alt+', '')
+        key_mods.append('alt')
+    # Check Modifiers
+    if len(key_mods) == 0 and event.modifiers is not None and len(event.modifiers) != 0:
+        # We register no modifiers
+        # Given modifiers (not the same)
+        return False
+    if len(key_mods) != 0:
+        mods_ok: _Dict[str, bool] = {}
+        for mod in key_mods:
+            mods_ok[mod] = False
+        if len(key_mods) != 0 and event.modifiers is None:
+            # We register modifiers
+            # Given no modifiers (not the same)
+            return False
+        for mod in key_mods:
+            for m in event.modifiers:
+                if mod == m:
+                    mods_ok[mod] = True
+        for mod in mods_ok:
+            val = mods_ok[mod]
+            if val is False:
+                return False
+        if len(k) == 0:
+            # It was only a modifier
+            return True
+    return event.name == k
+
+def _key_output(write: str, delay:float=50.0, hold:float=20.0):
+    if ', ' in write:
+        for part in write.split(', '):
+            if '+' in part:
+                mod, key = part.split('+', maxsplit=1)
+                _key_down(mod)
+                _key_down(key)
+                _sleep(hold / 1000)
+                _key_up(key)
+                _key_up(mod)
+            else:
+                _key_down(part)
+                _sleep(hold / 1000)
+                _key_up(part)
+            _sleep(delay / 1000)
+    else:
+        if '+' in write:
+            mod, key = write.split('+', maxsplit=1)
+            _key_down(mod)
+            _key_down(key)
+            _sleep(hold / 1000)
+            _key_up(key)
+            _key_up(mod)
+        else:
+            _key_down(write)
+            _sleep(hold / 1000)
+            _key_up(write)
+        _sleep(delay / 1000)
+
 class Action:
     # Toggle to determine if this action is 'on' or 'off'
     state: bool
@@ -269,6 +324,8 @@ class Action:
     write: _Option[str]
     # Optional, Repeat X number of ticks
     duration: _Option[int]
+    # Internal record of where the duration is
+    elapsed: _Option[int]
     # Optional, mouse button to send
     button: _Option[str]
     # Optional, Key to mirror
@@ -302,18 +359,101 @@ class Action:
             self.mirror = str(data['mirror'])
         else:
             self.mirror = None
-        if 'duration' in data:
+        if 'duration' in data and data['duration'] is not None:
             self.duration = int(data['duration'])
+            self.elapsed = None
         else:
             self.duration = None
-        if 'delay' in data:
+            self.elapsed = None
+        if 'delay' in data and data['delay'] is not None:
             self.delay = int(data['delay'])
         else:
             self.delay = None
-        if 'hold' in data:
+        if 'hold' in data and data['hold'] is not None:
             self.hold = int(data['hold'])
         else:
             self.hold = None
+    
+    def is_on(self) -> bool:
+        return self.state
+    
+    def toggle(self, on: bool = False, off: bool = False) -> bool:
+        if (not on and not off) or (on and off):
+            self.state = not self.state
+        elif on and not off:
+            self.state = True
+        elif off and not on:
+            self.state = False
+        if self.state is False and self.is_ticker():
+            # Reset elapsed time when set to off
+            if self.elapsed is not None:
+                self.elapsed = None
+        elif self.state is True and self.is_ticker():
+            # Start elapsed time when set to on
+            if self.elapsed is None:
+                self.elapsed = self.duration
+        return self.state
+    
+    def is_ticker(self) -> bool:
+        return self.duration is not None
+    
+    def tick(self) -> bool:
+        if not self.is_ticker():
+            return False
+        if self.elapsed is None:
+            return False
+        self.elapsed -= 1
+        if self.elapsed <= 0:
+            self.elapsed = self.duration
+            return True
+        return False
+    
+    def trigger_key(self, event: _KeyboardEvent) -> bool:
+        return _match_key(self.key, event)
+    
+    def trigger_mirror(self, event: _KeyboardEvent) -> bool:
+        if self.kind != 'mirror' or self.mirror is None:
+            return False
+        return _match_key(self.key, event)
+    
+    def do(self, global_delay: int=50, global_hold: int=20, mirror: str=""):
+        if self.kind == 'key down' and self.write is not None:
+            _key_down(self.write)
+        elif self.kind == 'click down' and self.button is not None:
+            _mouse_down(self.button)
+        elif self.kind == 'key' and self.write is not None:
+            if self.hold is not None:
+                if self.delay is not None:
+                    _key_output(self.write, delay=self.delay, hold=self.hold)
+                else:
+                    _key_output(self.write, delay=global_delay, hold=self.hold)
+            else:
+                if self.delay is not None:
+                    _key_output(self.write, delay=self.delay, hold=global_hold)
+                else:
+                    _key_output(self.write, delay=global_delay, hold=global_hold)
+        elif self.kind == 'click' and self.button is not None:
+            _mouse_down(self.button)
+            if self.hold is not None:
+                _sleep(self.hold / 1000)
+            else:
+                _sleep(global_hold / 1000)
+            _mouse_up(self.button)
+            if self.delay is not None:
+                _sleep(self.delay / 1000)
+            else:
+                _sleep(global_delay / 1000)
+        elif self.kind == 'mirror':
+            if self.write is not None:
+                if mirror == 'down':
+                    _key_down(self.write)
+                elif mirror == 'up':
+                    _key_up(self.write)
+            elif self.button is not None:
+                if mirror == 'down':
+                    _mouse_down(self.button)
+                elif mirror == 'up':
+                    _mouse_up(self.button)
 
 class Profile:
     name: str
@@ -323,7 +463,11 @@ class Profile:
     hold: int
     activator: str
     helper: str
+    _actions: _Dict[str, Action]
+    _state: bool
+    _help_screen: _List[str]
     def __init__(self, name: str):
+        self._state = False
         # Unify name without extension
         if name.endswith(".toml"):
             self.name = name.removesuffix(".toml")
@@ -354,7 +498,7 @@ class Profile:
             }
             self.tick = 50
             self.activator = "`"
-            self.helper = "shift+/"
+            self.helper = "shift+/" # ?
             self.delay = 50
             self.hold = 20
             for key in dat:
@@ -375,6 +519,13 @@ class Profile:
                 elif key == "helper":
                     self.helper = str(val)
         # Locate and "load" actions
+        self._actions = {}
+        self._help_screen = [
+            _style(f"{self.activator:15}", fg='bright_magenta') + " Activator",
+            _style(f"{self.helper:15}", fg='bright_magenta') + " Help",
+            _style(f"{'ctrl+c':15}", fg='bright_red') + " Quit",
+            "",
+        ]
         for entry in _listdir(_join(self.name, "keys")):
             if entry.startswith(".") or _isdir(entry) or not entry.endswith(".toml"):
                 _print_info(f"{_join(self.name, "keys", entry)} => skipped")
@@ -385,15 +536,119 @@ class Profile:
                 data = _toml_load(file)
                 a = Action(data)
                 if a.kind != 'mirror':
-                    _print_ok(f"{file} => '{a.kind}' action, trigger on '{a.key}'")
+                    if a.duration is not None:
+                        if a.duration > 0:
+                            d = int(a.duration*self.tick)
+                            if d < 1000:
+                                self._help_screen.append(_style(f"{a.key:15}", fg='bright_cyan') + f" executes {entry.removesuffix(".toml")} every {a.duration} ticks ({d}ms)")
+                            else:
+                                df = float(d / 1000.0)
+                                self._help_screen.append(_style(f"{a.key:15}", fg='bright_cyan') + f" executes {entry.removesuffix(".toml")} every {a.duration} ticks ({df:0.3}s)")
+                        else:
+                            self._help_screen.append(_style(f"{a.key:15}", fg='bright_cyan') + f" executes {entry.removesuffix(".toml")} every tick ({int(self.tick)}ms)")
+                    else:
+                        self._help_screen.append(_style(f"{a.key:15}", fg='bright_cyan') + f" executes {entry.removesuffix(".toml")}")
                 else:
-                    _print_ok(f"{file} => '{a.kind}' action, trigger on '{a.mirror}' (toggle with '{a.key}')")
+                    self._help_screen.append(_style(f"{a.key:15}", fg='bright_cyan') + f" toggles {entry.removesuffix(".toml")} (Mirrors {_style(f'{a.mirror}', fg='bright_cyan')})")
+                self._actions[entry.removesuffix(".toml")] = a
             except _TomlDecodeError:
                 _print_warn(f"{file} => invalid toml")
 
+    def is_on(self) -> bool:
+        return self._state
+
+    def toggle(self, on: bool=False, off: bool=False) -> bool:
+        if (not on and not off) or (on and off):
+            self._state = not self._state
+        elif on and not off:
+            self._state = True
+        elif off and not on:
+            self._state = False
+        return self._state
+    
+    def show_help(self):
+        for line in self._help_screen:
+            _echo(line)
+
+    def _trigger_activator(self, event: _KeyboardEvent) -> bool:
+        return _match_key(self.activator, event)
+    
+    def _trigger_helper(self, event: _KeyboardEvent) -> bool:
+        return _match_key(self.helper, event)
+        
+    def _callback(self, event: _KeyboardEvent):
+        # Quickly process CTRL+C as quit
+        if event.modifiers is not None:
+            if 'ctrl' in event.modifiers:
+                if event.name == 'c':
+                    return
+        # Process activator and helper (These occur before actions defined)
+        if event.event_type == 'up':
+            if self._trigger_activator(event):
+                if self.toggle():
+                    _print_on("Tyrell")
+                else:
+                    _print_off("Tyrell")
+                return
+            elif self._trigger_helper(event) and self.is_on():
+                self.show_help()
+                self.toggle(off=True)
+                return
+        # Process actions defined
+        for act in self._actions:
+            a = self._actions[act]
+            if a.trigger_key(event) and event.event_type == 'up' and self.is_on():
+                if a.kind != 'mirror' and not a.is_ticker():
+                    #_print_ok(f"{act} => '{a.kind}' action, trigger on '{a.key}'")
+                    a.do(self.delay, self.hold)
+                elif a.kind == 'mirror' or a.is_ticker():
+                    if a.toggle():
+                        _print_on(f"{act}")
+                    else:
+                        _print_off(f"{act}")
+                    #_print_ok(f"{act} => '{a.kind}' action, trigger on '{a.mirror}' (toggle with '{a.key}')")
+                self.toggle(off=True)
+                return
+            elif a.trigger_mirror(event):
+                if a.is_on() and event.event_type is not None:
+                    #_print_warn(f"{act} => mirror '{a.mirror}' is {event.event_type}")
+                    a.do(self.delay, self.hold, mirror=event.event_type)
+
+    async def _ticker(self):
+        while True:
+            for act in self._actions:
+                a = self._actions[act]
+                if a.tick():
+                    #_print_warn(f"{act} => tick event fired")
+                    a.do(self.delay, self.hold)
+            await _asleep(self.tick / 1000)
+
+    async def run(self):
+        _h = _key_hook(self._callback)
+        # I'm not sure why if I don't have any async TaskGroup
+        # We get some ugly errors
+        """
+        Task was destroyed but it is pending!
+        task: <Task pending name='Task-3' coro=<BaseEventLoop.shutdown_default_executor() running at /usr/lib/python3.12/asyncio/base_events.py:586>>
+        tyrell.py:607: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_default_executor' was never awaited
+        RuntimeWarning: Enable tracemalloc to get the object allocation traceback
+
+        OR
+
+        tyrell.py:571: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_default_executor' was never awaited
+        RuntimeWarning: Enable tracemalloc to get the object allocation traceback
+        """
+        async with _TaskGroup() as background:
+            background.create_task(self._ticker())
+        _key_unhook(_h)
+
 if __name__ == "__main__":
     if not _exists("_example"):
         _print_warn("Missing '_example' profile, creating...")
         Profile("_example")
         _print_ok("")
     p = Profile("_test")
+    try:
+        _run(p.run())
+    except KeyboardInterrupt:
+        print()