DBus 自省 XML 生成 Qt 代码
各种支持 DBus 的开发框架都能够通过 XML 自动生成代码,例如 Glib 的 gdbus-codegen
和 Qt 的 qdbusxml2cpp
。
通过 DBus 对象 org.freedesktop.DBus.Introspectable
接口下的 Introspect
方法可以自省 XML,这样就不需要手写了。
但是 d-feet、dbus-send 等工具会给返回值加上类型标注或者换行符导致需要人工修改。因此需要自己写一个脚本来自省 DBus。
使用示例:
$ ./introspect.py -t system -n org.freedesktop.NetworkManager -p /org/freedesktop/NetworkManager
$ ./introspect.py -t system -n org.freedesktop.NetworkManager -p /org/freedesktop/NetworkManager/Devices/5
$ ls -1
introspect.py
org.freedesktop.NetworkManager.Device.Statistics.xml
org.freedesktop.NetworkManager.Device.WifiP2P.xml
org.freedesktop.NetworkManager.Device.xml
org.freedesktop.NetworkManager.xml
$ qdbusxml2cpp -c NetworkManager -p NetworkManager org.freedesktop.NetworkManager.xml --no-namespaces
$ qdbusxml2cpp -c Device -p Device org.freedesktop.NetworkManager.Device.xml --no-namespaces
$ qdbusxml2cpp -c WifiP2P -p WifiP2P org.freedesktop.NetworkManager.Device.WifiP2P.xml --no-namespaces
$ ls -1
Device.cpp
Device.h
introspect.py
NetworkManager.cpp
NetworkManager.h
org.freedesktop.NetworkManager.Device.Statistics.xml
org.freedesktop.NetworkManager.Device.WifiP2P.xml
org.freedesktop.NetworkManager.Device.xml
org.freedesktop.NetworkManager.xml
WifiP2P.cpp
WifiP2P.h
使用
--no-namespaces
选项的原因是,qdbusxml2cpp
会把 DBus Name 的最后一项作为类名,而之前项作为命名空间名 即org.freedesktop.NetworkManager
生成::org::freedesktop::NetworkManager
类。 而org.freedesktop.NetworkManager.Device
生成的::org::freedesktop::NetworkManager::Device
类。 这两者会发生冲突。前者的NetworkManager
是类名,而后者的是命名空间明。
验证生成的代码:
#include <QDebug>
#include "NetworkManager.h"
#include "Device.h"
#include "WifiP2P.h"
int main(void)
{
NetworkManager networkManager{"org.freedesktop.NetworkManager", "/org/freedesktop/NetworkManager", QDBusConnection::systemBus()};
auto devicePathes = networkManager.devices();
for (auto& devicePath : devicePathes)
{
Device device("org.freedesktop.NetworkManager", devicePath.path(), QDBusConnection::systemBus());
qDebug() << "check wifi p2p device" << devicePath.path();
if (device.deviceType() == 30)
{
qDebug() << "found wifi p2p device" << devicePath.path();
break;
}
}
}
17:06:43: Starting /home/planc/miracast/build-wifi-p2p-unknown-Default/wifi-p2p...
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/1"
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/2"
check wifi p2p device "/org/freedesktop/NetworkManager/Devices/5"
found wifi p2p device "/org/freedesktop/NetworkManager/Devices/5"
17:06:43: /home/planc/miracast/build-wifi-p2p-unknown-Default/wifi-p2p exited with code 0
如果看不到日志打印可以参考 Qt 日志模块的使用 进行配置
#! /usr/bin/env python3
import dbus
from dbus.proxies import ProxyObject
import xml.dom.minidom as minidom
from typing import Callable, List, Set, Dict
from argparse import ArgumentParser, Namespace
# 不需要的 Interface
filter:Set[str] = {
"org.freedesktop.DBus.Introspectable",
"org.freedesktop.DBus.Peer",
"org.freedesktop.DBus.Properties",
}
class DBusTypeParser(object):
dbusQtContainerType:Dict[str,str] = {
"<array>": "QList",
"<struct>": "QVariant",
"<dict>": "QMap",
}
dbusQtType:Dict[str,str] = {
"y": "quint8",
"b": "bool",
"n": "qint16",
"q": "quint16",
"i": "qint32",
"u": "quint32",
"x": "qint64",
"t": "quint64",
"d": "double",
"h": "quint32",
"s": "QString",
"o": "QDBusObject",
"g": "QString",
"v": "QVariant",
}
def __init__(self) -> None:
# 状态
self.currentState:str = "<normal>"
self.stateStack:List[str] = []
# dict里的第几个参数
self.currentIndex:int = 0
self.indexStack:List[int] = []
def pushState(self, state:str) -> None:
if state == "<dict>":
self.pushIndex(self.currentIndex)
self.currentIndex = 0
self.stateStack.append(state)
def popState(self) -> str:
state:str = self.stateStack.pop()
if state == "<dict>":
self.currentIndex = self.popIndex()
return state
def pushIndex(self, index:int) -> None:
self.indexStack.append(index)
def popIndex(self) -> int:
return self.indexStack.pop()
def parse(self, signature:str) -> str:
self.currentState = "<normal>"
self.currentIndex = 0
qtype:str = ""
while len(signature) > 0:
ch:str = signature[0]
signature = signature[1:]
if ch in DBusTypeParser.dbusQtType:
if self.currentState == "<normal>":
qtype += self.parseNormal(ch)
elif self.currentState == "<struct>":
qtype += self.parseStruct(ch)
elif self.currentState == "<dict>":
qtype += self.parseDict(ch)
elif self.currentState == "<array>":
qtype += self.parseArray(ch)
elif ch == "a" and signature:
if self.currentState == "<dict>" and self.currentIndex == 0:
qtype += DBusTypeParser.dbusQtContainerType['<dict>'] + "<"
self.pushState(self.currentState)
self.currentState = "<array>"
elif ch == "(":
if self.currentState == "<dict>" and self.currentIndex == 0:
qtype += DBusTypeParser.dbusQtContainerType['<dict>'] + "<"
self.pushState(self.currentState)
self.currentState = "<struct>"
elif ch == "{":
# a{ 开启dict模式,之前为暂态的array模式,不push
self.currentState = "<dict>"
elif ch == ")":
qtype += self.parseStruct(ch)
if self.currentState == "<dict>" and self.currentIndex == 0:
qtype += ", "
self.currentIndex += 1
elif ch == "}":
self.currentState = self.popState()
qtype += self.parseDict(ch)
if self.currentState == "<dict>" and self.currentIndex == 0:
qtype += ", "
self.currentIndex += 1
else:
print(f"{ch}")
raise f"Unknown signature '{ch}'"
return qtype
def parseNormal(self, ch:str) -> str:
return DBusTypeParser.dbusQtType[ch]
def parseArray(self, ch:str) -> str:
self.currentState = self.popState()
return f"{DBusTypeParser.dbusQtContainerType['<array>']}<{DBusTypeParser.dbusQtType[ch]}>"
def parseStruct(self, ch:str) -> str:
if ch != ")":
return ""
self.currentState = self.popState()
if self.currentState == "<struct>":
return ""
if self.currentState == "<normal>":
return DBusTypeParser.dbusQtContainerType["<struct>"]
if self.currentState == "<dict>":
return DBusTypeParser.dbusQtContainerType["<struct>"]
if self.currentState == "<array>":
self.currentState = self.popState()
return f"{DBusTypeParser.dbusQtContainerType['<array>']}<{DBusTypeParser.dbusQtContainerType['<struct>']}>"
def parseDict(self, ch:str) -> str:
if ch == "}":
return ">"
self.currentIndex += 1
if self.currentIndex == 1:
return DBusTypeParser.dbusQtContainerType['<dict>'] + "<" + DBusTypeParser.dbusQtType[ch] + ", "
else:
return DBusTypeParser.dbusQtType[ch]
dbusTypeParser = DBusTypeParser()
parser:ArgumentParser = ArgumentParser(description='DBus Introspect XML')
parser.add_argument("-t", "--type", default="session", help="bus type, system or session")
parser.add_argument("-n", "--name", help="bus namae")
parser.add_argument("-p", "--path", help="object path")
args:Namespace = parser.parse_args()
bus:dbus.Bus = dbus.SystemBus() if args.type == "system" else dbus.SessionBus()
proxy:ProxyObject = bus.get_object(args.name, args.path)
xmlString:str = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
root:minidom.Document = minidom.parseString(xmlString)
interfaces:List[minidom.Element] = root.getElementsByTagName("interface")
neededInterfaces:List[minidom.Element] = []
for interface in interfaces:
name:str = interface.getAttribute("name")
if name in filter:
continue
methods:List[minidom.Element] = interface.getElementsByTagName("method")
for method in methods:
inIndex:int = 0
outIndex:int = 0
methodArgs:List[minidom.Element] = method.getElementsByTagName("arg")
for arg in methodArgs:
sign:str = arg.getAttribute("type")
qtype:str = dbusTypeParser.parse(sign)
annotation:minidom.Element = root.createElement("annotation")
if arg.getAttribute("direction") == "in":
annotation.setAttribute("name", f"org.qtproject.QtDBus.QtTypeName.In{inIndex}")
inIndex += 1
if arg.getAttribute("direction") == "out":
annotation.setAttribute("name", f"org.qtproject.QtDBus.QtTypeName.Out{outIndex}")
outIndex += 1
annotation.setAttribute("value", qtype)
method.appendChild(annotation)
properties = interface.getElementsByTagName("property")
for property in properties:
sign:str = property.getAttribute("type")
qtype:str = dbusTypeParser.parse(sign)
annotation:minidom.Element = root.createElement("annotation")
annotation.setAttribute("name", "org.qtproject.QtDBus.QtTypeName")
annotation.setAttribute("value", qtype)
property.appendChild(annotation)
neededInterfaces.append(interface)
for interface in neededInterfaces:
with open(interface.getAttribute("name") + ".xml", "w") as fp:
fp.write(interface.toprettyxml())
Python 虚拟环境与依赖导出
因为不同的项目之间可能存在依赖冲突,因此需要使用虚拟环境,避免在全局环境上安装依赖。
Python 依赖导出
通过 pipreqs
可以将当前项目依赖的所有包导出到 requirements.txt
中:
$ pipreqs . --encoding=utf8 --force
注意,如果在当前路径下配置了虚拟环境,pipreqs 会读取寻环境的目录产生多余的依赖,需要添加
--ignore .venv
参数来消除影响
可以使用--proxy http://localhost:7890
来配置代理
然后通过该文件即可一键安装所有依赖
$ pip install -r requirements.txt
virtualenv
- 安装
pip install virtualenv
- 创建
virtualenv [虚拟环境名称]
- 激活
cd [虚拟环境目录]
source ./bin/activate
- 退出
deactivate
pipenv
- 安装
pip install pipenv
- 创建
pipenv install
在项目目录里执行,如果没有
pipfile
,则会创建;如果有则会安装记录在当中的依赖
- 安装依赖包
pipenv install [包名]
在项目目录里执行,安装依赖的同时记录到
pipfile
中去,如果文件不存在则自动创建
- 激活
pipenv shell
另外还有
virtualenvwrapper
、conda
可以管理虚拟环境,此处暂不列出。
Python的异步IO
协程
Python 的异步 I/O 基于协程实现。使用async
关键字来创建一个异步函数,对异步函数的调用不会执行该函数,而是生成一个协程对象。
对每一个协程对象,都必须等待其结束(即使是没有启动的协程),否则会产生一个RuntimeWarning
。
示例 :
# 创建一个异步函数
async def say_hello():
print("hello world")
# 创建协程
coro = say_hello()
print(coro)
运行结果 :
<coroutine object say_hello at 0x109bf6170>
sys:1: RuntimeWarning: coroutine 'say_hello' was never awaited
要启动一个协程,有三种方式 :
- 通过
asyncio.run
运行一个协程 - 使用
await
关键字,这种方法只能在另一个async
函数中才能使用 - 通过
asyncio.create_task
await
必须在async
函数中才能使用,因此无法启动协程的顶层入口点,此时只能使用asyncio.run
函数。
await
让出当前协程并运行目标协程,当前协程直到目标目标的状态变为done
时才会恢复就绪。 如果await
的目标不是一个协程(例如Task和Future),让出当前协程后,事件循环(EventLoop
)会从就绪队列中选择一个协程运行。
asyncio.create_task
让出当前协程并运行目标协程,当前协程不会等待而是加入就绪队列。
只要目标协程让出,当前协程就有机会执行,从而将启动多个协程,实现并发执行。
返回的Task
对象也可以在适当的时候使用await
等待其结束。
简化的协程状态 :
await
的示例 :
import asyncio
import time
async def say_hello():
print("hello", time.strftime('%X'))
await asyncio.sleep(1)
print("hello", time.strftime('%X'))
async def say_world():
print("world", time.strftime('%X'))
await asyncio.sleep(1)
print("world", time.strftime('%X'))
# 顶层入口点
async def main():
await say_hello() # 启动say_hello()返回的协程,并等待其结束
await say_world() # 要等到前一个await结束后,才会启动
# 启动顶层入口点
asyncio.run(main())
运行结果 :
hello 15:27:26
hello 15:27:27
world 15:27:27
world 15:27:28
asyncio.create_task
的示例 :
import asyncio
import time
async def say_hello():
print("hello", time.strftime('%X'))
await asyncio.sleep(1)
print("hello", time.strftime('%X'))
async def say_world():
print("world", time.strftime('%X'))
await asyncio.sleep(1)
print("world", time.strftime('%X'))
# 顶层入口点
async def main():
task_say_hello = asyncio.create_task(say_hello()) # 启动协程不等待
task_say_world = asyncio.create_task(say_world())
await task_say_hello
await task_say_world
# 启动顶层入口点
asyncio.run(main())
运行结果 :
hello 15:29:41
world 15:29:41
hello 15:29:42
world 15:29:42
通过上面两个示例打印的顺序和时间可以看出await
和asyncio.create_task
的区别
本来准备介绍一下asyncio
中的TCP和UDP接口,但是抄袭官方文档没有意义,而且我懒得写了,下面是一个TCP server的示例,旨在演示如何使用协程并发处理客户请求。
在/block
的请求处理函数中有一个延时10秒的操作(await asyncio.sleep(delay)
),但是因为使用异步操作进行,所有不需要等待它结束就能相应其它请求。
await asyncio.sleep(delay)
将当前协程让出,运行asyncio.sleep(delay)
返回的协程。asyncio.sleep(delay)
返回的协程里,会创建一个Future
对象,并在EventLoop
中注册(EventLoop
将在delay
秒后将Future
对象的状态设为done
),之后await future
让出,等待future
的状态变为done
。- 由于目标不是协程,
EventLoop
会从就绪队列中选取一个协程来运行,因此可以对新的请求做出相应。
import asyncio
import re
class DemoProtocol(asyncio.Protocol):
# 获取url的正则
url_re = re.compile(b'GET (.*) HTTP/1.1')
# 连接创建时的回调函数
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
# 收到数据时的回调函数
def data_received(self, data):
# 获取url
url = DemoProtocol.url_re.match(data).group(1)
print("GET", url)
# 根据url做不同的处理
if url == b"/block" :
# 10s后响应
asyncio.create_task(self.response_after(b'<h1>Are you block?</h1>', 10))
else:
asyncio.create_task(self.response(b'<h1>hello world</h1>'))
# 立刻返回响应
async def response(self, content):
self.transport.write(b"HTTP/1.1 200 OK\r\n")
self.transport.write(b"Content-Type: text/html\r\n")
self.transport.write(b"\r\n")
self.transport.write(content)
self.transport.write(b"\r\n")
self.transport.close()
# 延迟返回响应
async def response_after(self, content, delay):
await asyncio.sleep(delay)
await self.response(content)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(lambda: DemoProtocol(), '127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())