DBus 自省 XML 生成 Qt 代码
2024-12-18 21:18:31+08:00

DBus 自省 XML 生成 Qt 代码

各种支持 DBus 的开发框架都能够通过 XML 自动生成代码,例如 Glib 的 gdbus-codegen 和 Qt 的 qdbusxml2cpp

通过 DBus 对象 org.freedesktop.DBus.Introspectable 接口下的 Introspect 方法可以自省 XML,这样就不需要手写了。

但是 d-feet、dbus-send 等工具会给返回值加上类型标注或者换行符导致需要人工修改。因此需要自己写一个脚本来自省 DBus。

使用示例:

$ ./introspect.py -t system -n org.freedesktop.NetworkManager -p /org/freedesktop/NetworkManager
$ ./introspect.py -t system -n org.freedesktop.NetworkManager -p /org/freedesktop/NetworkManager/Devices/5
$ ls -1
introspect.py  
org.freedesktop.NetworkManager.Device.Statistics.xml  
org.freedesktop.NetworkManager.Device.WifiP2P.xml  
org.freedesktop.NetworkManager.Device.xml
org.freedesktop.NetworkManager.xml
$ qdbusxml2cpp -c NetworkManager -p NetworkManager org.freedesktop.NetworkManager.xml --no-namespaces
$ qdbusxml2cpp -c Device -p Device org.freedesktop.NetworkManager.Device.xml --no-namespaces
$ qdbusxml2cpp -c WifiP2P -p WifiP2P org.freedesktop.NetworkManager.Device.WifiP2P.xml --no-namespaces
$ ls -1
Device.cpp
Device.h
introspect.py
NetworkManager.cpp
NetworkManager.h
org.freedesktop.NetworkManager.Device.Statistics.xml
org.freedesktop.NetworkManager.Device.WifiP2P.xml
org.freedesktop.NetworkManager.Device.xml
org.freedesktop.NetworkManager.xml
WifiP2P.cpp
WifiP2P.h

使用 --no-namespaces 选项的原因是,qdbusxml2cpp 会把 DBus Name 的最后一项作为类名,而之前项作为命名空间名 即 org.freedesktop.NetworkManager 生成 ::org::freedesktop::NetworkManager 类。 而 org.freedesktop.NetworkManager.Device 生成的 ::org::freedesktop::NetworkManager::Device 类。 这两者会发生冲突。前者的 NetworkManager 是类名,而后者的是命名空间明。

验证生成的代码:

#include <QDebug>
#include "NetworkManager.h"
#include "Device.h"
#include "WifiP2P.h"

int main(void)
{
    NetworkManager networkManager{"org.freedesktop.NetworkManager", "/org/freedesktop/NetworkManager", QDBusConnection::systemBus()};
    auto devicePathes = networkManager.devices();
    for (auto& devicePath : devicePathes)
    {
        Device device("org.freedesktop.NetworkManager", devicePath.path(), QDBusConnection::systemBus());
        qDebug() << "check wifi p2p device" << devicePath.path();
        if (device.deviceType() == 30)
        {
            qDebug() << "found wifi p2p device" << devicePath.path();
            break;
        }
    }
}
17:06:43: Starting /home/planc/miracast/build-wifi-p2p-unknown-Default/wifi-p2p...
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/1"
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/2"
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/5"
found wifi p2p device "/org/freedesktop/NetworkManager/Devices/5"
17:06:43: /home/planc/miracast/build-wifi-p2p-unknown-Default/wifi-p2p exited with code 0

如果看不到日志打印可以参考 Qt 日志模块的使用 进行配置

#! /usr/bin/env python3
import dbus
from dbus.proxies import ProxyObject
import xml.dom.minidom as minidom
from typing import Callable, List, Set, Dict
from argparse import ArgumentParser, Namespace

# 不需要的 Interface
filter:Set[str] = {
    "org.freedesktop.DBus.Introspectable",
    "org.freedesktop.DBus.Peer",
    "org.freedesktop.DBus.Properties",
}

class DBusTypeParser(object):
    dbusQtContainerType:Dict[str,str] = {
        "<array>": "QList",
        "<struct>": "QVariant",
        "<dict>": "QMap",
    }

    dbusQtType:Dict[str,str] = {
        "y": "quint8",
        "b": "bool",
        "n": "qint16",
        "q": "quint16",
        "i": "qint32",
        "u": "quint32",
        "x": "qint64",
        "t": "quint64",
        "d": "double",
        "h": "quint32",
        "s": "QString",
        "o": "QDBusObject",
        "g": "QString",
        "v": "QVariant",
    }

    def __init__(self) -> None:
        # 状态
        self.currentState:str = "<normal>"
        self.stateStack:List[str] = []

        # dict里的第几个参数
        self.currentIndex:int = 0
        self.indexStack:List[int] = []

    def pushState(self, state:str) -> None:
        if state == "<dict>":
            self.pushIndex(self.currentIndex)
            self.currentIndex = 0
        self.stateStack.append(state)

    def popState(self) -> str:
        state:str = self.stateStack.pop()
        if state == "<dict>":
            self.currentIndex = self.popIndex()
        return state

    def pushIndex(self, index:int) -> None:
        self.indexStack.append(index)

    def popIndex(self) -> int:
        return self.indexStack.pop()

    def parse(self, signature:str) -> str:
        self.currentState = "<normal>"
        self.currentIndex = 0
        qtype:str = ""
        while len(signature) > 0:
            ch:str = signature[0]
            signature = signature[1:]

            if ch in DBusTypeParser.dbusQtType:
                if self.currentState == "<normal>":
                    qtype += self.parseNormal(ch)
                elif self.currentState == "<struct>":
                    qtype += self.parseStruct(ch)
                elif self.currentState == "<dict>":
                    qtype += self.parseDict(ch)
                elif self.currentState == "<array>":
                    qtype += self.parseArray(ch)
            elif ch == "a" and signature:
                if self.currentState == "<dict>" and self.currentIndex == 0:
                    qtype += DBusTypeParser.dbusQtContainerType['<dict>'] + "<"
                self.pushState(self.currentState)
                self.currentState = "<array>"
            elif ch == "(":
                if self.currentState == "<dict>" and self.currentIndex == 0:
                    qtype += DBusTypeParser.dbusQtContainerType['<dict>'] + "<"
                self.pushState(self.currentState)
                self.currentState = "<struct>"
            elif ch == "{":
                # a{ 开启dict模式,之前为暂态的array模式,不push
                self.currentState = "<dict>"
            elif ch == ")":
                qtype += self.parseStruct(ch)
                if self.currentState == "<dict>" and self.currentIndex == 0:
                    qtype += ", "
                    self.currentIndex += 1
            elif ch == "}":
                self.currentState = self.popState()
                qtype += self.parseDict(ch)
                if self.currentState == "<dict>" and self.currentIndex == 0:
                    qtype += ", "
                    self.currentIndex += 1
            else:
                print(f"{ch}")
                raise f"Unknown signature '{ch}'"
        return qtype

    def parseNormal(self, ch:str) -> str:
        return DBusTypeParser.dbusQtType[ch]
        
    def parseArray(self, ch:str) -> str:
        self.currentState = self.popState()
        return f"{DBusTypeParser.dbusQtContainerType['<array>']}<{DBusTypeParser.dbusQtType[ch]}>"

    def parseStruct(self, ch:str) -> str:
        if ch != ")":
            return ""

        self.currentState = self.popState()
        if self.currentState == "<struct>":
            return ""

        if self.currentState == "<normal>":
            return DBusTypeParser.dbusQtContainerType["<struct>"]

        if self.currentState == "<dict>":
            return DBusTypeParser.dbusQtContainerType["<struct>"]

        if self.currentState == "<array>":
            self.currentState = self.popState()
            return f"{DBusTypeParser.dbusQtContainerType['<array>']}<{DBusTypeParser.dbusQtContainerType['<struct>']}>"

    def parseDict(self, ch:str) -> str:
        if ch == "}":
            return ">"

        self.currentIndex += 1
        if self.currentIndex == 1:
            return DBusTypeParser.dbusQtContainerType['<dict>'] + "<" + DBusTypeParser.dbusQtType[ch] + ", "
        else:
            return DBusTypeParser.dbusQtType[ch]

dbusTypeParser = DBusTypeParser()

parser:ArgumentParser = ArgumentParser(description='DBus Introspect XML')
parser.add_argument("-t", "--type", default="session", help="bus type, system or session")
parser.add_argument("-n", "--name", help="bus namae")
parser.add_argument("-p", "--path", help="object path")
args:Namespace = parser.parse_args()

bus:dbus.Bus = dbus.SystemBus() if args.type == "system" else dbus.SessionBus() 
proxy:ProxyObject = bus.get_object(args.name, args.path)
xmlString:str = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')

root:minidom.Document = minidom.parseString(xmlString)
interfaces:List[minidom.Element] = root.getElementsByTagName("interface")

neededInterfaces:List[minidom.Element] = []
for interface in interfaces:
    name:str = interface.getAttribute("name")
    if name in filter:
        continue

    methods:List[minidom.Element] = interface.getElementsByTagName("method")
    for method in methods:
        inIndex:int = 0
        outIndex:int = 0
        methodArgs:List[minidom.Element] = method.getElementsByTagName("arg")
        for arg in methodArgs:
            sign:str = arg.getAttribute("type")
            qtype:str = dbusTypeParser.parse(sign)
            annotation:minidom.Element = root.createElement("annotation")
            
            if arg.getAttribute("direction") == "in":
                annotation.setAttribute("name", f"org.qtproject.QtDBus.QtTypeName.In{inIndex}")
                inIndex += 1
            if arg.getAttribute("direction") == "out":
                annotation.setAttribute("name", f"org.qtproject.QtDBus.QtTypeName.Out{outIndex}")
                outIndex += 1

            annotation.setAttribute("value", qtype)
            method.appendChild(annotation)

    properties = interface.getElementsByTagName("property")
    for property in properties:
        sign:str = property.getAttribute("type")
        qtype:str = dbusTypeParser.parse(sign)
        annotation:minidom.Element = root.createElement("annotation")
        annotation.setAttribute("name", "org.qtproject.QtDBus.QtTypeName")
        annotation.setAttribute("value", qtype)
        property.appendChild(annotation)
    
    neededInterfaces.append(interface)


for interface in neededInterfaces:
    with open(interface.getAttribute("name") + ".xml", "w") as fp:
        fp.write(interface.toprettyxml())
Python 寻环境与依赖导出
2024-12-18 21:18:31+08:00

Python 虚拟环境与依赖导出

因为不同的项目之间可能存在依赖冲突,因此需要使用虚拟环境,避免在全局环境上安装依赖。

Python 依赖导出

通过 pipreqs 可以将当前项目依赖的所有包导出到 requirements.txt 中:

$ pipreqs . --encoding=utf8 --force 

注意,如果在当前路径下配置了虚拟环境,pipreqs 会读取寻环境的目录产生多余的依赖,需要添加 --ignore .venv 参数来消除影响
可以使用 --proxy http://localhost:7890 来配置代理

然后通过该文件即可一键安装所有依赖

$ pip install -r requirements.txt

virtualenv

  1. 安装
pip install virtualenv
  1. 创建
virtualenv [虚拟环境名称] 
  1. 激活
cd [虚拟环境目录]
source ./bin/activate
  1. 退出
deactivate

pipenv

  1. 安装
pip install pipenv
  1. 创建
pipenv install

在项目目录里执行,如果没有 pipfile,则会创建;如果有则会安装记录在当中的依赖

  1. 安装依赖包
pipenv install [包名]

在项目目录里执行,安装依赖的同时记录到 pipfile 中去,如果文件不存在则自动创建

  1. 激活
pipenv shell

另外还有 virtualenvwrapperconda 可以管理虚拟环境,此处暂不列出。

Python的异步IO
2024-12-18 21:18:31+08:00

Python的异步IO

协程

Python 的异步 I/O 基于协程实现。使用async关键字来创建一个异步函数,对异步函数的调用不会执行该函数,而是生成一个协程对象。
对每一个协程对象,都必须等待其结束(即使是没有启动的协程),否则会产生一个RuntimeWarning

示例 :

# 创建一个异步函数
async def say_hello():
    print("hello world")

# 创建协程
coro = say_hello()
print(coro)

运行结果 :

<coroutine object say_hello at 0x109bf6170>
sys:1: RuntimeWarning: coroutine 'say_hello' was never awaited

要启动一个协程,有三种方式 :

  • 通过asyncio.run运行一个协程
  • 使用await关键字,这种方法只能在另一个async函数中才能使用
  • 通过asyncio.create_task

await必须在async函数中才能使用,因此无法启动协程的顶层入口点,此时只能使用asyncio.run函数。

await让出当前协程并运行目标协程,当前协程直到目标目标的状态变为done时才会恢复就绪。 如果await的目标不是一个协程(例如Task和Future),让出当前协程后,事件循环(EventLoop)会从就绪队列中选择一个协程运行。

asyncio.create_task让出当前协程并运行目标协程,当前协程不会等待而是加入就绪队列。
只要目标协程让出,当前协程就有机会执行,从而将启动多个协程,实现并发执行。
返回的Task对象也可以在适当的时候使用await等待其结束。

简化的协程状态 :

协程状态

await的示例 :

import asyncio
import time

async def say_hello():
    print("hello", time.strftime('%X'))
    await asyncio.sleep(1)
    print("hello", time.strftime('%X'))

async def say_world():
    print("world", time.strftime('%X'))
    await asyncio.sleep(1)
    print("world", time.strftime('%X'))

# 顶层入口点
async def main():
    await say_hello() # 启动say_hello()返回的协程,并等待其结束
    await say_world() # 要等到前一个await结束后,才会启动

# 启动顶层入口点
asyncio.run(main())

运行结果 :

hello 15:27:26
hello 15:27:27
world 15:27:27
world 15:27:28

asyncio.create_task的示例 :

import asyncio
import time

async def say_hello():
    print("hello", time.strftime('%X'))
    await asyncio.sleep(1)
    print("hello", time.strftime('%X'))

async def say_world():
    print("world", time.strftime('%X'))
    await asyncio.sleep(1)
    print("world", time.strftime('%X'))

# 顶层入口点
async def main():
    task_say_hello = asyncio.create_task(say_hello()) # 启动协程不等待
    task_say_world = asyncio.create_task(say_world()) 

    await task_say_hello
    await task_say_world

# 启动顶层入口点
asyncio.run(main())

运行结果 :

hello 15:29:41
world 15:29:41
hello 15:29:42
world 15:29:42

通过上面两个示例打印的顺序和时间可以看出awaitasyncio.create_task的区别

本来准备介绍一下asyncio中的TCP和UDP接口,但是抄袭官方文档没有意义,而且我懒得写了,下面是一个TCP server的示例,旨在演示如何使用协程并发处理客户请求。

/block的请求处理函数中有一个延时10秒的操作(await asyncio.sleep(delay)),但是因为使用异步操作进行,所有不需要等待它结束就能相应其它请求。

  • await asyncio.sleep(delay)将当前协程让出,运行asyncio.sleep(delay)返回的协程。
  • asyncio.sleep(delay)返回的协程里,会创建一个Future对象,并在EventLoop中注册(EventLoop将在delay秒后将Future对象的状态设为done ),之后await future让出,等待future的状态变为done
  • 由于目标不是协程,EventLoop会从就绪队列中选取一个协程来运行,因此可以对新的请求做出相应。
import asyncio
import re

class DemoProtocol(asyncio.Protocol):
    # 获取url的正则
    url_re = re.compile(b'GET (.*) HTTP/1.1')

    # 连接创建时的回调函数
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    # 收到数据时的回调函数
    def data_received(self, data):
        # 获取url
        url = DemoProtocol.url_re.match(data).group(1)
        print("GET", url)
        # 根据url做不同的处理
        if url == b"/block" :
            # 10s后响应
            asyncio.create_task(self.response_after(b'<h1>Are you block?</h1>', 10))
        else:
            asyncio.create_task(self.response(b'<h1>hello world</h1>'))

    # 立刻返回响应
    async def response(self, content):
        self.transport.write(b"HTTP/1.1 200 OK\r\n")
        self.transport.write(b"Content-Type: text/html\r\n")
        self.transport.write(b"\r\n")
        self.transport.write(content)
        self.transport.write(b"\r\n")
        self.transport.close()

    # 延迟返回响应
    async def response_after(self, content, delay):
        await asyncio.sleep(delay)
        await self.response(content)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(lambda: DemoProtocol(), '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()

asyncio.run(main())