Skip to content

Commit 93b9c34

Browse files
Improve MixinWSApi
1 parent 667dbf5 commit 93b9c34

File tree

1 file changed

+62
-31
lines changed

1 file changed

+62
-31
lines changed

pysrc/mixin_ws_api.py

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Union, Callable, Awaitable, Optional, Literal
1212
from io import BytesIO
1313
import base64
14+
import asyncio
1415
import websockets
1516

1617
from dataclasses import dataclass
@@ -65,6 +66,7 @@ def __init__(self, bot_config, on_message: Callable[[str, str, Optional[MessageV
6566
self.bot = MixinBotApi(bot_config)
6667
self.ws = None
6768
self._on_message = on_message
69+
self._paused = False
6870

6971
async def connect(self):
7072
if self.ws:
@@ -77,41 +79,70 @@ async def connect(self):
7779
headers={"Authorization": "Bearer " + encoded}
7880
self.ws = await websockets.connect(uri, subprotocols=["Mixin-Blaze-1"], extra_headers=headers)
7981

82+
@property
83+
def paused(self):
84+
return self._paused
85+
86+
@paused.setter
87+
def paused(self, value):
88+
self._paused = value
89+
90+
async def close_ws(self):
91+
try:
92+
await self.ws.close()
93+
self.ws = None
94+
except Exception as e:
95+
logger.info("%s", e)
96+
97+
async def handle_message(self):
98+
if not self.ws:
99+
return
100+
msg = await self.ws.recv()
101+
msg = BytesIO(msg)
102+
msg = gzip.GzipFile(mode="rb", fileobj=msg)
103+
msg = msg.read()
104+
msg = json.loads(msg)
105+
try:
106+
view = MessageView.from_dict(msg['data'])
107+
except KeyError:
108+
view = None
109+
await self._on_message(msg['id'], msg['action'], view)
110+
111+
async def _run(self):
112+
await self.connect()
113+
114+
Message = {"id": str(uuid.uuid1()), "action": "LIST_PENDING_MESSAGES"}
115+
Message_instring = json.dumps(Message)
116+
117+
fgz = BytesIO()
118+
gzip_obj = gzip.GzipFile(mode='wb', fileobj=fgz)
119+
gzip_obj.write(Message_instring.encode())
120+
gzip_obj.close()
121+
122+
await self.ws.send(fgz.getvalue())
123+
124+
while self.ws and not self.paused:
125+
await self.handle_message()
126+
80127
"""
81128
run websocket server forever
82129
"""
83130
async def run(self):
84-
while True:
85-
await self.connect()
86-
87-
Message = {"id": str(uuid.uuid1()), "action": "LIST_PENDING_MESSAGES"}
88-
Message_instring = json.dumps(Message)
89-
90-
fgz = BytesIO()
91-
gzip_obj = gzip.GzipFile(mode='wb', fileobj=fgz)
92-
gzip_obj.write(Message_instring.encode())
93-
gzip_obj.close()
94-
95-
await self.ws.send(fgz.getvalue())
96-
while True:
97-
if not self.ws:
98-
return
99-
try:
100-
msg = await self.ws.recv()
101-
except websockets.exceptions.ConnectionClosedError:
102-
self.ws = None
103-
break
104-
except websockets.exceptions.ConnectionClosedOK:
105-
return
106-
msg = BytesIO(msg)
107-
msg = gzip.GzipFile(mode="rb", fileobj=msg)
108-
msg = msg.read()
109-
msg = json.loads(msg)
110-
try:
111-
view = MessageView.from_dict(msg['data'])
112-
except KeyError:
113-
view = None
114-
await self._on_message(msg['id'], msg['action'], view)
131+
while not self.paused:
132+
try:
133+
await self._run()
134+
except websockets.exceptions.ConnectionClosedError:
135+
logger.info("+++++ConnectionClosedError")
136+
self.ws = None
137+
await asyncio.sleep(1.0)
138+
continue
139+
except websockets.exceptions.ConnectionClosedOK:
140+
logger.info("+++++ConnectionClosedOK")
141+
return
142+
except Exception as e:
143+
logger.exception(e)
144+
self.ws = None
145+
await asyncio.sleep(1.0)
115146

116147
"""
117148
=================

0 commit comments

Comments
 (0)