systemtraytest.py | systemtraytest.py | |||
---|---|---|---|---|
#!/usr/bin/env python3 | #!/usr/bin/env python3 | |||
# SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org> | # SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org> | |||
# SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> | # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> | |||
# SPDX-License-Identifier: MIT | # SPDX-License-Identifier: MIT | |||
import os | import os | |||
from subprocess import Popen, PIPE | import queue | |||
import sys | ||||
import threading | ||||
import unittest | import unittest | |||
from datetime import date | from datetime import date | |||
from subprocess import PIPE, Popen | ||||
from time import sleep | from time import sleep | |||
from typing import Any, Final | from typing import IO, Final | |||
import threading | ||||
import gi | import gi | |||
gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4 | gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4 | |||
from appium import webdriver | from appium import webdriver | |||
from appium.options.common.base import AppiumOptions | ||||
from appium.webdriver.common.appiumby import AppiumBy | from appium.webdriver.common.appiumby import AppiumBy | |||
from gi.repository import Gtk, GLib, Gio | from gi.repository import Gio, GLib, Gtk | |||
from selenium.common.exceptions import TimeoutException | from selenium.common.exceptions import TimeoutException | |||
from selenium.webdriver.support import expected_conditions as EC | from selenium.webdriver.support import expected_conditions as EC | |||
from selenium.webdriver.support.ui import WebDriverWait | from selenium.webdriver.support.ui import WebDriverWait | |||
KDE_VERSION: Final = 5 | KDE_VERSION: Final = 5 | |||
WIDGET_ID: Final = "org.kde.plasma.systemtray" | ||||
class XEmbedTrayIcon(threading.Thread): | class XEmbedTrayIcon(threading.Thread): | |||
""" | """ | |||
XEmbed tray icon implementation using Gtk.StatusIcon | XEmbed tray icon implementation using Gtk.StatusIcon | |||
""" | """ | |||
def __init__(self, title: str) -> None: | def __init__(self, title: str) -> None: | |||
super().__init__() | super().__init__() | |||
self.__timer: threading.Timer = threading.Timer(20, Gtk.main_quit) # Failsafe | self.__timer: threading.Timer = threading.Timer(300, Gtk.main_quit) # Failsafe | |||
self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title) | self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title) | |||
self.__status_icon.set_from_icon_name("xorg") | self.__status_icon.set_from_icon_name("xorg") | |||
self.__status_icon.connect("button-press-event", self.__on_button_p ress_event) | self.__status_icon.connect("button-press-event", self.__on_button_p ress_event) | |||
self.__status_icon.connect("button-release-event", self.__on_button _release_event) | self.__status_icon.connect("button-release-event", self.__on_button _release_event) | |||
self.__status_icon.connect("popup-menu", self.__on_popup_menu) | self.__status_icon.connect("popup-menu", self.__on_popup_menu) | |||
self.__status_icon.connect("scroll-event", self.__on_scroll_event) | self.__status_icon.connect("scroll-event", self.__on_scroll_event) | |||
def run(self) -> None: | def run(self) -> None: | |||
self.__timer.start() | self.__timer.start() | |||
Gtk.main() | Gtk.main() | |||
def quit(self) -> None: | def quit(self) -> None: | |||
self.__timer.cancel() | self.__timer.cancel() | |||
Gtk.main_quit() | Gtk.main_quit() | |||
def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None: | def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None: | |||
print("button-press-event", button_event.button) | print("button-press-event", button_event.button, file=sys.stderr, f lush=True) | |||
def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None: | def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None: | |||
print("button-release-event", button_event.button) | print("button-release-event", button_event.button, file=sys.stderr, flush=True) | |||
self.quit() | self.quit() | |||
def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None: | def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None: | |||
print("popup-menu", button, activate_time) | print("popup-menu", button, activate_time, file=sys.stderr, flush=T rue) | |||
def __on_scroll_event(self, status_icon, scroll_event) -> None: | def __on_scroll_event(self, status_icon, scroll_event) -> None: | |||
print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i | print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i | |||
nt(scroll_event.direction)) | nt(scroll_event.direction), file=sys.stderr, flush=True) | |||
class StreamReaderThread(threading.Thread): | ||||
""" | ||||
Non-blocking readline thread | ||||
""" | ||||
def __init__(self, stream: IO[bytes]) -> None: | ||||
""" | ||||
@param stream: the stream to read from | ||||
""" | ||||
self.__stream: IO[bytes] = stream | ||||
self.__queue = queue.Queue() | ||||
self.__stop_event = threading.Event() | ||||
# Create the thread | ||||
super().__init__() | ||||
def run(self) -> None: | ||||
""" | ||||
Collects lines from the source stream and put them in the queue. | ||||
""" | ||||
while self.__stream.readable() and not self.__stop_event.is_set(): | ||||
line_str: str = self.__stream.readline().decode(encoding="utf-8 | ||||
") | ||||
if "Received click" in line_str: | ||||
self.__queue.put(line_str) | ||||
elif len(line_str) == 0: | ||||
break | ||||
def stop(self) -> None: | ||||
""" | ||||
Stops the thread | ||||
""" | ||||
self.__stop_event.set() | ||||
def readline(self) -> str | None: | ||||
""" | ||||
Non-blocking readline | ||||
The default timeout is 5s. | ||||
""" | ||||
try: | ||||
return self.__queue.get(block=True, timeout=5) | ||||
except queue.Empty: | ||||
return None | ||||
class SystemTrayTests(unittest.TestCase): | class SystemTrayTests(unittest.TestCase): | |||
""" | """ | |||
Tests for the system tray widget | Tests for the system tray widget | |||
""" | """ | |||
driver: webdriver.Remote | driver: webdriver.Remote | |||
xembedsniproxy: Popen[bytes] | ||||
xembed_tray_icon: XEmbedTrayIcon | None | ||||
stream_reader_thread: StreamReaderThread | None | ||||
@classmethod | @classmethod | |||
def setUpClass(cls) -> None: | def setUpClass(cls) -> None: | |||
""" | """ | |||
Opens the widget and initialize the webdriver | Opens the widget and initialize the webdriver | |||
""" | """ | |||
desired_caps: dict[str, Any] = {} | options = AppiumOptions() | |||
desired_caps["app"] = "plasmawindowed -p org.kde.plasma.nano org.kd | options.set_capability("app", f"plasmawindowed -p org.kde.plasma.na | |||
e.plasma.systemtray" | no {WIDGET_ID}") | |||
desired_caps["timeouts"] = {'implicit': 10000} | cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47 | |||
cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47 | 23', options=options) | |||
23', desired_capabilities=desired_caps) | ||||
cls.driver.implicitly_wait = 10 | ||||
def setUp(self) -> None: | def setUp(self) -> None: | |||
self.kded = Popen([f"kded{KDE_VERSION}"]) | self.kded = Popen([f"kded{KDE_VERSION}"]) | |||
# Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html | # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html | |||
session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION) | session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION) | |||
SERVICE_NAME: Final = "org.freedesktop.DBus" | SERVICE_NAME: Final = "org.freedesktop.DBus" | |||
OBJECT_PATH: Final = "/" | OBJECT_PATH: Final = "/" | |||
INTERFACE_NAME: Final = SERVICE_NAME | INTERFACE_NAME: Final = SERVICE_NAME | |||
message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner") | message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner") | |||
message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] )) | message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] )) | |||
for _ in range(5): | for _ in range(5): | |||
reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000) | reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000) | |||
if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean(): | if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean(): | |||
break | break | |||
print(f"waiting for kded to appear on the dbus session") | print(f"waiting for kded to appear on the dbus session", file=s ys.stderr, flush=True) | |||
sleep(1) | sleep(1) | |||
kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE | kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE | |||
_VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", | _VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", GLib.Varia | |||
GLib.Variant("(s)" | nt("(s)", [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), Gio.DBusSen | |||
, [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), | dMessageFlags.NONE, 1000) | |||
Gio.DBusSendMessag | ||||
eFlags.NONE, 1000) | ||||
self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded") | self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded") | |||
def tearDown(self) -> None: | def tearDown(self) -> None: | |||
""" | """ | |||
Take screenshot when the current test fails | Take screenshot when the current test fails | |||
""" | """ | |||
if not self._outcome.result.wasSuccessful(): | if not self._outcome.result.wasSuccessful(): | |||
self.driver.get_screenshot_as_file(f"systemtraytest_failed_test _shot_#{self.id()}.png") | self.driver.get_screenshot_as_file(f"failed_test_shot_systemtra ytest_#{self.id()}.png") | |||
self.kded.kill() | self.kded.kill() | |||
@classmethod | @classmethod | |||
def tearDownClass(cls) -> None: | def tearDownClass(cls) -> None: | |||
""" | """ | |||
Quits the webdriver | Make sure to terminate the driver again, lest it dangles. | |||
""" | """ | |||
cls.driver.quit() | cls.driver.quit() | |||
def cleanup_xembed_tray_icon(self) -> None: | ||||
""" | ||||
Cleanup function for test_xembed_tray_icon | ||||
""" | ||||
self.xembedsniproxy.terminate() | ||||
self.xembedsniproxy = None | ||||
if self.xembed_tray_icon is not None: | ||||
self.xembed_tray_icon.quit() | ||||
self.xembed_tray_icon = None | ||||
if self.stream_reader_thread is not None and self.stream_reader_thr | ||||
ead.is_alive(): | ||||
self.stream_reader_thread.stop() | ||||
self.stream_reader_thread = None | ||||
def test_xembed_tray_icon(self) -> None: | def test_xembed_tray_icon(self) -> None: | |||
""" | """ | |||
Tests XEmbed tray icons can be listed and clicked in the tray. | Tests XEmbed tray icons can be listed and clicked in the tray. | |||
@note GTK doesn't like send_events and double checks the mouse posi tion | @note GTK doesn't like send_events and double checks the mouse posi tion | |||
matches where the window is and is top level, so match the debug | matches where the window is and is top level, so match the debug | |||
output from xembedsniproxy instead. | output from xembedsniproxy instead. | |||
""" | """ | |||
self.addCleanup(self.cleanup_xembed_tray_icon) | ||||
debug_env: dict[str, str] = os.environ.copy() | debug_env: dict[str, str] = os.environ.copy() | |||
debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true" | debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true" | |||
xembedsniproxy: Popen[bytes] = Popen(['xembedsniproxy', '--platform | self.xembedsniproxy = Popen(['xembedsniproxy', '--platform', 'xcb'] | |||
', 'xcb'], env=debug_env, stderr=PIPE) # For debug output | , env=debug_env, stderr=PIPE) # For debug output | |||
if not xembedsniproxy.stderr or xembedsniproxy.poll() != None: | if not self.xembedsniproxy.stderr or self.xembedsniproxy.poll() != | |||
None: | ||||
self.fail("xembedsniproxy is not available") | self.fail("xembedsniproxy is not available") | |||
print(f"xembedsniproxy PID: {xembedsniproxy.pid}") | print(f"xembedsniproxy PID: {self.xembedsniproxy.pid}", file=sys.st derr, flush=True) | |||
title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}" | title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}" | |||
xembed_tray_icon: XEmbedTrayIcon = XEmbedTrayIcon(title) | self.xembed_tray_icon = XEmbedTrayIcon(title) | |||
xembed_tray_icon.start() | self.xembed_tray_icon.start() | |||
wait: WebDriverWait = WebDriverWait(self.driver, 10) | wait: WebDriverWait = WebDriverWait(self.driver, 10) | |||
try: | try: | |||
# FocusScope in StatusNotifierItem.qml | # FocusScope in StatusNotifierItem.qml | |||
xembed_icon_item = wait.until(EC.presence_of_element_located((A ppiumBy.NAME, title))) | wait.until(EC.presence_of_element_located((AppiumBy.NAME, title ))) | |||
except TimeoutException: | except TimeoutException: | |||
xembedsniproxy.terminate() | self.fail(f"Cannot find the XEmbed icon in the system tray: {se | |||
xembed_tray_icon.quit() | lf.xembedsniproxy.stderr.readlines()}") | |||
self.fail("Cannot find the XEmbed icon in the system tray: {}". | ||||
format(xembedsniproxy.stderr.readlines())) | # Create a reader thread to work around read block | |||
self.stream_reader_thread = StreamReaderThread(self.xembedsniproxy. | ||||
stderr) | ||||
self.stream_reader_thread.start() | ||||
self.assertTrue(self.stream_reader_thread.is_alive(), "The reader t | ||||
hread is not running") | ||||
# Now test clickable | # Now test clickable | |||
xembed_icon_item.click() | self.driver.find_element(AppiumBy.NAME, title).click() | |||
success: bool = False | success: bool = False | |||
for _ in range(10): | for _ in range(10): | |||
if xembedsniproxy.stderr.readable(): | if self.stream_reader_thread.readline() is not None: | |||
stderr = xembedsniproxy.stderr.readline() | ||||
print(stderr) | ||||
else: | ||||
print("Retrying...") | ||||
xembed_icon_item.click() | ||||
sleep(1) | ||||
continue | ||||
line_str: str = stderr.decode(encoding="utf-8").strip() | ||||
if "Received click" in line_str: | ||||
success = True | success = True | |||
break | break | |||
xembedsniproxy.terminate() | print("Retrying...", file=sys.stderr, flush=True) | |||
xembed_tray_icon.quit() | self.driver.find_element(AppiumBy.NAME, title).click() | |||
self.assertTrue(success, "xembedsniproxy did not receive the click event") | self.assertTrue(success, "xembedsniproxy did not receive the click event") | |||
if __name__ == '__main__': | if __name__ == '__main__': | |||
unittest.main() | unittest.main() | |||
End of changes. 27 change blocks. | ||||
51 lines changed or deleted | 114 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |