| systemtraytest.py | systemtraytest.py | |||
|---|---|---|---|---|
| #!/usr/bin/env python3 | #!/usr/bin/env python3 | |||
| # SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org> | # SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org> | |||
| # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> | # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> | |||
| # SPDX-License-Identifier: MIT | # SPDX-License-Identifier: MIT | |||
| import os | import os | |||
| from subprocess import Popen, PIPE | import queue | |||
| import sys | ||||
| import threading | ||||
| import unittest | import unittest | |||
| from datetime import date | from datetime import date | |||
| from subprocess import PIPE, Popen | ||||
| from time import sleep | from time import sleep | |||
| from typing import Any, Final | from typing import IO, Final | |||
| import threading | ||||
| import gi | import gi | |||
| gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4 | gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4 | |||
| from appium import webdriver | from appium import webdriver | |||
| from appium.options.common.base import AppiumOptions | ||||
| from appium.webdriver.common.appiumby import AppiumBy | from appium.webdriver.common.appiumby import AppiumBy | |||
| from gi.repository import Gtk, GLib, Gio | from gi.repository import Gio, GLib, Gtk | |||
| from selenium.common.exceptions import TimeoutException | from selenium.common.exceptions import TimeoutException | |||
| from selenium.webdriver.support import expected_conditions as EC | from selenium.webdriver.support import expected_conditions as EC | |||
| from selenium.webdriver.support.ui import WebDriverWait | from selenium.webdriver.support.ui import WebDriverWait | |||
| KDE_VERSION: Final = 5 | KDE_VERSION: Final = 5 | |||
| WIDGET_ID: Final = "org.kde.plasma.systemtray" | ||||
| class XEmbedTrayIcon(threading.Thread): | class XEmbedTrayIcon(threading.Thread): | |||
| """ | """ | |||
| XEmbed tray icon implementation using Gtk.StatusIcon | XEmbed tray icon implementation using Gtk.StatusIcon | |||
| """ | """ | |||
| def __init__(self, title: str) -> None: | def __init__(self, title: str) -> None: | |||
| super().__init__() | super().__init__() | |||
| self.__timer: threading.Timer = threading.Timer(20, Gtk.main_quit) # Failsafe | self.__timer: threading.Timer = threading.Timer(300, Gtk.main_quit) # Failsafe | |||
| self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title) | self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title) | |||
| self.__status_icon.set_from_icon_name("xorg") | self.__status_icon.set_from_icon_name("xorg") | |||
| self.__status_icon.connect("button-press-event", self.__on_button_p ress_event) | self.__status_icon.connect("button-press-event", self.__on_button_p ress_event) | |||
| self.__status_icon.connect("button-release-event", self.__on_button _release_event) | self.__status_icon.connect("button-release-event", self.__on_button _release_event) | |||
| self.__status_icon.connect("popup-menu", self.__on_popup_menu) | self.__status_icon.connect("popup-menu", self.__on_popup_menu) | |||
| self.__status_icon.connect("scroll-event", self.__on_scroll_event) | self.__status_icon.connect("scroll-event", self.__on_scroll_event) | |||
| def run(self) -> None: | def run(self) -> None: | |||
| self.__timer.start() | self.__timer.start() | |||
| Gtk.main() | Gtk.main() | |||
| def quit(self) -> None: | def quit(self) -> None: | |||
| self.__timer.cancel() | self.__timer.cancel() | |||
| Gtk.main_quit() | Gtk.main_quit() | |||
| def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None: | def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None: | |||
| print("button-press-event", button_event.button) | print("button-press-event", button_event.button, file=sys.stderr, f lush=True) | |||
| def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None: | def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None: | |||
| print("button-release-event", button_event.button) | print("button-release-event", button_event.button, file=sys.stderr, flush=True) | |||
| self.quit() | self.quit() | |||
| def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None: | def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None: | |||
| print("popup-menu", button, activate_time) | print("popup-menu", button, activate_time, file=sys.stderr, flush=T rue) | |||
| def __on_scroll_event(self, status_icon, scroll_event) -> None: | def __on_scroll_event(self, status_icon, scroll_event) -> None: | |||
| print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i | print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i | |||
| nt(scroll_event.direction)) | nt(scroll_event.direction), file=sys.stderr, flush=True) | |||
| class StreamReaderThread(threading.Thread): | ||||
| """ | ||||
| Non-blocking readline thread | ||||
| """ | ||||
| def __init__(self, stream: IO[bytes]) -> None: | ||||
| """ | ||||
| @param stream: the stream to read from | ||||
| """ | ||||
| self.__stream: IO[bytes] = stream | ||||
| self.__queue = queue.Queue() | ||||
| self.__stop_event = threading.Event() | ||||
| # Create the thread | ||||
| super().__init__() | ||||
| def run(self) -> None: | ||||
| """ | ||||
| Collects lines from the source stream and put them in the queue. | ||||
| """ | ||||
| while self.__stream.readable() and not self.__stop_event.is_set(): | ||||
| line_str: str = self.__stream.readline().decode(encoding="utf-8 | ||||
| ") | ||||
| if "Received click" in line_str: | ||||
| self.__queue.put(line_str) | ||||
| elif len(line_str) == 0: | ||||
| break | ||||
| def stop(self) -> None: | ||||
| """ | ||||
| Stops the thread | ||||
| """ | ||||
| self.__stop_event.set() | ||||
| def readline(self) -> str | None: | ||||
| """ | ||||
| Non-blocking readline | ||||
| The default timeout is 5s. | ||||
| """ | ||||
| try: | ||||
| return self.__queue.get(block=True, timeout=5) | ||||
| except queue.Empty: | ||||
| return None | ||||
| class SystemTrayTests(unittest.TestCase): | class SystemTrayTests(unittest.TestCase): | |||
| """ | """ | |||
| Tests for the system tray widget | Tests for the system tray widget | |||
| """ | """ | |||
| driver: webdriver.Remote | driver: webdriver.Remote | |||
| xembedsniproxy: Popen[bytes] | ||||
| xembed_tray_icon: XEmbedTrayIcon | None | ||||
| stream_reader_thread: StreamReaderThread | None | ||||
| @classmethod | @classmethod | |||
| def setUpClass(cls) -> None: | def setUpClass(cls) -> None: | |||
| """ | """ | |||
| Opens the widget and initialize the webdriver | Opens the widget and initialize the webdriver | |||
| """ | """ | |||
| desired_caps: dict[str, Any] = {} | options = AppiumOptions() | |||
| desired_caps["app"] = "plasmawindowed -p org.kde.plasma.nano org.kd | options.set_capability("app", f"plasmawindowed -p org.kde.plasma.na | |||
| e.plasma.systemtray" | no {WIDGET_ID}") | |||
| desired_caps["timeouts"] = {'implicit': 10000} | cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47 | |||
| cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47 | 23', options=options) | |||
| 23', desired_capabilities=desired_caps) | ||||
| cls.driver.implicitly_wait = 10 | ||||
| def setUp(self) -> None: | def setUp(self) -> None: | |||
| self.kded = Popen([f"kded{KDE_VERSION}"]) | self.kded = Popen([f"kded{KDE_VERSION}"]) | |||
| # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html | # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html | |||
| session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION) | session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION) | |||
| SERVICE_NAME: Final = "org.freedesktop.DBus" | SERVICE_NAME: Final = "org.freedesktop.DBus" | |||
| OBJECT_PATH: Final = "/" | OBJECT_PATH: Final = "/" | |||
| INTERFACE_NAME: Final = SERVICE_NAME | INTERFACE_NAME: Final = SERVICE_NAME | |||
| message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner") | message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner") | |||
| message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] )) | message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] )) | |||
| for _ in range(5): | for _ in range(5): | |||
| reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000) | reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000) | |||
| if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean(): | if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean(): | |||
| break | break | |||
| print(f"waiting for kded to appear on the dbus session") | print(f"waiting for kded to appear on the dbus session", file=s ys.stderr, flush=True) | |||
| sleep(1) | sleep(1) | |||
| kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE | kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE | |||
| _VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", | _VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", GLib.Varia | |||
| GLib.Variant("(s)" | nt("(s)", [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), Gio.DBusSen | |||
| , [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), | dMessageFlags.NONE, 1000) | |||
| Gio.DBusSendMessag | ||||
| eFlags.NONE, 1000) | ||||
| self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded") | self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded") | |||
| def tearDown(self) -> None: | def tearDown(self) -> None: | |||
| """ | """ | |||
| Take screenshot when the current test fails | Take screenshot when the current test fails | |||
| """ | """ | |||
| if not self._outcome.result.wasSuccessful(): | if not self._outcome.result.wasSuccessful(): | |||
| self.driver.get_screenshot_as_file(f"systemtraytest_failed_test _shot_#{self.id()}.png") | self.driver.get_screenshot_as_file(f"failed_test_shot_systemtra ytest_#{self.id()}.png") | |||
| self.kded.kill() | self.kded.kill() | |||
| @classmethod | @classmethod | |||
| def tearDownClass(cls) -> None: | def tearDownClass(cls) -> None: | |||
| """ | """ | |||
| Quits the webdriver | Make sure to terminate the driver again, lest it dangles. | |||
| """ | """ | |||
| cls.driver.quit() | cls.driver.quit() | |||
| def cleanup_xembed_tray_icon(self) -> None: | ||||
| """ | ||||
| Cleanup function for test_xembed_tray_icon | ||||
| """ | ||||
| self.xembedsniproxy.terminate() | ||||
| self.xembedsniproxy = None | ||||
| if self.xembed_tray_icon is not None: | ||||
| self.xembed_tray_icon.quit() | ||||
| self.xembed_tray_icon = None | ||||
| if self.stream_reader_thread is not None and self.stream_reader_thr | ||||
| ead.is_alive(): | ||||
| self.stream_reader_thread.stop() | ||||
| self.stream_reader_thread = None | ||||
| def test_xembed_tray_icon(self) -> None: | def test_xembed_tray_icon(self) -> None: | |||
| """ | """ | |||
| Tests XEmbed tray icons can be listed and clicked in the tray. | Tests XEmbed tray icons can be listed and clicked in the tray. | |||
| @note GTK doesn't like send_events and double checks the mouse posi tion | @note GTK doesn't like send_events and double checks the mouse posi tion | |||
| matches where the window is and is top level, so match the debug | matches where the window is and is top level, so match the debug | |||
| output from xembedsniproxy instead. | output from xembedsniproxy instead. | |||
| """ | """ | |||
| self.addCleanup(self.cleanup_xembed_tray_icon) | ||||
| debug_env: dict[str, str] = os.environ.copy() | debug_env: dict[str, str] = os.environ.copy() | |||
| debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true" | debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true" | |||
| xembedsniproxy: Popen[bytes] = Popen(['xembedsniproxy', '--platform | self.xembedsniproxy = Popen(['xembedsniproxy', '--platform', 'xcb'] | |||
| ', 'xcb'], env=debug_env, stderr=PIPE) # For debug output | , env=debug_env, stderr=PIPE) # For debug output | |||
| if not xembedsniproxy.stderr or xembedsniproxy.poll() != None: | if not self.xembedsniproxy.stderr or self.xembedsniproxy.poll() != | |||
| None: | ||||
| self.fail("xembedsniproxy is not available") | self.fail("xembedsniproxy is not available") | |||
| print(f"xembedsniproxy PID: {xembedsniproxy.pid}") | print(f"xembedsniproxy PID: {self.xembedsniproxy.pid}", file=sys.st derr, flush=True) | |||
| title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}" | title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}" | |||
| xembed_tray_icon: XEmbedTrayIcon = XEmbedTrayIcon(title) | self.xembed_tray_icon = XEmbedTrayIcon(title) | |||
| xembed_tray_icon.start() | self.xembed_tray_icon.start() | |||
| wait: WebDriverWait = WebDriverWait(self.driver, 10) | wait: WebDriverWait = WebDriverWait(self.driver, 10) | |||
| try: | try: | |||
| # FocusScope in StatusNotifierItem.qml | # FocusScope in StatusNotifierItem.qml | |||
| xembed_icon_item = wait.until(EC.presence_of_element_located((A ppiumBy.NAME, title))) | wait.until(EC.presence_of_element_located((AppiumBy.NAME, title ))) | |||
| except TimeoutException: | except TimeoutException: | |||
| xembedsniproxy.terminate() | self.fail(f"Cannot find the XEmbed icon in the system tray: {se | |||
| xembed_tray_icon.quit() | lf.xembedsniproxy.stderr.readlines()}") | |||
| self.fail("Cannot find the XEmbed icon in the system tray: {}". | ||||
| format(xembedsniproxy.stderr.readlines())) | # Create a reader thread to work around read block | |||
| self.stream_reader_thread = StreamReaderThread(self.xembedsniproxy. | ||||
| stderr) | ||||
| self.stream_reader_thread.start() | ||||
| self.assertTrue(self.stream_reader_thread.is_alive(), "The reader t | ||||
| hread is not running") | ||||
| # Now test clickable | # Now test clickable | |||
| xembed_icon_item.click() | self.driver.find_element(AppiumBy.NAME, title).click() | |||
| success: bool = False | success: bool = False | |||
| for _ in range(10): | for _ in range(10): | |||
| if xembedsniproxy.stderr.readable(): | if self.stream_reader_thread.readline() is not None: | |||
| stderr = xembedsniproxy.stderr.readline() | ||||
| print(stderr) | ||||
| else: | ||||
| print("Retrying...") | ||||
| xembed_icon_item.click() | ||||
| sleep(1) | ||||
| continue | ||||
| line_str: str = stderr.decode(encoding="utf-8").strip() | ||||
| if "Received click" in line_str: | ||||
| success = True | success = True | |||
| break | break | |||
| xembedsniproxy.terminate() | print("Retrying...", file=sys.stderr, flush=True) | |||
| xembed_tray_icon.quit() | self.driver.find_element(AppiumBy.NAME, title).click() | |||
| self.assertTrue(success, "xembedsniproxy did not receive the click event") | self.assertTrue(success, "xembedsniproxy did not receive the click event") | |||
| if __name__ == '__main__': | if __name__ == '__main__': | |||
| unittest.main() | unittest.main() | |||
| End of changes. 27 change blocks. | ||||
| 51 lines changed or deleted | 114 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ | ||||