systemtraytest.py   systemtraytest.py 
#!/usr/bin/env python3 #!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org> # SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org>
# SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
import os import os
from subprocess import Popen, PIPE import queue
import sys
import threading
import unittest import unittest
from datetime import date from datetime import date
from subprocess import PIPE, Popen
from time import sleep from time import sleep
from typing import Any, Final from typing import IO, Final
import threading
import gi import gi
gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4 gi.require_version("Gtk", "3.0") # StatusIcon is removed in 4
from appium import webdriver from appium import webdriver
from appium.options.common.base import AppiumOptions
from appium.webdriver.common.appiumby import AppiumBy from appium.webdriver.common.appiumby import AppiumBy
from gi.repository import Gtk, GLib, Gio from gi.repository import Gio, GLib, Gtk
from selenium.common.exceptions import TimeoutException from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
KDE_VERSION: Final = 5 KDE_VERSION: Final = 5
WIDGET_ID: Final = "org.kde.plasma.systemtray"
class XEmbedTrayIcon(threading.Thread): class XEmbedTrayIcon(threading.Thread):
""" """
XEmbed tray icon implementation using Gtk.StatusIcon XEmbed tray icon implementation using Gtk.StatusIcon
""" """
def __init__(self, title: str) -> None: def __init__(self, title: str) -> None:
super().__init__() super().__init__()
self.__timer: threading.Timer = threading.Timer(20, Gtk.main_quit) # Failsafe self.__timer: threading.Timer = threading.Timer(300, Gtk.main_quit) # Failsafe
self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title) self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title)
self.__status_icon.set_from_icon_name("xorg") self.__status_icon.set_from_icon_name("xorg")
self.__status_icon.connect("button-press-event", self.__on_button_p ress_event) self.__status_icon.connect("button-press-event", self.__on_button_p ress_event)
self.__status_icon.connect("button-release-event", self.__on_button _release_event) self.__status_icon.connect("button-release-event", self.__on_button _release_event)
self.__status_icon.connect("popup-menu", self.__on_popup_menu) self.__status_icon.connect("popup-menu", self.__on_popup_menu)
self.__status_icon.connect("scroll-event", self.__on_scroll_event) self.__status_icon.connect("scroll-event", self.__on_scroll_event)
def run(self) -> None: def run(self) -> None:
self.__timer.start() self.__timer.start()
Gtk.main() Gtk.main()
def quit(self) -> None: def quit(self) -> None:
self.__timer.cancel() self.__timer.cancel()
Gtk.main_quit() Gtk.main_quit()
def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None: def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_e vent) -> None:
print("button-press-event", button_event.button) print("button-press-event", button_event.button, file=sys.stderr, f lush=True)
def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None: def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button _event) -> None:
print("button-release-event", button_event.button) print("button-release-event", button_event.button, file=sys.stderr, flush=True)
self.quit() self.quit()
def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None: def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, act ivate_time: int) -> None:
print("popup-menu", button, activate_time) print("popup-menu", button, activate_time, file=sys.stderr, flush=T rue)
def __on_scroll_event(self, status_icon, scroll_event) -> None: def __on_scroll_event(self, status_icon, scroll_event) -> None:
print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, i
nt(scroll_event.direction)) nt(scroll_event.direction), file=sys.stderr, flush=True)
class StreamReaderThread(threading.Thread):
"""
Non-blocking readline thread
"""
def __init__(self, stream: IO[bytes]) -> None:
"""
@param stream: the stream to read from
"""
self.__stream: IO[bytes] = stream
self.__queue = queue.Queue()
self.__stop_event = threading.Event()
# Create the thread
super().__init__()
def run(self) -> None:
"""
Collects lines from the source stream and put them in the queue.
"""
while self.__stream.readable() and not self.__stop_event.is_set():
line_str: str = self.__stream.readline().decode(encoding="utf-8
")
if "Received click" in line_str:
self.__queue.put(line_str)
elif len(line_str) == 0:
break
def stop(self) -> None:
"""
Stops the thread
"""
self.__stop_event.set()
def readline(self) -> str | None:
"""
Non-blocking readline
The default timeout is 5s.
"""
try:
return self.__queue.get(block=True, timeout=5)
except queue.Empty:
return None
class SystemTrayTests(unittest.TestCase): class SystemTrayTests(unittest.TestCase):
""" """
Tests for the system tray widget Tests for the system tray widget
""" """
driver: webdriver.Remote driver: webdriver.Remote
xembedsniproxy: Popen[bytes]
xembed_tray_icon: XEmbedTrayIcon | None
stream_reader_thread: StreamReaderThread | None
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
""" """
Opens the widget and initialize the webdriver Opens the widget and initialize the webdriver
""" """
desired_caps: dict[str, Any] = {} options = AppiumOptions()
desired_caps["app"] = "plasmawindowed -p org.kde.plasma.nano org.kd options.set_capability("app", f"plasmawindowed -p org.kde.plasma.na
e.plasma.systemtray" no {WIDGET_ID}")
desired_caps["timeouts"] = {'implicit': 10000} cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47
cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:47 23', options=options)
23', desired_capabilities=desired_caps)
cls.driver.implicitly_wait = 10
def setUp(self) -> None: def setUp(self) -> None:
self.kded = Popen([f"kded{KDE_VERSION}"]) self.kded = Popen([f"kded{KDE_VERSION}"])
# Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnect ion.html
session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION) session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESS ION)
SERVICE_NAME: Final = "org.freedesktop.DBus" SERVICE_NAME: Final = "org.freedesktop.DBus"
OBJECT_PATH: Final = "/" OBJECT_PATH: Final = "/"
INTERFACE_NAME: Final = SERVICE_NAME INTERFACE_NAME: Final = SERVICE_NAME
message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner") message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_ NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner")
message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] )) message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"] ))
for _ in range(5): for _ in range(5):
reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000) reply, _ = session_bus.send_message_with_reply_sync(message, Gi o.DBusSendMessageFlags.NONE, 1000)
if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean(): if reply and reply.get_signature() == 'b' and reply.get_body(). get_child_value(0).get_boolean():
break break
print(f"waiting for kded to appear on the dbus session") print(f"waiting for kded to appear on the dbus session", file=s ys.stderr, flush=True)
sleep(1) sleep(1)
kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE
_VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", _VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", GLib.Varia
GLib.Variant("(s)" nt("(s)", [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), Gio.DBusSen
, [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), dMessageFlags.NONE, 1000)
Gio.DBusSendMessag
eFlags.NONE, 1000)
self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded") self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Modul e is not loaded")
def tearDown(self) -> None: def tearDown(self) -> None:
""" """
Take screenshot when the current test fails Take screenshot when the current test fails
""" """
if not self._outcome.result.wasSuccessful(): if not self._outcome.result.wasSuccessful():
self.driver.get_screenshot_as_file(f"systemtraytest_failed_test _shot_#{self.id()}.png") self.driver.get_screenshot_as_file(f"failed_test_shot_systemtra ytest_#{self.id()}.png")
self.kded.kill() self.kded.kill()
@classmethod @classmethod
def tearDownClass(cls) -> None: def tearDownClass(cls) -> None:
""" """
Quits the webdriver Make sure to terminate the driver again, lest it dangles.
""" """
cls.driver.quit() cls.driver.quit()
def cleanup_xembed_tray_icon(self) -> None:
"""
Cleanup function for test_xembed_tray_icon
"""
self.xembedsniproxy.terminate()
self.xembedsniproxy = None
if self.xembed_tray_icon is not None:
self.xembed_tray_icon.quit()
self.xembed_tray_icon = None
if self.stream_reader_thread is not None and self.stream_reader_thr
ead.is_alive():
self.stream_reader_thread.stop()
self.stream_reader_thread = None
def test_xembed_tray_icon(self) -> None: def test_xembed_tray_icon(self) -> None:
""" """
Tests XEmbed tray icons can be listed and clicked in the tray. Tests XEmbed tray icons can be listed and clicked in the tray.
@note GTK doesn't like send_events and double checks the mouse posi tion @note GTK doesn't like send_events and double checks the mouse posi tion
matches where the window is and is top level, so match the debug matches where the window is and is top level, so match the debug
output from xembedsniproxy instead. output from xembedsniproxy instead.
""" """
self.addCleanup(self.cleanup_xembed_tray_icon)
debug_env: dict[str, str] = os.environ.copy() debug_env: dict[str, str] = os.environ.copy()
debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true" debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true"
xembedsniproxy: Popen[bytes] = Popen(['xembedsniproxy', '--platform self.xembedsniproxy = Popen(['xembedsniproxy', '--platform', 'xcb']
', 'xcb'], env=debug_env, stderr=PIPE) # For debug output , env=debug_env, stderr=PIPE) # For debug output
if not xembedsniproxy.stderr or xembedsniproxy.poll() != None: if not self.xembedsniproxy.stderr or self.xembedsniproxy.poll() !=
None:
self.fail("xembedsniproxy is not available") self.fail("xembedsniproxy is not available")
print(f"xembedsniproxy PID: {xembedsniproxy.pid}") print(f"xembedsniproxy PID: {self.xembedsniproxy.pid}", file=sys.st derr, flush=True)
title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}" title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m %d')}"
xembed_tray_icon: XEmbedTrayIcon = XEmbedTrayIcon(title) self.xembed_tray_icon = XEmbedTrayIcon(title)
xembed_tray_icon.start() self.xembed_tray_icon.start()
wait: WebDriverWait = WebDriverWait(self.driver, 10) wait: WebDriverWait = WebDriverWait(self.driver, 10)
try: try:
# FocusScope in StatusNotifierItem.qml # FocusScope in StatusNotifierItem.qml
xembed_icon_item = wait.until(EC.presence_of_element_located((A ppiumBy.NAME, title))) wait.until(EC.presence_of_element_located((AppiumBy.NAME, title )))
except TimeoutException: except TimeoutException:
xembedsniproxy.terminate() self.fail(f"Cannot find the XEmbed icon in the system tray: {se
xembed_tray_icon.quit() lf.xembedsniproxy.stderr.readlines()}")
self.fail("Cannot find the XEmbed icon in the system tray: {}".
format(xembedsniproxy.stderr.readlines())) # Create a reader thread to work around read block
self.stream_reader_thread = StreamReaderThread(self.xembedsniproxy.
stderr)
self.stream_reader_thread.start()
self.assertTrue(self.stream_reader_thread.is_alive(), "The reader t
hread is not running")
# Now test clickable # Now test clickable
xembed_icon_item.click() self.driver.find_element(AppiumBy.NAME, title).click()
success: bool = False success: bool = False
for _ in range(10): for _ in range(10):
if xembedsniproxy.stderr.readable(): if self.stream_reader_thread.readline() is not None:
stderr = xembedsniproxy.stderr.readline()
print(stderr)
else:
print("Retrying...")
xembed_icon_item.click()
sleep(1)
continue
line_str: str = stderr.decode(encoding="utf-8").strip()
if "Received click" in line_str:
success = True success = True
break break
xembedsniproxy.terminate() print("Retrying...", file=sys.stderr, flush=True)
xembed_tray_icon.quit() self.driver.find_element(AppiumBy.NAME, title).click()
self.assertTrue(success, "xembedsniproxy did not receive the click event") self.assertTrue(success, "xembedsniproxy did not receive the click event")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
 End of changes. 27 change blocks. 
51 lines changed or deleted 114 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/