Skip to content

Commit a7fba1d

Browse files
authored
Simpler message frame implementation. (#401)
1 parent 548cb7a commit a7fba1d

File tree

3 files changed

+138
-309
lines changed

3 files changed

+138
-309
lines changed

lib/src/message_window.dart

Lines changed: 70 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import 'dart:async';
2-
import 'dart:collection';
32
import 'dart:typed_data';
43

54
import 'package:buffer/buffer.dart';
@@ -35,76 +34,95 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
3534
$N: NoticeMessage.parse,
3635
};
3736

38-
class MessageFramer {
39-
final CodecContext _codecContext;
40-
late final _reader = PgByteDataReader(codecContext: _codecContext);
41-
final messageQueue = Queue<ServerMessage>();
37+
class _BytesFrame {
38+
final int type;
39+
final int length;
40+
final Uint8List bytes;
4241

43-
MessageFramer(this._codecContext);
42+
_BytesFrame(this.type, this.length, this.bytes);
43+
}
4444

45-
int? _type;
46-
int _expectedLength = 0;
45+
StreamTransformer<Uint8List, ServerMessage> bytesToMessageParser() {
46+
return StreamTransformer<Uint8List, ServerMessage>.fromHandlers(
47+
handleData: (data, sink) {},
48+
);
49+
}
4750

48-
bool get _hasReadHeader => _type != null;
49-
bool get _canReadHeader => _reader.remainingLength >= _headerByteSize;
51+
final _emptyData = Uint8List(0);
5052

51-
bool get _isComplete =>
52-
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;
53+
class _BytesToFrameParser
54+
extends StreamTransformerBase<Uint8List, _BytesFrame> {
55+
final CodecContext _codecContext;
5356

54-
Future<void> addBytes(Uint8List bytes) async {
55-
_reader.add(bytes);
57+
_BytesToFrameParser(this._codecContext);
5658

57-
while (true) {
58-
if (!_hasReadHeader && _canReadHeader) {
59-
_type = _reader.readUint8();
60-
_expectedLength = _reader.readUint32() - 4;
61-
}
59+
@override
60+
Stream<_BytesFrame> bind(Stream<Uint8List> stream) async* {
61+
final reader = PgByteDataReader(codecContext: _codecContext);
6262

63-
// special case
64-
if (_type == SharedMessageId.copyDone) {
65-
// unlike other messages, CopyDoneMessage only takes the length as an
66-
// argument (must be the full length including the length bytes)
67-
final msg = CopyDoneMessage(_expectedLength + 4);
68-
_addMsg(msg);
69-
continue;
70-
}
63+
int? type;
64+
int expectedLength = 0;
7165

72-
if (_hasReadHeader && _isComplete) {
73-
final msgMaker = _messageTypeMap[_type];
74-
if (msgMaker == null) {
75-
_addMsg(UnknownMessage(_type!, _reader.read(_expectedLength)));
76-
continue;
66+
await for (final bytes in stream) {
67+
reader.add(bytes);
68+
69+
while (true) {
70+
if (type == null && reader.remainingLength >= _headerByteSize) {
71+
type = reader.readUint8();
72+
expectedLength = reader.readUint32() - 4;
7773
}
7874

79-
final targetRemainingLength = _reader.remainingLength - _expectedLength;
80-
final msg = await msgMaker(_reader, _expectedLength);
81-
if (_reader.remainingLength > targetRemainingLength) {
82-
throw StateError(
83-
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
75+
// special case
76+
if (type == SharedMessageId.copyDone) {
77+
// unlike other messages, CopyDoneMessage only takes the length as an
78+
// argument (must be the full length including the length bytes)
79+
yield _BytesFrame(type!, expectedLength, _emptyData);
80+
type = null;
81+
expectedLength = 0;
82+
continue;
8483
}
85-
// consume the rest of the message
86-
if (_reader.remainingLength < targetRemainingLength) {
87-
_reader.read(targetRemainingLength - _reader.remainingLength);
84+
85+
if (type != null && expectedLength <= reader.remainingLength) {
86+
final data = reader.read(expectedLength);
87+
yield _BytesFrame(type, expectedLength, data);
88+
type = null;
89+
expectedLength = 0;
90+
continue;
8891
}
8992

90-
_addMsg(msg);
91-
continue;
93+
break;
9294
}
93-
94-
break;
9595
}
9696
}
97+
}
9798

98-
void _addMsg(ServerMessage msg) {
99-
messageQueue.add(msg);
100-
_type = null;
101-
_expectedLength = 0;
102-
}
99+
class BytesToMessageParser
100+
extends StreamTransformerBase<Uint8List, ServerMessage> {
101+
final CodecContext _codecContext;
102+
103+
BytesToMessageParser(this._codecContext);
103104

104-
bool get hasMessage => messageQueue.isNotEmpty;
105+
@override
106+
Stream<ServerMessage> bind(Stream<Uint8List> stream) {
107+
return stream
108+
.transform(_BytesToFrameParser(_codecContext))
109+
.asyncMap((frame) async {
110+
// special case
111+
if (frame.type == SharedMessageId.copyDone) {
112+
// unlike other messages, CopyDoneMessage only takes the length as an
113+
// argument (must be the full length including the length bytes)
114+
return CopyDoneMessage(frame.length + 4);
115+
}
116+
117+
final msgMaker = _messageTypeMap[frame.type];
118+
if (msgMaker == null) {
119+
return UnknownMessage(frame.type, frame.bytes);
120+
}
105121

106-
ServerMessage popMessage() {
107-
return messageQueue.removeFirst();
122+
return await msgMaker(
123+
PgByteDataReader(codecContext: _codecContext)..add(frame.bytes),
124+
frame.bytes.length);
125+
});
108126
}
109127
}
110128

lib/src/v3/protocol.dart

Lines changed: 1 addition & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
import 'dart:async';
2-
import 'dart:typed_data';
3-
41
import 'package:async/async.dart';
52
import 'package:postgres/src/types/codec.dart';
63
import 'package:stream_channel/stream_channel.dart';
74

85
import '../buffer.dart';
96
import '../message_window.dart';
107
import '../messages/client_messages.dart';
11-
import '../messages/server_messages.dart';
128
import '../messages/shared_messages.dart';
139

1410
export '../messages/client_messages.dart';
@@ -36,7 +32,7 @@ class AggregatedClientMessage extends ClientMessage {
3632
StreamChannelTransformer<Message, List<int>> messageTransformer(
3733
CodecContext codecContext) {
3834
return StreamChannelTransformer(
39-
_readMessages(codecContext),
35+
BytesToMessageParser(codecContext),
4036
StreamSinkTransformer.fromHandlers(
4137
handleData: (message, out) {
4238
if (message is! ClientMessage) {
@@ -52,59 +48,3 @@ StreamChannelTransformer<Message, List<int>> messageTransformer(
5248
),
5349
);
5450
}
55-
56-
StreamTransformer<Uint8List, ServerMessage> _readMessages(
57-
CodecContext codecContext) {
58-
return StreamTransformer.fromBind((rawStream) {
59-
return Stream.multi((listener) {
60-
final framer = MessageFramer(codecContext);
61-
62-
var paused = false;
63-
64-
void emitFinishedMessages() {
65-
while (framer.hasMessage) {
66-
listener.addSync(framer.popMessage());
67-
68-
if (paused) break;
69-
}
70-
}
71-
72-
Future<void> handleChunk() async {
73-
try {
74-
// await framer.addBytes(bytes);
75-
emitFinishedMessages();
76-
} catch (e, st) {
77-
listener.addErrorSync(e, st);
78-
}
79-
}
80-
81-
// Don't cancel this subscription on error! If the listener wants that,
82-
// they'll unsubscribe in time after we forward it synchronously.
83-
final rawSubscription = rawStream
84-
// TODO: figure out a better way to handle multiple callbacks to framer
85-
.asyncMap(framer.addBytes)
86-
.listen((_) => handleChunk(), cancelOnError: false)
87-
..onError(listener.addErrorSync)
88-
..onDone(listener.closeSync);
89-
90-
listener.onPause = () {
91-
paused = true;
92-
rawSubscription.pause();
93-
};
94-
95-
listener.onResume = () {
96-
paused = false;
97-
emitFinishedMessages();
98-
99-
if (!paused) {
100-
rawSubscription.resume();
101-
}
102-
};
103-
104-
listener.onCancel = () {
105-
paused = true;
106-
rawSubscription.cancel();
107-
};
108-
});
109-
});
110-
}

0 commit comments

Comments
 (0)