본문 바로가기

개발하자

Python 및 SetWindows로 낮은 수준의 키보드 후크 적용후크엑스에이

반응형

Python 및 SetWindows로 낮은 수준의 키보드 후크 적용후크엑스에이

그래서 파이썬을 이용해서 글로벌 키보드 후크를 등록하는 방법을 찾고 있어요. 내가 읽은 바로는 DLL에 콜백이 없는 것은 괜찮은 것 같다. WH_KEYboard_LL을 사용하는 경우. 나는 그것을 확실히 확인할 수는 없지만 나는 내가 말을 하려고 하면 나처럼 1428 오류가 발생하지 않는다는 것이 고무적이라고 생각한다.

나는 후크 핸들을 얻었지만 예상했던 대로 키보드의 버튼을 누르면 아무것도 나타나지 않는다.

내 콜백이 왜 안 걸려오는지 알아? 아니면 이게 가능하긴 해?

관련 코드:

import time
import string
import ctypes
import functools
import atexit
import pythoncom
from ctypes import windll

hookID = 0

class Keyboard(object):

    KEY_EVENT_DOWN = 0
    KEY_EVENT_UP = 2

    KEY_ENTER = 2
    KEY_SHIFT = 16
    KEY_SPACE = 32

    HOOK_ACTION = 13
    HOOK_KEYBOARD = 13
    HOOK_KEYDOWN = 0x100
    HOOK_KEYUP = 0x101

    class Hook:
        '''Holds general hook information'''
        def __init__(self):
            self.hook = 0
            self.struct = None            

    class HookStruct(ctypes.Structure):
        '''Structure that windows returns for keyboard events'''
        __fields__ = [
            ('keycode', ctypes.c_long),
            ('scancode', ctypes.c_long),
            ('flags', ctypes.c_long),
            ('time', ctypes.c_long),
            ('info', ctypes.POINTER(ctypes.c_ulong))
        ]

    def ascii_to_keycode(self, char):
        return windll.user32.VkKeyScanA(ord(char))

    def inject_key_down(self, keycode):
        scancode = windll.user32.MapVirtualKeyA(keycode, 0)
        windll.user32.keybd_event(keycode, scancode, Keyboard.KEY_EVENT_DOWN, 0)

    def inject_key_up(self, keycode):
        scan = windll.user32.MapVirtualKeyA(keycode, 0)
        windll.user32.keybd_event(keycode, scan, Keyboard.KEY_EVENT_UP, 0)

    def inject_key_press(self, keycode, pause=0.05):
        self.inject_key_down(keycode)
        time.sleep(pause)
        self.inject_key_up(keycode)

    def inject_sequence(self, seq, pause=0.05):
        for key in seq:
            if key == ' ':
                self.inject_key_press(Keyboard.KEY_SPACE, pause)
            elif key == '\n':
                self.inject_key_press(Keyboard.KEY_ENTER, pause)
            else:
                if key in string.ascii_uppercase:
                    self.inject_key_down(Keyboard.KEY_SHIFT)
                    self.inject_key_press(self.ascii_to_keycode(key), pause)
                    self.inject_key_up(Keyboard.KEY_SHIFT)
                else:
                    self.inject_key_press(self.ascii_to_keycode(key), pause)

    def _win32_copy_mem(self, dest, src):
        src = ctypes.c_void_p(src)
        windll.kernel32.RtlMoveMemory(ctypes.addressof(dest), src, ctypes.sizeof(dest))

    def _win32_get_last_error(self):
        return windll.kernel32.GetLastError()

    def _win32_get_module(self, mname):
        return windll.kernel32.GetModuleHandleA(mname)

    def _win32_call_next_hook(self, id, code, wparam, lparam):
        return windll.kernel32.CallNextHookEx(id, code, wparam, lparam)

    def _win32_set_hook(self, id, callback, module, thread):
        callback_decl = ctypes.WINFUNCTYPE(ctypes.c_long, ctypes.c_long, ctypes.c_long, ctypes.c_long)
        return windll.user32.SetWindowsHookExA(id, callback_decl(callback), module, thread)

    def _win32_unhook(self, id):
        return windll.user32.UnhookWindowsHookEx(id)

    def keyboard_event(self, data):
        print data.scancode
        return False

    def capture_input(self):

        self.hook = Keyboard.Hook()
        self.hook.struct = Keyboard.HookStruct()

        def low_level_keyboard_proc(code, event_type, kb_data_ptr):
            # win32 spec says return result of CallNextHookEx if code is less than 0
            if code < 0:
                return self._win32_call_next_hook(self.hook.hook, code, event_type, kb_data_ptr)

            if code == Keyboard.HOOK_ACTION:
                # copy data from struct into Python structure
                self._win32_copy_mem(self.hook.struct, kb_data_ptr)

                # only call other handlers if we return false from our handler - allows to stop processing of keys
                if self.keyboard_event(self.hook.struct):
                    return self._win32_call_next_hook(self.hook.hook, code, event_type, kb_data_ptr)

        # register hook 
        try:          
            hookId = self.hook.hook = self._win32_set_hook(Keyboard.HOOK_KEYBOARD, low_level_keyboard_proc, self._win32_get_module(0), 0)
            if self.hook.hook == 0:
                print 'Error - ', self._win32_get_last_error()
            else:
                print 'Hook ID - ', self.hook.hook

        except Exception, error:
            print error

        # unregister hook if python exits
        atexit.register(functools.partial(self._win32_unhook, self.hook.hook))

    def end_capture(self):
        if self.hook.hook:
            return self._win32_unhook(self.hook.hook)


kb = Keyboard()#kb.inject_sequence('This is a test\nand tHis is line 2')
kb.capture_input()
pythoncom.PumpMessages()
kb.end_capture()



파이썬으로 해본 적은 없지만, 네, 로우 레벨 키보드나 마우스 후크는 가능할 거예요. 다른 후크 유형의 경우 후크 함수는 dll이어야 합니다.

HOOK_ACTION은 13이 아니라 0이어야 합니다.




당신의 수업을 진행시킬 수는 없었지만, 같은 목표를 달성하기 위한 비슷한 방법을 찾았습니다.

수정된 코드는 다음과 같습니다:

from collections import namedtuple

KeyboardEvent = namedtuple('KeyboardEvent', ['event_type', 'key_code',
                                             'scan_code', 'alt_pressed',
                                             'time'])

handlers = []

def listen():
    """
    Calls `handlers` for each keyboard event received. This is a blocking call.
    """
    # Adapted from http://www.hackerthreads.org/Topic-42395
    from ctypes import windll, CFUNCTYPE, POINTER, c_int, c_uint, c_void_p, byref
    import win32con, win32api, win32gui, atexit

    event_types = {win32con.WM_KEYDOWN: 'key down',
                   win32con.WM_KEYUP: 'key up',
                   0x104: 'key down', # WM_SYSKEYDOWN, used for Alt key.
                   0x105: 'key up', # WM_SYSKEYUP, used for Alt key.
                  }

    def low_level_handler(nCode, wParam, lParam):
        """
        Processes a low level Windows keyboard event.
        """
        event = KeyboardEvent(event_types[wParam], lParam[0], lParam[1],
                              lParam[2] == 32, lParam[3])
        for handler in handlers:
            handler(event)

        # Be a good neighbor and call the next hook.
        return windll.user32.CallNextHookEx(hook_id, nCode, wParam, lParam)
       
    # Our low level handler signature.
    CMPFUNC = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
    # add argtypes for 64-bit Python compatibility (per @BaiJiFeiLong)
    windll.user32.SetWindowsHookExW.argtypes = (
        c_int,
        c_void_p, 
        c_void_p,
        c_uint
    )
    # Convert the Python handler into C pointer.
    pointer = CMPFUNC(low_level_handler)

    # Hook both key up and key down events for common keys (non-system).
    hook_id = windll.user32.SetWindowsHookExA(win32con.WH_KEYBOARD_LL, pointer,
                                             win32api.GetModuleHandle(None), 0)

    # Register to remove the hook when the interpreter exits. Unfortunately a
    # try/finally block doesn't seem to work here.
    atexit.register(windll.user32.UnhookWindowsHookEx, hook_id)

    while True:
        msg = win32gui.GetMessage(None, 0, 0)
        win32gui.TranslateMessage(byref(msg))
        win32gui.DispatchMessage(byref(msg))

if __name__ == '__main__':
    def print_event(e):
        print(e)

    handlers.append(print_event)
    listen()

이것을 포장하기 위해 고급 라이브러리를 만들었습니다.




팀의 원래 코드가 작동하지 않은 이유는 ctypes 함수 포인터가 가비지 수집되었기 때문에 그의 콜백이 무효가 되어 호출되지 않았기 때문이다. 그냥 조용히 실패한 거예요.

윈도우는 파이썬 포인터를 보유하고 있지 않기 때문에 우리는 윈도우 설정에 전달되는 정확한 ctype 함수 포인터 파라미터에 대한 참조를 별도로 보유해야 한다후크엑스.




낮은 수준의 후크를 구현한 방법은 다음과 같습니다. 누군가가 유용하다고 생각한다면 나는 기쁠 것이다. 행운을 빕니다.

print('__file__={0:<35} | __name__={1:<25} | __package__={2:<25}'.format(__file__,__name__,str(__package__)))
import ctypes
import win32con
import win32api
import win32gui
import atexit
from collections import namedtuple
import threading
import queue
import time

class InputListener:
    """
    https://gist.github.com/Amgarak/5df8477bad67dabbc491322e74ce1c2c
    
    This class is designed for listening to keyboard and mouse events in a Windows environment. 
    It utilizes low-level hooks to capture events such as key presses, 
    key releases, mouse movements, and mouse button clicks. 
    The class provides functionalities to register event handlers and filters, 
    allowing custom processing of events. 
    It also supports the control of threads for concurrent event handling.
    """
    def __init__(self):
        self.flagRun=False
        self.signal = queue.Queue()
        
        self._key_down = set()
        
        self.if_handlers = set()
        self.handlers = set()
        
        self.listener_queues = {}
        self.listeners =[]

        self.variable_key_down = {}

        self.event_types = {
                    win32con.WM_KEYDOWN: 'key_down', # 0x100: 'key down', - WM_KeyDown for normal keys - 256
                    win32con.WM_KEYUP: 'key_up', # 0x101: 'key up', - WM_KeyUp for normal keys - 257 
                    0x104: 'key_down',  # WM_SYSKEYDOWN, used for Alt key - 260
                    0x105: 'key_up'}  # WM_SYSKEYUP, used for Alt key - 261
        
        self.mouse_types={
                    0x200: 'move', # WM_MOUSEMOVE - 512 
                    0x20A: 'wheel', # WM_MOUSEWHEEL - 522 - scan_code_top: 7864320; scan_code_bot: 4287102976
                    0x20E: 'H_wheel', # WM_MOUSEHWHEEL - 526 
                    0x204: 'key_down', # WM_RBUTTONDOWN - 516
                    0x205: 'key_up', # WM_RBUTTONUP - 517
                    0x201: 'key_down', # WM_LBUTTONDOWN - 513
                    0x202: 'key_up', # WM_LBUTTONUP - 514
                    0x207: 'key_down', # WM_MBUTTONDOWN - 519
                    0x208: 'key_up', # WM_MBUTTONUP - 520
                    0x20B: 'key_down', # WM_XBUTTONDOWN - 523 - scan_code: 131072; scan_code: 65536
                    0x20C: 'key_up'} # WM_XBUTTONUP - 524 - scan_code: 131072; scan_code: 65536

        self.mouse_key={
                    0x200: 'move',  
                    0x20A: 'wheel', 
                    7864320: 'wheel_top', 
                    4287102976: 'wheel_bot', 
                    0x20E: 'H_wheel',  
                    0x204: 'right', 
                    0x205: 'right', 
                    0x201: 'left', 
                    0x202: 'left', 
                    0x207: 'middle', 
                    0x208: 'middle', 
                    131072: 'X_Button_1', 
                    65536: 'X_Button_2'}       

        self.DeviceEvent = namedtuple('DeviceEvent', ['inputDevice', 'event_type', 'key_code', 'x', 'y', 'scan_code', 'key_down'])
        self.DeviceEvent.__new__.__defaults__ = (None, None, None, None, None, None, None)
        
        self.default_key = "Value not defined"
        
    def key_event(self, event):
        while not self.signal.empty():
            self.signal_handler(self.signal.get())

        for listener_queue in self.listener_queues.values():
            listener_queue.put(event)
            
    def key_down(self, event_type, key_code):
        key = str(key_code)

        if key not in self._key_down and event_type == 'key_down':
            self._key_down.add(key)
            return True
        
        elif key in self._key_down and event_type == 'key_up':
            self._key_down.remove(key)
            return True
        
    def if_block(self, event):
        for if_handler in self.if_handlers:
            result = if_handler(event)
            if result is not None and not result:
                return False
        return True

    def keyboard_low_level_handler(self, nCode, wParam, lParam):
        lParam = ctypes.cast(lParam, ctypes.POINTER(ctypes.c_ulong)).contents.value
        
        rezult = self.key_down(self.event_types[wParam], lParam)
        event = self.DeviceEvent('Keyboard', self.event_types[wParam], lParam, key_down=self._key_down)
        
        if rezult: # !!! - Отсекаем повторяющийся ивент key_down
            self.key_event(event)
        
        if self.if_block(event):   
            return ctypes.windll.user32.CallNextHookEx(self.keyboard_hook_id, nCode, wParam, lParam)
        else:
            return -1
               
    def mouse_low_level_handler(self, nCode, wParam, lParam):
        point = ctypes.cast(lParam, ctypes.POINTER(ctypes.c_long * 2)).contents
        
        x = point[0]
        y = point[1]
        
        castomParam = self.mouse_key.get(lParam[1], self.default_key) if lParam[1] is not None else self.mouse_key[wParam]
        self.key_down(self.mouse_types[wParam], castomParam)
        
        event = self.DeviceEvent('Mouse', self.mouse_types[wParam], castomParam, x, y, lParam[1], self._key_down)
        self.key_event(event)
        
        if self.if_block(event):   
            return ctypes.windll.user32.CallNextHookEx(self.mouse_hook_id, nCode, wParam, lParam)
        else:
            return -1
    
    def listen(self):
        CMPFUNC = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_void_p))
        ctypes.windll.user32.SetWindowsHookExW.argtypes = (
            ctypes.c_int,
            CMPFUNC,
            ctypes.c_void_p,
            ctypes.c_uint
        )

        keyboard_pointer = CMPFUNC(self.keyboard_low_level_handler)
        self.keyboard_hook_id = ctypes.windll.user32.SetWindowsHookExW(win32con.WH_KEYBOARD_LL, keyboard_pointer,
                                                                       win32api.GetModuleHandle(None), 0)

        mouse_pointer = CMPFUNC(self.mouse_low_level_handler)
        self.mouse_hook_id = ctypes.windll.user32.SetWindowsHookExW(win32con.WH_MOUSE_LL, mouse_pointer,
                                                                    win32api.GetModuleHandle(None), 0)

        atexit.register(ctypes.windll.user32.UnhookWindowsHookEx, self.keyboard_hook_id)
        atexit.register(ctypes.windll.user32.UnhookWindowsHookEx, self.mouse_hook_id)

        win32gui.GetMessage(None, 0, 0)
        print(f"Hooks завершил свою работу..")
            
    def add_if_handler(self, func): # Регестрируем колбэк-фильтров
        self.if_handlers.add(func) 
            
    def add_handler(self, func): # Регестрируем колбэк-обработчиков
        self.handlers.add(func) 
        
    def remove_if_handler(self, func): # Удаление колбэк-фильтров
        self.if_handlers.discard(func) 
            
    def remove_handler(self, func): # Удаление колбэк-обработчиков
        self.handlers.discard(func) 
        self.variable_key_down.pop(func.__name__, None)
        
        
    def create_listener_queues(self):
        self.listener_queues = {func.__name__: queue.Queue() for func in self.handlers}
     
    def create_listener(self):        
        self.listeners = [
        threading.Thread(target=self.wrapper, args=(self.listener_queues[func.__name__], func), name=func.__name__)
        for func in self.handlers
    ]
        
    def start_listener(self):
        for listener_thread in self.listeners:
            listener_thread.daemon = True
            listener_thread.start()
      
    def wrapper(self, listener_queue, func):
        self.variable_key_down[func.__name__] = set()
        while True:
            event = listener_queue.get()
            if event is None:
                break
            
            self.variable_key_down[func.__name__].clear()
            self.variable_key_down[func.__name__].update(event.key_down)
            event = event._replace(key_down=self.variable_key_down.get(func.__name__, None))
            
            func(event) 
            
        print(f"Демон {func.__name__} завершил свою работу..")
            
    def start(self, use_daemon = False):
        self.create_listener_queues()
        self.create_listener()
        self.start_listener()
        
        self.flagRun=True
        # Создаем новый поток и передаем в него функцию self.listen() - без скобок! 
        # Мы передаем саму функцию в качестве объекта. В этом случае, функция не вызывается немедленно.
        thread = threading.Thread(target=self.listen, name="Hooks")
        thread.daemon = use_daemon # Включить при использовании GUI 
        thread.start() # Запускаем Hooks в отдельном потоке
        self.thread_id = thread.native_id # Получаем дескриптор потока, используя атрибут native_id объекта threading.Thread
   
    def stop(self): # Останавливаем слушатели\хуки
        for listener_queue in self.listener_queues.values():
            listener_queue.put(None)
        
        self.flagRun=False
        ctypes.windll.user32.UnhookWindowsHookEx(self.keyboard_hook_id)
        ctypes.windll.user32.UnhookWindowsHookEx(self.mouse_hook_id)
        
        self.listener_queues.clear()
        self.listeners.clear()
        self._key_down.clear()
        self.variable_key_down.clear()
        
        user32 = ctypes.windll.user32
        WM_QUIT = 0x0012
        user32.PostThreadMessageW(self.thread_id, WM_QUIT, 0, 0) # Используя дискриптор потока, отправляем сообщение WM_QUIT 
                                                            # Для остановки win32gui.GetMessage(None, 0, 0)    

    def hot_removal_handler(self, func): # Горячее удаление колбэк-обработчиков
        signal = {"hot_removal_handler": func}
        self.signal.put(signal)
    
    def hot_plugging_handler(self, func):# Горячая регестрирация колбэк-обработчиков
        signal = {"hot_plugging_handler": func}
        self.signal.put(signal)
    
    def hot_removal_if_handler(self, func): # Горячее удаление колбэк-фильтров
        signal = {"hot_removal_if_handler": func}
        self.signal.put(signal)
    
    def hot_plugging_if_handler(self, func): # Горячая регестрирация колбэк-фильтров
        signal = {"hot_plugging_if_handler": func}
        self.signal.put(signal)

    def signal_handler(self, handler_dict):
        match handler_dict:
            case {"hot_removal_handler": func}:
                
                self.handlers.discard(func) 
                stop_signal_queue = self.listener_queues.get(func.__name__)
                stop_signal_queue.put(None)
                self.variable_key_down.pop(func.__name__, None)
                self.listener_queues.pop(func.__name__, None)
                print(f"Handling hot removal for function {func.__name__}")
                
            case {"hot_plugging_handler": func}:
                
                self.handlers.add(func) 
                self.listener_queues.update({func.__name__: queue.Queue()})
                listener_thread=threading.Thread(target=self.wrapper, args=(self.listener_queues[func.__name__], func), name=func.__name__)
                self.listeners.append(listener_thread)
                listener_thread.daemon = True
                listener_thread.start()
                print(f"Handling hot plugging for function {func.__name__}")
                
            case {"hot_removal_if_handler": func}:
                
                self.if_handlers.discard(func) 
                print(f"Handling conditional hot removal for function {func.__name__}")
                
            case {"hot_plugging_if_handler": func}:
                
                self.if_handlers.add(func) 
                print(f"Handling conditional hot plugging for function {func.__name__}")
                
            case _:
                print("Unknown case")
                
    def get_if_handlers(self, objekt=None):
        if objekt == "__name__":
            return set(if_handler.__name__ for if_handler in self.if_handlers)
        else:
             return self.if_handlers

    def get_handlers(self, objekt=None):
        if objekt == "__name__":
            return set(handler.__name__ for handler in self.handlers)
        else:
             return self.handlers

    def get_status_hooks(self):
        return self.flagRun

if __name__ == '__main__':

    log = True
    def print_event(event):

        if log:
            print(f"!-----------: \n"
                  f"{time.strftime('%H:%M:%S')} -> {event.inputDevice} -> {event.event_type} -> {event.key_code}\n"
                  f"X: {event.x}, Y: {event.y}, Scan_code: {event.scan_code}\n"
                  f"____________! \n")   
            
    def print_event2(event):

        if log: 
            print(f"!-----------: \n"
                  f"Нажатые кнопки: {event.key_down}\n"
                  f"____________! \n")
                
    def print_event3(event):     
        if log:   
            threads = threading.enumerate()
            thread_names = [thread.name for thread in threads]
            threads_str = ', '.join(thread_names)
            print(f"!-----------: \n"
                  f"Активные потоки: {threads_str}\n"
                  f"____________! \n") 
            
    def _if_block(event):     
        if event.key_code == 49:   
            print(f"Блокируем кнопку клавиатуры [1]")
            return False
            
    def _if2_block(event):     
        if event.key_code == "left":   
            print(f"Блокируем кнопку мыши [left]")
            return False

    def _if3_block(event):     
    # Проверка условия ограничения координат мыши
        if event.inputDevice == "Mouse" and 0 <= event.y <= 250:
            # Блокировка движения мыши
            win32api.SetCursorPos((event.x, 250))
            return False  # Блокировать обработку события мыши 

        if event.inputDevice == "Mouse" and 1000 <= event.y <= 1080:
            win32api.SetCursorPos((event.x, 1000))
            return False

    input_listener = InputListener()
    print(input_listener.get_status_hooks())
    
    input_listener.add_handler(print_event)
    input_listener.add_handler(print_event2)
    input_listener.add_handler(print_event3)
    
    input_listener.add_if_handler(_if_block)
    input_listener.add_if_handler(_if2_block)
    input_listener.add_if_handler(_if3_block)
    
    input_listener.start()
    print(input_listener.get_status_hooks())
    time.sleep(5)
    
    input_listener.stop()
    print(input_listener.get_status_hooks())
    time.sleep(5)
    
    input_listener.remove_handler(print_event2) # Удаление колбэк-обработчика на холодную
    input_listener.remove_if_handler(_if3_block)
    
    input_listener.start()
    print(input_listener.get_status_hooks())
    time.sleep(5)

    input_listener.hot_removal_if_handler(_if_block)
    input_listener.hot_removal_if_handler(_if2_block)
    
    input_listener.hot_plugging_if_handler(_if3_block)
    
    input_listener.hot_removal_handler(print_event)
    
    print(input_listener.get_handlers("__name__"))
    print(input_listener.get_handlers())
    
    print(input_listener.get_if_handlers("__name__"))
    print(input_listener.get_if_handlers())
    
    input_listener.hot_plugging_handler(print_event2)
    input_listener.hot_removal_handler(print_event2)
    
    time.sleep(5)
    
    input_listener.hot_plugging_handler(print_event)
    input_listener.hot_plugging_handler(print_event2)
    input_listener.hot_removal_if_handler(_if3_block)

반응형